[19/22] Put Stream and Message classes in separate files

Submitted by Christophe de Dinechin on Feb. 28, 2018, 3:43 p.m.

Details

Message ID 20180228154325.25791-20-christophe@dinechin.org
State New
Headers show
Series "streaming-agent: C++ refactoring" ( rev: 2 1 ) in Spice

Not browsing as part of any series.

Commit Message

Christophe de Dinechin Feb. 28, 2018, 3:43 p.m.
From: Christophe de Dinechin <dinechin@redhat.com>

Doing this change will make it possible to move the capture loop to the
concrete-agent.cpp file.

Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
---
 include/spice-streaming-agent/errors.hpp |   2 +
 src/Makefile.am                          |   2 +
 src/message.hpp                          |  41 ++++++
 src/spice-streaming-agent.cpp            | 209 +------------------------------
 src/stream.cpp                           | 172 +++++++++++++++++++++++++
 src/stream.hpp                           |  55 ++++++++
 6 files changed, 276 insertions(+), 205 deletions(-)
 create mode 100644 src/message.hpp
 create mode 100644 src/stream.cpp
 create mode 100644 src/stream.hpp

Patch hide | download patch | download mbox

diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
index 870a0fd..62ae010 100644
--- a/include/spice-streaming-agent/errors.hpp
+++ b/include/spice-streaming-agent/errors.hpp
@@ -90,4 +90,6 @@  protected:
 
 }} // namespace spice::streaming_agent
 
+extern bool quit_requested;
+
 #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
diff --git a/src/Makefile.am b/src/Makefile.am
index 2507844..923a103 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -55,5 +55,7 @@  spice_streaming_agent_SOURCES = \
 	mjpeg-fallback.hpp \
 	jpeg.cpp \
 	jpeg.hpp \
+	stream.cpp \
+	stream.hpp \
 	errors.cpp \
 	$(NULL)
diff --git a/src/message.hpp b/src/message.hpp
new file mode 100644
index 0000000..28b3e28
--- /dev/null
+++ b/src/message.hpp
@@ -0,0 +1,41 @@ 
+/* Formatting messages
+ *
+ * \copyright
+ * Copyright 2018 Red Hat Inc. All rights reserved.
+ */
+#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
+#define SPICE_STREAMING_AGENT_MESSAGE_HPP
+
+#include <spice/stream-device.h>
+
+namespace spice
+{
+namespace streaming_agent
+{
+
+template <typename Payload, typename Info, unsigned Type>
+class Message
+{
+public:
+    template <typename ...PayloadArgs>
+    Message(PayloadArgs... payload)
+        : hdr(StreamDevHeader {
+              .protocol_version = STREAM_DEVICE_PROTOCOL,
+              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
+              .type = Type,
+              .size = (uint32_t) Info::size(payload...)
+          })
+    { }
+    void write_header(Stream &stream)
+    {
+        stream.write_all("header", &hdr, sizeof(hdr));
+    }
+
+protected:
+    StreamDevHeader hdr;
+    typedef Payload payload_t;
+};
+
+}} // namespace spice::streaming_agent
+
+#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index 35e65bb..c401a34 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -5,6 +5,8 @@ 
  */
 
 #include "concrete-agent.hpp"
+#include "stream.hpp"
+#include "message.hpp"
 #include "hexdump.h"
 #include "mjpeg-fallback.hpp"
 
@@ -21,11 +23,9 @@ 
 #include <inttypes.h>
 #include <string.h>
 #include <getopt.h>
-#include <unistd.h>
 #include <errno.h>
-#include <fcntl.h>
+#include <unistd.h>
 #include <sys/time.h>
-#include <poll.h>
 #include <syslog.h>
 #include <signal.h>
 #include <exception>
@@ -57,76 +57,6 @@  static uint64_t get_time(void)
 
 }
 
-class Stream
-{
-    typedef std::set<SpiceVideoCodecType> codecs_t;
-
-public:
-    Stream(const char *name)
-        : codecs()
-    {
-        streamfd = open(name, O_RDWR);
-        if (streamfd < 0) {
-            throw IOError("failed to open streaming device", errno);
-        }
-    }
-    ~Stream()
-    {
-        close(streamfd);
-    }
-
-    const codecs_t &client_codecs() { return codecs; }
-    bool streaming_requested() { return is_streaming; }
-
-    template <typename Message, typename ...PayloadArgs>
-    void send(PayloadArgs... payload)
-    {
-        Message message(payload...);
-        std::lock_guard<std::mutex> stream_guard(mutex);
-        message.write_header(*this);
-        message.write_message_body(*this, payload...);
-    }
-
-    int read_command(bool blocking);
-    void write_all(const char *operation, const void *buf, const size_t len);
-
-private:
-    int have_something_to_read(int timeout);
-    void handle_stream_start_stop(uint32_t len);
-    void handle_stream_capabilities(uint32_t len);
-    void handle_stream_error(uint32_t len);
-    void read_command_from_device(void);
-
-private:
-    std::mutex mutex;
-    codecs_t codecs;
-    int streamfd = -1;
-    bool is_streaming = false;
-};
-
-template <typename Payload, typename Info, unsigned Type>
-class Message
-{
-public:
-    template <typename ...PayloadArgs>
-    Message(PayloadArgs... payload)
-        : hdr(StreamDevHeader {
-              .protocol_version = STREAM_DEVICE_PROTOCOL,
-              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
-              .type = Type,
-              .size = (uint32_t) Info::size(payload...)
-          })
-    { }
-    void write_header(Stream &stream)
-    {
-        stream.write_all("header", &hdr, sizeof(hdr));
-    }
-
-protected:
-    StreamDevHeader hdr;
-    typedef Payload payload_t;
-};
-
 class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
 {
 public:
@@ -156,20 +86,6 @@  public:
     }
 };
 
-class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
-{
-public:
-    CapabilitiesMessage() : Message() {}
-    static size_t size()
-    {
-        return sizeof(payload_t);
-    }
-    void write_message_body(Stream &stream)
-    {
-        /* No body for capabilities message */
-    }
-};
-
 class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
 {
 public:
@@ -329,124 +245,7 @@  X11CursorThread::X11CursorThread(Stream &stream)
 
 }} // namespace spice::streaming_agent
 
-static bool quit_requested = false;
-
-int Stream::have_something_to_read(int timeout)
-{
-    struct pollfd pollfd = {streamfd, POLLIN, 0};
-
-    if (poll(&pollfd, 1, timeout) < 0) {
-        syslog(LOG_ERR, "poll FAILED\n");
-        return -1;
-    }
-
-    if (pollfd.revents == POLLIN) {
-        return 1;
-    }
-
-    return 0;
-}
-
-void Stream::handle_stream_start_stop(uint32_t len)
-{
-    uint8_t msg[256];
-
-    if (len >= sizeof(msg)) {
-        throw MessageDataError("message is too long", len, sizeof(msg));
-    }
-    int n = read(streamfd, &msg, len);
-    if (n != (int) len) {
-        throw MessageDataError("read start/stop command from device failed", n, len, errno);
-    }
-    is_streaming = (msg[0] != 0); /* num_codecs */
-    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
-           is_streaming ? "START" : "STOP");
-    codecs.clear();
-    for (int i = 1; i <= msg[0]; ++i) {
-        codecs.insert((SpiceVideoCodecType) msg[i]);
-    }
-}
-
-void Stream::handle_stream_capabilities(uint32_t len)
-{
-    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
-
-    if (len > sizeof(caps)) {
-        throw MessageDataError("capability message too long", len, sizeof(caps));
-    }
-    int n = read(streamfd, caps, len);
-    if (n != (int) len) {
-        throw MessageDataError("read capabilities from device failed", n, len, errno);
-    }
-
-    // we currently do not support extensions so just reply so
-    send<CapabilitiesMessage>();
-}
-
-void Stream::handle_stream_error(uint32_t len)
-{
-    // TODO read message and use it
-    throw ProtocolError("got an error message from server");
-}
-
-void Stream::read_command_from_device()
-{
-    StreamDevHeader hdr;
-    int n;
-
-    std::lock_guard<std::mutex> stream_guard(mutex);
-    n = read(streamfd, &hdr, sizeof(hdr));
-    if (n != sizeof(hdr)) {
-        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
-    }
-    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
-        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
-    }
-
-    switch (hdr.type) {
-    case STREAM_TYPE_CAPABILITIES:
-        return handle_stream_capabilities(hdr.size);
-    case STREAM_TYPE_NOTIFY_ERROR:
-        return handle_stream_error(hdr.size);
-    case STREAM_TYPE_START_STOP:
-        return handle_stream_start_stop(hdr.size);
-    }
-    throw MessageDataError("unknown message type", hdr.type, 0);
-}
-
-int Stream::read_command(bool blocking)
-{
-    int timeout = blocking?-1:0;
-    while (!quit_requested) {
-        if (!have_something_to_read(timeout)) {
-            if (!blocking) {
-                return 0;
-            }
-            sleep(1);
-            continue;
-        }
-        read_command_from_device();
-        break;
-    }
-
-    return 1;
-}
-
-void Stream::write_all(const char *operation, const void *buf, const size_t len)
-{
-    size_t written = 0;
-    while (written < len) {
-        int l = write(streamfd, (const char *) buf + written, len - written);
-        if (l < 0) {
-            if (errno == EINTR) {
-                continue;
-            }
-            throw WriteError("write failed", operation, errno).syslog();
-        }
-        written += l;
-    }
-    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
-}
+bool quit_requested = false;
 
 static void handle_interrupt(int intr)
 {
diff --git a/src/stream.cpp b/src/stream.cpp
new file mode 100644
index 0000000..f756097
--- /dev/null
+++ b/src/stream.cpp
@@ -0,0 +1,172 @@ 
+/* Encapsulation of the stream used to communicate between agent and server
+ *
+ * \copyright
+ * Copyright 2018 Red Hat Inc. All rights reserved.
+ */
+
+#include "stream.hpp"
+#include "message.hpp"
+
+#include <spice/stream-device.h>
+
+#include <spice-streaming-agent/errors.hpp>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <syslog.h>
+#include <unistd.h>
+
+namespace spice
+{
+namespace streaming_agent
+{
+
+class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
+                                           STREAM_TYPE_CAPABILITIES>
+{
+public:
+    CapabilitiesMessage() : Message() {}
+    static size_t size()
+    {
+        return sizeof(payload_t);
+    }
+    void write_message_body(Stream &stream)
+    {
+        /* No body for capabilities message */
+    }
+};
+
+Stream::Stream(const char *name)
+    : codecs()
+{
+    streamfd = open(name, O_RDWR);
+    if (streamfd < 0) {
+        throw IOError("failed to open streaming device", errno);
+    }
+}
+
+Stream::~Stream()
+{
+    close(streamfd);
+}
+
+int Stream::have_something_to_read(int timeout)
+{
+    struct pollfd pollfd = {streamfd, POLLIN, 0};
+
+    if (poll(&pollfd, 1, timeout) < 0) {
+        syslog(LOG_ERR, "poll FAILED\n");
+        return -1;
+    }
+
+    if (pollfd.revents == POLLIN) {
+        return 1;
+    }
+
+    return 0;
+}
+
+void Stream::handle_stream_start_stop(uint32_t len)
+{
+    uint8_t msg[256];
+
+    if (len >= sizeof(msg)) {
+        throw MessageDataError("message is too long", len, sizeof(msg));
+    }
+    int n = read(streamfd, &msg, len);
+    if (n != (int) len) {
+        throw MessageDataError("read start/stop command from device failed", n, len, errno);
+    }
+    is_streaming = (msg[0] != 0); /* num_codecs */
+    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
+           is_streaming ? "START" : "STOP");
+    codecs.clear();
+    for (int i = 1; i <= msg[0]; ++i) {
+        codecs.insert((SpiceVideoCodecType) msg[i]);
+    }
+}
+
+void Stream::handle_stream_capabilities(uint32_t len)
+{
+    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
+
+    if (len > sizeof(caps)) {
+        throw MessageDataError("capability message too long", len, sizeof(caps));
+    }
+    int n = read(streamfd, caps, len);
+    if (n != (int) len) {
+        throw MessageDataError("read capabilities from device failed", n, len, errno);
+    }
+
+    // we currently do not support extensions so just reply so
+    send<CapabilitiesMessage>();
+}
+
+void Stream::handle_stream_error(uint32_t len)
+{
+    // TODO read message and use it
+    throw ProtocolError("got an error message from server");
+}
+
+void Stream::read_command_from_device()
+{
+    StreamDevHeader hdr;
+    int n;
+
+    std::lock_guard<std::mutex> stream_guard(mutex);
+    n = read(streamfd, &hdr, sizeof(hdr));
+    if (n != sizeof(hdr)) {
+        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
+    }
+    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
+        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
+    }
+
+    switch (hdr.type) {
+    case STREAM_TYPE_CAPABILITIES:
+        return handle_stream_capabilities(hdr.size);
+    case STREAM_TYPE_NOTIFY_ERROR:
+        return handle_stream_error(hdr.size);
+    case STREAM_TYPE_START_STOP:
+        return handle_stream_start_stop(hdr.size);
+    }
+    throw MessageDataError("unknown message type", hdr.type, 0);
+}
+
+int Stream::read_command(bool blocking)
+{
+    int timeout = blocking?-1:0;
+    while (!quit_requested) {
+        if (!have_something_to_read(timeout)) {
+            if (!blocking) {
+                return 0;
+            }
+            sleep(1);
+            continue;
+        }
+        read_command_from_device();
+        break;
+    }
+
+    return 1;
+}
+
+void Stream::write_all(const char *operation, const void *buf, const size_t len)
+{
+    size_t written = 0;
+    while (written < len) {
+        int l = write(streamfd, (const char *) buf + written, len - written);
+        if (l < 0) {
+            if (errno == EINTR) {
+                continue;
+            }
+            throw WriteError("write failed", operation, errno).syslog();
+        }
+        written += l;
+    }
+    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
+}
+
+}} // namespace spice::streaming_agent
diff --git a/src/stream.hpp b/src/stream.hpp
new file mode 100644
index 0000000..b689f36
--- /dev/null
+++ b/src/stream.hpp
@@ -0,0 +1,55 @@ 
+/* Encapsulation of the stream used to communicate between agent and server
+ *
+ * \copyright
+ * Copyright 2018 Red Hat Inc. All rights reserved.
+ */
+#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
+#define SPICE_STREAMING_AGENT_STREAM_HPP
+
+#include <spice/enums.h>
+#include <set>
+#include <mutex>
+
+namespace spice {
+namespace streaming_agent {
+
+class Stream
+{
+    typedef std::set<SpiceVideoCodecType> codecs_t;
+
+public:
+    Stream(const char *name);
+    ~Stream();
+
+    const codecs_t &client_codecs() { return codecs; }
+    bool streaming_requested() { return is_streaming; }
+
+    template <typename Message, typename ...PayloadArgs>
+    void send(PayloadArgs... payload)
+    {
+        Message message(payload...);
+        std::lock_guard<std::mutex> stream_guard(mutex);
+        message.write_header(*this);
+        message.write_message_body(*this, payload...);
+    }
+
+    int read_command(bool blocking);
+    void write_all(const char *operation, const void *buf, const size_t len);
+
+private:
+    int have_something_to_read(int timeout);
+    void handle_stream_start_stop(uint32_t len);
+    void handle_stream_capabilities(uint32_t len);
+    void handle_stream_error(uint32_t len);
+    void read_command_from_device(void);
+
+private:
+    std::mutex mutex;
+    codecs_t codecs;
+    int streamfd = -1;
+    bool is_streaming = false;
+};
+
+}} // namespace spice::streaming_agent
+
+#endif // SPICE_STREAMING_AGENT_ERRORS_HPP

Comments

On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
> From: Christophe de Dinechin <dinechin@redhat.com>
> 
> Doing this change will make it possible to move the capture loop to the
> concrete-agent.cpp file.
> 
> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
> ---
>  include/spice-streaming-agent/errors.hpp |   2 +
>  src/Makefile.am                          |   2 +
>  src/message.hpp                          |  41 ++++++
>  src/spice-streaming-agent.cpp            | 209 +------------------------------
>  src/stream.cpp                           | 172 +++++++++++++++++++++++++
>  src/stream.hpp                           |  55 ++++++++
>  6 files changed, 276 insertions(+), 205 deletions(-)
>  create mode 100644 src/message.hpp
>  create mode 100644 src/stream.cpp
>  create mode 100644 src/stream.hpp
> 
> diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
> index 870a0fd..62ae010 100644
> --- a/include/spice-streaming-agent/errors.hpp
> +++ b/include/spice-streaming-agent/errors.hpp
> @@ -90,4 +90,6 @@ protected:
>  
>  }} // namespace spice::streaming_agent
>  
> +extern bool quit_requested;

Putting quit_requested into errors.hpp? Why?

> +
>  #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> diff --git a/src/Makefile.am b/src/Makefile.am
> index 2507844..923a103 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
>  	mjpeg-fallback.hpp \
>  	jpeg.cpp \
>  	jpeg.hpp \
> +	stream.cpp \
> +	stream.hpp \
>  	errors.cpp \
>  	$(NULL)
> diff --git a/src/message.hpp b/src/message.hpp
> new file mode 100644
> index 0000000..28b3e28
> --- /dev/null
> +++ b/src/message.hpp
> @@ -0,0 +1,41 @@
> +/* Formatting messages
> + *
> + * \copyright
> + * Copyright 2018 Red Hat Inc. All rights reserved.
> + */
> +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
> +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
> +
> +#include <spice/stream-device.h>
> +
> +namespace spice
> +{
> +namespace streaming_agent
> +{
> +
> +template <typename Payload, typename Info, unsigned Type>
> +class Message
> +{
> +public:
> +    template <typename ...PayloadArgs>
> +    Message(PayloadArgs... payload)
> +        : hdr(StreamDevHeader {
> +              .protocol_version = STREAM_DEVICE_PROTOCOL,
> +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> +              .type = Type,
> +              .size = (uint32_t) Info::size(payload...)
> +          })
> +    { }
> +    void write_header(Stream &stream)
> +    {
> +        stream.write_all("header", &hdr, sizeof(hdr));
> +    }
> +
> +protected:
> +    StreamDevHeader hdr;
> +    typedef Payload payload_t;
> +};
> +
> +}} // namespace spice::streaming_agent
> +
> +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index 35e65bb..c401a34 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -5,6 +5,8 @@
>   */
>  
>  #include "concrete-agent.hpp"
> +#include "stream.hpp"
> +#include "message.hpp"
>  #include "hexdump.h"
>  #include "mjpeg-fallback.hpp"
>  
> @@ -21,11 +23,9 @@
>  #include <inttypes.h>
>  #include <string.h>
>  #include <getopt.h>
> -#include <unistd.h>
>  #include <errno.h>
> -#include <fcntl.h>
> +#include <unistd.h>
>  #include <sys/time.h>
> -#include <poll.h>
>  #include <syslog.h>
>  #include <signal.h>
>  #include <exception>
> @@ -57,76 +57,6 @@ static uint64_t get_time(void)
>  
>  }
>  
> -class Stream
> -{
> -    typedef std::set<SpiceVideoCodecType> codecs_t;
> -
> -public:
> -    Stream(const char *name)
> -        : codecs()
> -    {
> -        streamfd = open(name, O_RDWR);
> -        if (streamfd < 0) {
> -            throw IOError("failed to open streaming device", errno);
> -        }
> -    }
> -    ~Stream()
> -    {
> -        close(streamfd);
> -    }
> -
> -    const codecs_t &client_codecs() { return codecs; }
> -    bool streaming_requested() { return is_streaming; }
> -
> -    template <typename Message, typename ...PayloadArgs>
> -    void send(PayloadArgs... payload)
> -    {
> -        Message message(payload...);
> -        std::lock_guard<std::mutex> stream_guard(mutex);
> -        message.write_header(*this);
> -        message.write_message_body(*this, payload...);
> -    }
> -
> -    int read_command(bool blocking);
> -    void write_all(const char *operation, const void *buf, const size_t len);
> -
> -private:
> -    int have_something_to_read(int timeout);
> -    void handle_stream_start_stop(uint32_t len);
> -    void handle_stream_capabilities(uint32_t len);
> -    void handle_stream_error(uint32_t len);
> -    void read_command_from_device(void);
> -
> -private:
> -    std::mutex mutex;
> -    codecs_t codecs;
> -    int streamfd = -1;
> -    bool is_streaming = false;
> -};
> -
> -template <typename Payload, typename Info, unsigned Type>
> -class Message
> -{
> -public:
> -    template <typename ...PayloadArgs>
> -    Message(PayloadArgs... payload)
> -        : hdr(StreamDevHeader {
> -              .protocol_version = STREAM_DEVICE_PROTOCOL,
> -              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> -              .type = Type,
> -              .size = (uint32_t) Info::size(payload...)
> -          })
> -    { }
> -    void write_header(Stream &stream)
> -    {
> -        stream.write_all("header", &hdr, sizeof(hdr));
> -    }
> -
> -protected:
> -    StreamDevHeader hdr;
> -    typedef Payload payload_t;
> -};
> -
>  class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
>  {
>  public:
> @@ -156,20 +86,6 @@ public:
>      }
>  };
>  
> -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
> -{
> -public:
> -    CapabilitiesMessage() : Message() {}
> -    static size_t size()
> -    {
> -        return sizeof(payload_t);
> -    }
> -    void write_message_body(Stream &stream)
> -    {
> -        /* No body for capabilities message */
> -    }
> -};
> -
>  class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
>  {
>  public:
> @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
>  
>  }} // namespace spice::streaming_agent
>  
> -static bool quit_requested = false;
> -
> -int Stream::have_something_to_read(int timeout)
> -{
> -    struct pollfd pollfd = {streamfd, POLLIN, 0};
> -
> -    if (poll(&pollfd, 1, timeout) < 0) {
> -        syslog(LOG_ERR, "poll FAILED\n");
> -        return -1;
> -    }
> -
> -    if (pollfd.revents == POLLIN) {
> -        return 1;
> -    }
> -
> -    return 0;
> -}
> -
> -void Stream::handle_stream_start_stop(uint32_t len)
> -{
> -    uint8_t msg[256];
> -
> -    if (len >= sizeof(msg)) {
> -        throw MessageDataError("message is too long", len, sizeof(msg));
> -    }
> -    int n = read(streamfd, &msg, len);
> -    if (n != (int) len) {
> -        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> -    }
> -    is_streaming = (msg[0] != 0); /* num_codecs */
> -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> -           is_streaming ? "START" : "STOP");
> -    codecs.clear();
> -    for (int i = 1; i <= msg[0]; ++i) {
> -        codecs.insert((SpiceVideoCodecType) msg[i]);
> -    }
> -}
> -
> -void Stream::handle_stream_capabilities(uint32_t len)
> -{
> -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> -
> -    if (len > sizeof(caps)) {
> -        throw MessageDataError("capability message too long", len, sizeof(caps));
> -    }
> -    int n = read(streamfd, caps, len);
> -    if (n != (int) len) {
> -        throw MessageDataError("read capabilities from device failed", n, len, errno);
> -    }
> -
> -    // we currently do not support extensions so just reply so
> -    send<CapabilitiesMessage>();
> -}
> -
> -void Stream::handle_stream_error(uint32_t len)
> -{
> -    // TODO read message and use it
> -    throw ProtocolError("got an error message from server");
> -}
> -
> -void Stream::read_command_from_device()
> -{
> -    StreamDevHeader hdr;
> -    int n;
> -
> -    std::lock_guard<std::mutex> stream_guard(mutex);
> -    n = read(streamfd, &hdr, sizeof(hdr));
> -    if (n != sizeof(hdr)) {
> -        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> -    }
> -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> -        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> -    }
> -
> -    switch (hdr.type) {
> -    case STREAM_TYPE_CAPABILITIES:
> -        return handle_stream_capabilities(hdr.size);
> -    case STREAM_TYPE_NOTIFY_ERROR:
> -        return handle_stream_error(hdr.size);
> -    case STREAM_TYPE_START_STOP:
> -        return handle_stream_start_stop(hdr.size);
> -    }
> -    throw MessageDataError("unknown message type", hdr.type, 0);
> -}
> -
> -int Stream::read_command(bool blocking)
> -{
> -    int timeout = blocking?-1:0;
> -    while (!quit_requested) {
> -        if (!have_something_to_read(timeout)) {
> -            if (!blocking) {
> -                return 0;
> -            }
> -            sleep(1);
> -            continue;
> -        }
> -        read_command_from_device();
> -        break;
> -    }
> -
> -    return 1;
> -}
> -
> -void Stream::write_all(const char *operation, const void *buf, const size_t len)
> -{
> -    size_t written = 0;
> -    while (written < len) {
> -        int l = write(streamfd, (const char *) buf + written, len - written);
> -        if (l < 0) {
> -            if (errno == EINTR) {
> -                continue;
> -            }
> -            throw WriteError("write failed", operation, errno).syslog();
> -        }
> -        written += l;
> -    }
> -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> -}
> +bool quit_requested = false;
>  
>  static void handle_interrupt(int intr)
>  {
> diff --git a/src/stream.cpp b/src/stream.cpp
> new file mode 100644
> index 0000000..f756097
> --- /dev/null
> +++ b/src/stream.cpp
> @@ -0,0 +1,172 @@
> +/* Encapsulation of the stream used to communicate between agent and server
> + *
> + * \copyright
> + * Copyright 2018 Red Hat Inc. All rights reserved.
> + */
> +
> +#include "stream.hpp"
> +#include "message.hpp"
> +
> +#include <spice/stream-device.h>
> +
> +#include <spice-streaming-agent/errors.hpp>
> +
> +#include <sys/types.h>
> +#include <sys/stat.h>
> +#include <fcntl.h>
> +#include <poll.h>
> +#include <syslog.h>
> +#include <unistd.h>
> +
> +namespace spice
> +{
> +namespace streaming_agent
> +{
> +
> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
> +                                           STREAM_TYPE_CAPABILITIES>
> +{
> +public:
> +    CapabilitiesMessage() : Message() {}
> +    static size_t size()
> +    {
> +        return sizeof(payload_t);
> +    }
> +    void write_message_body(Stream &stream)
> +    {
> +        /* No body for capabilities message */
> +    }
> +};

Not sure I like scattering the messages across source files that happen
to use them, though I suppose you did it because each message (like the
X11Cursor) may require different header files included? Perhaps it is
the way to go...

> +
> +Stream::Stream(const char *name)
> +    : codecs()
> +{
> +    streamfd = open(name, O_RDWR);
> +    if (streamfd < 0) {
> +        throw IOError("failed to open streaming device", errno);
> +    }
> +}
> +
> +Stream::~Stream()
> +{
> +    close(streamfd);
> +}
> +
> +int Stream::have_something_to_read(int timeout)
> +{
> +    struct pollfd pollfd = {streamfd, POLLIN, 0};
> +
> +    if (poll(&pollfd, 1, timeout) < 0) {
> +        syslog(LOG_ERR, "poll FAILED\n");
> +        return -1;
> +    }
> +
> +    if (pollfd.revents == POLLIN) {
> +        return 1;
> +    }
> +
> +    return 0;
> +}
> +
> +void Stream::handle_stream_start_stop(uint32_t len)
> +{
> +    uint8_t msg[256];
> +
> +    if (len >= sizeof(msg)) {
> +        throw MessageDataError("message is too long", len, sizeof(msg));
> +    }
> +    int n = read(streamfd, &msg, len);
> +    if (n != (int) len) {
> +        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> +    }
> +    is_streaming = (msg[0] != 0); /* num_codecs */
> +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> +           is_streaming ? "START" : "STOP");
> +    codecs.clear();
> +    for (int i = 1; i <= msg[0]; ++i) {
> +        codecs.insert((SpiceVideoCodecType) msg[i]);
> +    }
> +}
> +
> +void Stream::handle_stream_capabilities(uint32_t len)
> +{
> +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> +
> +    if (len > sizeof(caps)) {
> +        throw MessageDataError("capability message too long", len, sizeof(caps));
> +    }
> +    int n = read(streamfd, caps, len);
> +    if (n != (int) len) {
> +        throw MessageDataError("read capabilities from device failed", n, len, errno);
> +    }
> +
> +    // we currently do not support extensions so just reply so
> +    send<CapabilitiesMessage>();
> +}
> +
> +void Stream::handle_stream_error(uint32_t len)
> +{
> +    // TODO read message and use it
> +    throw ProtocolError("got an error message from server");
> +}
> +
> +void Stream::read_command_from_device()
> +{
> +    StreamDevHeader hdr;
> +    int n;
> +
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +    n = read(streamfd, &hdr, sizeof(hdr));
> +    if (n != sizeof(hdr)) {
> +        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> +    }
> +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> +        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> +    }
> +
> +    switch (hdr.type) {
> +    case STREAM_TYPE_CAPABILITIES:
> +        return handle_stream_capabilities(hdr.size);
> +    case STREAM_TYPE_NOTIFY_ERROR:
> +        return handle_stream_error(hdr.size);
> +    case STREAM_TYPE_START_STOP:
> +        return handle_stream_start_stop(hdr.size);
> +    }
> +    throw MessageDataError("unknown message type", hdr.type, 0);
> +}
> +
> +int Stream::read_command(bool blocking)
> +{
> +    int timeout = blocking?-1:0;
> +    while (!quit_requested) {
> +        if (!have_something_to_read(timeout)) {
> +            if (!blocking) {
> +                return 0;
> +            }
> +            sleep(1);
> +            continue;
> +        }
> +        read_command_from_device();
> +        break;
> +    }
> +
> +    return 1;
> +}
> +
> +void Stream::write_all(const char *operation, const void *buf, const size_t len)
> +{
> +    size_t written = 0;
> +    while (written < len) {
> +        int l = write(streamfd, (const char *) buf + written, len - written);
> +        if (l < 0) {
> +            if (errno == EINTR) {
> +                continue;
> +            }
> +            throw WriteError("write failed", operation, errno).syslog();
> +        }
> +        written += l;
> +    }
> +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> +}
> +
> +}} // namespace spice::streaming_agent
> diff --git a/src/stream.hpp b/src/stream.hpp
> new file mode 100644
> index 0000000..b689f36
> --- /dev/null
> +++ b/src/stream.hpp
> @@ -0,0 +1,55 @@
> +/* Encapsulation of the stream used to communicate between agent and server
> + *
> + * \copyright
> + * Copyright 2018 Red Hat Inc. All rights reserved.
> + */
> +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
> +#define SPICE_STREAMING_AGENT_STREAM_HPP
> +
> +#include <spice/enums.h>
> +#include <set>
> +#include <mutex>
> +
> +namespace spice {
> +namespace streaming_agent {
> +
> +class Stream
> +{
> +    typedef std::set<SpiceVideoCodecType> codecs_t;
> +
> +public:
> +    Stream(const char *name);
> +    ~Stream();
> +
> +    const codecs_t &client_codecs() { return codecs; }
> +    bool streaming_requested() { return is_streaming; }
> +
> +    template <typename Message, typename ...PayloadArgs>
> +    void send(PayloadArgs... payload)
> +    {
> +        Message message(payload...);
> +        std::lock_guard<std::mutex> stream_guard(mutex);
> +        message.write_header(*this);
> +        message.write_message_body(*this, payload...);
> +    }
> +
> +    int read_command(bool blocking);
> +    void write_all(const char *operation, const void *buf, const size_t len);
> +
> +private:
> +    int have_something_to_read(int timeout);
> +    void handle_stream_start_stop(uint32_t len);
> +    void handle_stream_capabilities(uint32_t len);
> +    void handle_stream_error(uint32_t len);
> +    void read_command_from_device(void);
> +
> +private:
> +    std::mutex mutex;
> +    codecs_t codecs;
> +    int streamfd = -1;
> +    bool is_streaming = false;
> +};
> +
> +}} // namespace spice::streaming_agent
> +
> +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> 
> On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
>> From: Christophe de Dinechin <dinechin@redhat.com>
>> 
>> Doing this change will make it possible to move the capture loop to the
>> concrete-agent.cpp file.
>> 
>> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
>> ---
>> include/spice-streaming-agent/errors.hpp |   2 +
>> src/Makefile.am                          |   2 +
>> src/message.hpp                          |  41 ++++++
>> src/spice-streaming-agent.cpp            | 209 +------------------------------
>> src/stream.cpp                           | 172 +++++++++++++++++++++++++
>> src/stream.hpp                           |  55 ++++++++
>> 6 files changed, 276 insertions(+), 205 deletions(-)
>> create mode 100644 src/message.hpp
>> create mode 100644 src/stream.cpp
>> create mode 100644 src/stream.hpp
>> 
>> diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
>> index 870a0fd..62ae010 100644
>> --- a/include/spice-streaming-agent/errors.hpp
>> +++ b/include/spice-streaming-agent/errors.hpp
>> @@ -90,4 +90,6 @@ protected:
>> 
>> }} // namespace spice::streaming_agent
>> 
>> +extern bool quit_requested;
> 
> Putting quit_requested into errors.hpp? Why?

Because errors.hpp deals with error conditions. You need to quit other threads on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.


> 
>> +
>> #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
>> diff --git a/src/Makefile.am b/src/Makefile.am
>> index 2507844..923a103 100644
>> --- a/src/Makefile.am
>> +++ b/src/Makefile.am
>> @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
>> 	mjpeg-fallback.hpp \
>> 	jpeg.cpp \
>> 	jpeg.hpp \
>> +	stream.cpp \
>> +	stream.hpp \
>> 	errors.cpp \
>> 	$(NULL)
>> diff --git a/src/message.hpp b/src/message.hpp
>> new file mode 100644
>> index 0000000..28b3e28
>> --- /dev/null
>> +++ b/src/message.hpp
>> @@ -0,0 +1,41 @@
>> +/* Formatting messages
>> + *
>> + * \copyright
>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>> + */
>> +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
>> +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
>> +
>> +#include <spice/stream-device.h>
>> +
>> +namespace spice
>> +{
>> +namespace streaming_agent
>> +{
>> +
>> +template <typename Payload, typename Info, unsigned Type>
>> +class Message
>> +{
>> +public:
>> +    template <typename ...PayloadArgs>
>> +    Message(PayloadArgs... payload)
>> +        : hdr(StreamDevHeader {
>> +              .protocol_version = STREAM_DEVICE_PROTOCOL,
>> +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
>> +              .type = Type,
>> +              .size = (uint32_t) Info::size(payload...)
>> +          })
>> +    { }
>> +    void write_header(Stream &stream)
>> +    {
>> +        stream.write_all("header", &hdr, sizeof(hdr));
>> +    }
>> +
>> +protected:
>> +    StreamDevHeader hdr;
>> +    typedef Payload payload_t;
>> +};
>> +
>> +}} // namespace spice::streaming_agent
>> +
>> +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
>> index 35e65bb..c401a34 100644
>> --- a/src/spice-streaming-agent.cpp
>> +++ b/src/spice-streaming-agent.cpp
>> @@ -5,6 +5,8 @@
>>  */
>> 
>> #include "concrete-agent.hpp"
>> +#include "stream.hpp"
>> +#include "message.hpp"
>> #include "hexdump.h"
>> #include "mjpeg-fallback.hpp"
>> 
>> @@ -21,11 +23,9 @@
>> #include <inttypes.h>
>> #include <string.h>
>> #include <getopt.h>
>> -#include <unistd.h>
>> #include <errno.h>
>> -#include <fcntl.h>
>> +#include <unistd.h>
>> #include <sys/time.h>
>> -#include <poll.h>
>> #include <syslog.h>
>> #include <signal.h>
>> #include <exception>
>> @@ -57,76 +57,6 @@ static uint64_t get_time(void)
>> 
>> }
>> 
>> -class Stream
>> -{
>> -    typedef std::set<SpiceVideoCodecType> codecs_t;
>> -
>> -public:
>> -    Stream(const char *name)
>> -        : codecs()
>> -    {
>> -        streamfd = open(name, O_RDWR);
>> -        if (streamfd < 0) {
>> -            throw IOError("failed to open streaming device", errno);
>> -        }
>> -    }
>> -    ~Stream()
>> -    {
>> -        close(streamfd);
>> -    }
>> -
>> -    const codecs_t &client_codecs() { return codecs; }
>> -    bool streaming_requested() { return is_streaming; }
>> -
>> -    template <typename Message, typename ...PayloadArgs>
>> -    void send(PayloadArgs... payload)
>> -    {
>> -        Message message(payload...);
>> -        std::lock_guard<std::mutex> stream_guard(mutex);
>> -        message.write_header(*this);
>> -        message.write_message_body(*this, payload...);
>> -    }
>> -
>> -    int read_command(bool blocking);
>> -    void write_all(const char *operation, const void *buf, const size_t len);
>> -
>> -private:
>> -    int have_something_to_read(int timeout);
>> -    void handle_stream_start_stop(uint32_t len);
>> -    void handle_stream_capabilities(uint32_t len);
>> -    void handle_stream_error(uint32_t len);
>> -    void read_command_from_device(void);
>> -
>> -private:
>> -    std::mutex mutex;
>> -    codecs_t codecs;
>> -    int streamfd = -1;
>> -    bool is_streaming = false;
>> -};
>> -
>> -template <typename Payload, typename Info, unsigned Type>
>> -class Message
>> -{
>> -public:
>> -    template <typename ...PayloadArgs>
>> -    Message(PayloadArgs... payload)
>> -        : hdr(StreamDevHeader {
>> -              .protocol_version = STREAM_DEVICE_PROTOCOL,
>> -              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
>> -              .type = Type,
>> -              .size = (uint32_t) Info::size(payload...)
>> -          })
>> -    { }
>> -    void write_header(Stream &stream)
>> -    {
>> -        stream.write_all("header", &hdr, sizeof(hdr));
>> -    }
>> -
>> -protected:
>> -    StreamDevHeader hdr;
>> -    typedef Payload payload_t;
>> -};
>> -
>> class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
>> {
>> public:
>> @@ -156,20 +86,6 @@ public:
>>     }
>> };
>> 
>> -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
>> -{
>> -public:
>> -    CapabilitiesMessage() : Message() {}
>> -    static size_t size()
>> -    {
>> -        return sizeof(payload_t);
>> -    }
>> -    void write_message_body(Stream &stream)
>> -    {
>> -        /* No body for capabilities message */
>> -    }
>> -};
>> -
>> class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
>> {
>> public:
>> @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
>> 
>> }} // namespace spice::streaming_agent
>> 
>> -static bool quit_requested = false;
>> -
>> -int Stream::have_something_to_read(int timeout)
>> -{
>> -    struct pollfd pollfd = {streamfd, POLLIN, 0};
>> -
>> -    if (poll(&pollfd, 1, timeout) < 0) {
>> -        syslog(LOG_ERR, "poll FAILED\n");
>> -        return -1;
>> -    }
>> -
>> -    if (pollfd.revents == POLLIN) {
>> -        return 1;
>> -    }
>> -
>> -    return 0;
>> -}
>> -
>> -void Stream::handle_stream_start_stop(uint32_t len)
>> -{
>> -    uint8_t msg[256];
>> -
>> -    if (len >= sizeof(msg)) {
>> -        throw MessageDataError("message is too long", len, sizeof(msg));
>> -    }
>> -    int n = read(streamfd, &msg, len);
>> -    if (n != (int) len) {
>> -        throw MessageDataError("read start/stop command from device failed", n, len, errno);
>> -    }
>> -    is_streaming = (msg[0] != 0); /* num_codecs */
>> -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>> -           is_streaming ? "START" : "STOP");
>> -    codecs.clear();
>> -    for (int i = 1; i <= msg[0]; ++i) {
>> -        codecs.insert((SpiceVideoCodecType) msg[i]);
>> -    }
>> -}
>> -
>> -void Stream::handle_stream_capabilities(uint32_t len)
>> -{
>> -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>> -
>> -    if (len > sizeof(caps)) {
>> -        throw MessageDataError("capability message too long", len, sizeof(caps));
>> -    }
>> -    int n = read(streamfd, caps, len);
>> -    if (n != (int) len) {
>> -        throw MessageDataError("read capabilities from device failed", n, len, errno);
>> -    }
>> -
>> -    // we currently do not support extensions so just reply so
>> -    send<CapabilitiesMessage>();
>> -}
>> -
>> -void Stream::handle_stream_error(uint32_t len)
>> -{
>> -    // TODO read message and use it
>> -    throw ProtocolError("got an error message from server");
>> -}
>> -
>> -void Stream::read_command_from_device()
>> -{
>> -    StreamDevHeader hdr;
>> -    int n;
>> -
>> -    std::lock_guard<std::mutex> stream_guard(mutex);
>> -    n = read(streamfd, &hdr, sizeof(hdr));
>> -    if (n != sizeof(hdr)) {
>> -        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>> -    }
>> -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>> -        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>> -    }
>> -
>> -    switch (hdr.type) {
>> -    case STREAM_TYPE_CAPABILITIES:
>> -        return handle_stream_capabilities(hdr.size);
>> -    case STREAM_TYPE_NOTIFY_ERROR:
>> -        return handle_stream_error(hdr.size);
>> -    case STREAM_TYPE_START_STOP:
>> -        return handle_stream_start_stop(hdr.size);
>> -    }
>> -    throw MessageDataError("unknown message type", hdr.type, 0);
>> -}
>> -
>> -int Stream::read_command(bool blocking)
>> -{
>> -    int timeout = blocking?-1:0;
>> -    while (!quit_requested) {
>> -        if (!have_something_to_read(timeout)) {
>> -            if (!blocking) {
>> -                return 0;
>> -            }
>> -            sleep(1);
>> -            continue;
>> -        }
>> -        read_command_from_device();
>> -        break;
>> -    }
>> -
>> -    return 1;
>> -}
>> -
>> -void Stream::write_all(const char *operation, const void *buf, const size_t len)
>> -{
>> -    size_t written = 0;
>> -    while (written < len) {
>> -        int l = write(streamfd, (const char *) buf + written, len - written);
>> -        if (l < 0) {
>> -            if (errno == EINTR) {
>> -                continue;
>> -            }
>> -            throw WriteError("write failed", operation, errno).syslog();
>> -        }
>> -        written += l;
>> -    }
>> -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>> -}
>> +bool quit_requested = false;
>> 
>> static void handle_interrupt(int intr)
>> {
>> diff --git a/src/stream.cpp b/src/stream.cpp
>> new file mode 100644
>> index 0000000..f756097
>> --- /dev/null
>> +++ b/src/stream.cpp
>> @@ -0,0 +1,172 @@
>> +/* Encapsulation of the stream used to communicate between agent and server
>> + *
>> + * \copyright
>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>> + */
>> +
>> +#include "stream.hpp"
>> +#include "message.hpp"
>> +
>> +#include <spice/stream-device.h>
>> +
>> +#include <spice-streaming-agent/errors.hpp>
>> +
>> +#include <sys/types.h>
>> +#include <sys/stat.h>
>> +#include <fcntl.h>
>> +#include <poll.h>
>> +#include <syslog.h>
>> +#include <unistd.h>
>> +
>> +namespace spice
>> +{
>> +namespace streaming_agent
>> +{
>> +
>> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
>> +                                           STREAM_TYPE_CAPABILITIES>
>> +{
>> +public:
>> +    CapabilitiesMessage() : Message() {}
>> +    static size_t size()
>> +    {
>> +        return sizeof(payload_t);
>> +    }
>> +    void write_message_body(Stream &stream)
>> +    {
>> +        /* No body for capabilities message */
>> +    }
>> +};
> 
> Not sure I like scattering the messages across source files that happen
> to use them, though I suppose you did it because each message (like the
> X11Cursor) may require different header files included? Perhaps it is
> the way to go…

No, it’s really to de-couple things, a good way to check if encapsulation was correct.


> 
>> +
>> +Stream::Stream(const char *name)
>> +    : codecs()
>> +{
>> +    streamfd = open(name, O_RDWR);
>> +    if (streamfd < 0) {
>> +        throw IOError("failed to open streaming device", errno);
>> +    }
>> +}
>> +
>> +Stream::~Stream()
>> +{
>> +    close(streamfd);
>> +}
>> +
>> +int Stream::have_something_to_read(int timeout)
>> +{
>> +    struct pollfd pollfd = {streamfd, POLLIN, 0};
>> +
>> +    if (poll(&pollfd, 1, timeout) < 0) {
>> +        syslog(LOG_ERR, "poll FAILED\n");
>> +        return -1;
>> +    }
>> +
>> +    if (pollfd.revents == POLLIN) {
>> +        return 1;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +void Stream::handle_stream_start_stop(uint32_t len)
>> +{
>> +    uint8_t msg[256];
>> +
>> +    if (len >= sizeof(msg)) {
>> +        throw MessageDataError("message is too long", len, sizeof(msg));
>> +    }
>> +    int n = read(streamfd, &msg, len);
>> +    if (n != (int) len) {
>> +        throw MessageDataError("read start/stop command from device failed", n, len, errno);
>> +    }
>> +    is_streaming = (msg[0] != 0); /* num_codecs */
>> +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>> +           is_streaming ? "START" : "STOP");
>> +    codecs.clear();
>> +    for (int i = 1; i <= msg[0]; ++i) {
>> +        codecs.insert((SpiceVideoCodecType) msg[i]);
>> +    }
>> +}
>> +
>> +void Stream::handle_stream_capabilities(uint32_t len)
>> +{
>> +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>> +
>> +    if (len > sizeof(caps)) {
>> +        throw MessageDataError("capability message too long", len, sizeof(caps));
>> +    }
>> +    int n = read(streamfd, caps, len);
>> +    if (n != (int) len) {
>> +        throw MessageDataError("read capabilities from device failed", n, len, errno);
>> +    }
>> +
>> +    // we currently do not support extensions so just reply so
>> +    send<CapabilitiesMessage>();
>> +}
>> +
>> +void Stream::handle_stream_error(uint32_t len)
>> +{
>> +    // TODO read message and use it
>> +    throw ProtocolError("got an error message from server");
>> +}
>> +
>> +void Stream::read_command_from_device()
>> +{
>> +    StreamDevHeader hdr;
>> +    int n;
>> +
>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>> +    n = read(streamfd, &hdr, sizeof(hdr));
>> +    if (n != sizeof(hdr)) {
>> +        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>> +    }
>> +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>> +        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>> +    }
>> +
>> +    switch (hdr.type) {
>> +    case STREAM_TYPE_CAPABILITIES:
>> +        return handle_stream_capabilities(hdr.size);
>> +    case STREAM_TYPE_NOTIFY_ERROR:
>> +        return handle_stream_error(hdr.size);
>> +    case STREAM_TYPE_START_STOP:
>> +        return handle_stream_start_stop(hdr.size);
>> +    }
>> +    throw MessageDataError("unknown message type", hdr.type, 0);
>> +}
>> +
>> +int Stream::read_command(bool blocking)
>> +{
>> +    int timeout = blocking?-1:0;
>> +    while (!quit_requested) {
>> +        if (!have_something_to_read(timeout)) {
>> +            if (!blocking) {
>> +                return 0;
>> +            }
>> +            sleep(1);
>> +            continue;
>> +        }
>> +        read_command_from_device();
>> +        break;
>> +    }
>> +
>> +    return 1;
>> +}
>> +
>> +void Stream::write_all(const char *operation, const void *buf, const size_t len)
>> +{
>> +    size_t written = 0;
>> +    while (written < len) {
>> +        int l = write(streamfd, (const char *) buf + written, len - written);
>> +        if (l < 0) {
>> +            if (errno == EINTR) {
>> +                continue;
>> +            }
>> +            throw WriteError("write failed", operation, errno).syslog();
>> +        }
>> +        written += l;
>> +    }
>> +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>> +}
>> +
>> +}} // namespace spice::streaming_agent
>> diff --git a/src/stream.hpp b/src/stream.hpp
>> new file mode 100644
>> index 0000000..b689f36
>> --- /dev/null
>> +++ b/src/stream.hpp
>> @@ -0,0 +1,55 @@
>> +/* Encapsulation of the stream used to communicate between agent and server
>> + *
>> + * \copyright
>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>> + */
>> +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
>> +#define SPICE_STREAMING_AGENT_STREAM_HPP
>> +
>> +#include <spice/enums.h>
>> +#include <set>
>> +#include <mutex>
>> +
>> +namespace spice {
>> +namespace streaming_agent {
>> +
>> +class Stream
>> +{
>> +    typedef std::set<SpiceVideoCodecType> codecs_t;
>> +
>> +public:
>> +    Stream(const char *name);
>> +    ~Stream();
>> +
>> +    const codecs_t &client_codecs() { return codecs; }
>> +    bool streaming_requested() { return is_streaming; }
>> +
>> +    template <typename Message, typename ...PayloadArgs>
>> +    void send(PayloadArgs... payload)
>> +    {
>> +        Message message(payload...);
>> +        std::lock_guard<std::mutex> stream_guard(mutex);
>> +        message.write_header(*this);
>> +        message.write_message_body(*this, payload...);
>> +    }
>> +
>> +    int read_command(bool blocking);
>> +    void write_all(const char *operation, const void *buf, const size_t len);
>> +
>> +private:
>> +    int have_something_to_read(int timeout);
>> +    void handle_stream_start_stop(uint32_t len);
>> +    void handle_stream_capabilities(uint32_t len);
>> +    void handle_stream_error(uint32_t len);
>> +    void read_command_from_device(void);
>> +
>> +private:
>> +    std::mutex mutex;
>> +    codecs_t codecs;
>> +    int streamfd = -1;
>> +    bool is_streaming = false;
>> +};
>> +
>> +}} // namespace spice::streaming_agent
>> +
>> +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
On Thu, 2018-03-01 at 17:57 +0100, Christophe de Dinechin wrote:
> > On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> > 
> > On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
> > > From: Christophe de Dinechin <dinechin@redhat.com>
> > > 
> > > Doing this change will make it possible to move the capture loop to the
> > > concrete-agent.cpp file.
> > > 
> > > Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
> > > ---
> > > include/spice-streaming-agent/errors.hpp |   2 +
> > > src/Makefile.am                          |   2 +
> > > src/message.hpp                          |  41 ++++++
> > > src/spice-streaming-agent.cpp            | 209 +------------------------------
> > > src/stream.cpp                           | 172 +++++++++++++++++++++++++
> > > src/stream.hpp                           |  55 ++++++++
> > > 6 files changed, 276 insertions(+), 205 deletions(-)
> > > create mode 100644 src/message.hpp
> > > create mode 100644 src/stream.cpp
> > > create mode 100644 src/stream.hpp
> > > 
> > > diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
> > > index 870a0fd..62ae010 100644
> > > --- a/include/spice-streaming-agent/errors.hpp
> > > +++ b/include/spice-streaming-agent/errors.hpp
> > > @@ -90,4 +90,6 @@ protected:
> > > 
> > > }} // namespace spice::streaming_agent
> > > 
> > > +extern bool quit_requested;
> > 
> > Putting quit_requested into errors.hpp? Why?
> 
> Because errors.hpp deals with error conditions. You need to quit other threads on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.

I don't think the flag belongs to the errors header at all, let alone a
public one. It's a generic control flow mechanism to signal the
termination of the program. The only relation to errors is that in case
of errors you want to (usually) also quit.

Therefore the quit flag should not be coupled with errors and instead
used in the main control flow spot where global error handling is
taking place (and then in the signal handler). As a static variable it
should also be proliferating through the program as little as possible.

I've actually had the following idea for the quit flag, which I think
promotes the locality of the flag in the class design:


// file agent.{hpp,cpp}
class Agent {
public:
    bool& quit_flag() {
        return quit_requested;
    }

    void do_capture() {
        while(!quit_requested) {
            // ...
        }
    }
private:
    bool quit_requested = false;
};


// file main.cpp
static bool* quit_requested = nullptr;

void handle_sigterm() {
    *quit_requested = true;
}

int main() {
    Agent agent;
    quit_requested = &agent.quit_flag();
    ...
}


Some corner case handling (the quit_reqested pointer not yet set, etc.)
was left out for clarity.

This keeps the flag local to the loop in which it is relevant and the
static variable local to the main.cpp file. Thus it increases
modularity (which arguably we do not need that much here).

I would also use a different quit flag for the cursor thread (again,
local to the X11CursorUpdater) and take care of it in main() after the
Agent::do_capture() loop quits.

> 
> > 
> > > +
> > > #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> > > diff --git a/src/Makefile.am b/src/Makefile.am
> > > index 2507844..923a103 100644
> > > --- a/src/Makefile.am
> > > +++ b/src/Makefile.am
> > > @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
> > > 	mjpeg-fallback.hpp \
> > > 	jpeg.cpp \
> > > 	jpeg.hpp \
> > > +	stream.cpp \
> > > +	stream.hpp \
> > > 	errors.cpp \
> > > 	$(NULL)
> > > diff --git a/src/message.hpp b/src/message.hpp
> > > new file mode 100644
> > > index 0000000..28b3e28
> > > --- /dev/null
> > > +++ b/src/message.hpp
> > > @@ -0,0 +1,41 @@
> > > +/* Formatting messages
> > > + *
> > > + * \copyright
> > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > + */
> > > +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > +
> > > +#include <spice/stream-device.h>
> > > +
> > > +namespace spice
> > > +{
> > > +namespace streaming_agent
> > > +{
> > > +
> > > +template <typename Payload, typename Info, unsigned Type>
> > > +class Message
> > > +{
> > > +public:
> > > +    template <typename ...PayloadArgs>
> > > +    Message(PayloadArgs... payload)
> > > +        : hdr(StreamDevHeader {
> > > +              .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> > > +              .type = Type,
> > > +              .size = (uint32_t) Info::size(payload...)
> > > +          })
> > > +    { }
> > > +    void write_header(Stream &stream)
> > > +    {
> > > +        stream.write_all("header", &hdr, sizeof(hdr));
> > > +    }
> > > +
> > > +protected:
> > > +    StreamDevHeader hdr;
> > > +    typedef Payload payload_t;
> > > +};
> > > +
> > > +}} // namespace spice::streaming_agent
> > > +
> > > +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> > > index 35e65bb..c401a34 100644
> > > --- a/src/spice-streaming-agent.cpp
> > > +++ b/src/spice-streaming-agent.cpp
> > > @@ -5,6 +5,8 @@
> > >  */
> > > 
> > > #include "concrete-agent.hpp"
> > > +#include "stream.hpp"
> > > +#include "message.hpp"
> > > #include "hexdump.h"
> > > #include "mjpeg-fallback.hpp"
> > > 
> > > @@ -21,11 +23,9 @@
> > > #include <inttypes.h>
> > > #include <string.h>
> > > #include <getopt.h>
> > > -#include <unistd.h>
> > > #include <errno.h>
> > > -#include <fcntl.h>
> > > +#include <unistd.h>
> > > #include <sys/time.h>
> > > -#include <poll.h>
> > > #include <syslog.h>
> > > #include <signal.h>
> > > #include <exception>
> > > @@ -57,76 +57,6 @@ static uint64_t get_time(void)
> > > 
> > > }
> > > 
> > > -class Stream
> > > -{
> > > -    typedef std::set<SpiceVideoCodecType> codecs_t;
> > > -
> > > -public:
> > > -    Stream(const char *name)
> > > -        : codecs()
> > > -    {
> > > -        streamfd = open(name, O_RDWR);
> > > -        if (streamfd < 0) {
> > > -            throw IOError("failed to open streaming device", errno);
> > > -        }
> > > -    }
> > > -    ~Stream()
> > > -    {
> > > -        close(streamfd);
> > > -    }
> > > -
> > > -    const codecs_t &client_codecs() { return codecs; }
> > > -    bool streaming_requested() { return is_streaming; }
> > > -
> > > -    template <typename Message, typename ...PayloadArgs>
> > > -    void send(PayloadArgs... payload)
> > > -    {
> > > -        Message message(payload...);
> > > -        std::lock_guard<std::mutex> stream_guard(mutex);
> > > -        message.write_header(*this);
> > > -        message.write_message_body(*this, payload...);
> > > -    }
> > > -
> > > -    int read_command(bool blocking);
> > > -    void write_all(const char *operation, const void *buf, const size_t len);
> > > -
> > > -private:
> > > -    int have_something_to_read(int timeout);
> > > -    void handle_stream_start_stop(uint32_t len);
> > > -    void handle_stream_capabilities(uint32_t len);
> > > -    void handle_stream_error(uint32_t len);
> > > -    void read_command_from_device(void);
> > > -
> > > -private:
> > > -    std::mutex mutex;
> > > -    codecs_t codecs;
> > > -    int streamfd = -1;
> > > -    bool is_streaming = false;
> > > -};
> > > -
> > > -template <typename Payload, typename Info, unsigned Type>
> > > -class Message
> > > -{
> > > -public:
> > > -    template <typename ...PayloadArgs>
> > > -    Message(PayloadArgs... payload)
> > > -        : hdr(StreamDevHeader {
> > > -              .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > -              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> > > -              .type = Type,
> > > -              .size = (uint32_t) Info::size(payload...)
> > > -          })
> > > -    { }
> > > -    void write_header(Stream &stream)
> > > -    {
> > > -        stream.write_all("header", &hdr, sizeof(hdr));
> > > -    }
> > > -
> > > -protected:
> > > -    StreamDevHeader hdr;
> > > -    typedef Payload payload_t;
> > > -};
> > > -
> > > class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
> > > {
> > > public:
> > > @@ -156,20 +86,6 @@ public:
> > >     }
> > > };
> > > 
> > > -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
> > > -{
> > > -public:
> > > -    CapabilitiesMessage() : Message() {}
> > > -    static size_t size()
> > > -    {
> > > -        return sizeof(payload_t);
> > > -    }
> > > -    void write_message_body(Stream &stream)
> > > -    {
> > > -        /* No body for capabilities message */
> > > -    }
> > > -};
> > > -
> > > class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
> > > {
> > > public:
> > > @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
> > > 
> > > }} // namespace spice::streaming_agent
> > > 
> > > -static bool quit_requested = false;
> > > -
> > > -int Stream::have_something_to_read(int timeout)
> > > -{
> > > -    struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > -
> > > -    if (poll(&pollfd, 1, timeout) < 0) {
> > > -        syslog(LOG_ERR, "poll FAILED\n");
> > > -        return -1;
> > > -    }
> > > -
> > > -    if (pollfd.revents == POLLIN) {
> > > -        return 1;
> > > -    }
> > > -
> > > -    return 0;
> > > -}
> > > -
> > > -void Stream::handle_stream_start_stop(uint32_t len)
> > > -{
> > > -    uint8_t msg[256];
> > > -
> > > -    if (len >= sizeof(msg)) {
> > > -        throw MessageDataError("message is too long", len, sizeof(msg));
> > > -    }
> > > -    int n = read(streamfd, &msg, len);
> > > -    if (n != (int) len) {
> > > -        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> > > -    }
> > > -    is_streaming = (msg[0] != 0); /* num_codecs */
> > > -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> > > -           is_streaming ? "START" : "STOP");
> > > -    codecs.clear();
> > > -    for (int i = 1; i <= msg[0]; ++i) {
> > > -        codecs.insert((SpiceVideoCodecType) msg[i]);
> > > -    }
> > > -}
> > > -
> > > -void Stream::handle_stream_capabilities(uint32_t len)
> > > -{
> > > -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > -
> > > -    if (len > sizeof(caps)) {
> > > -        throw MessageDataError("capability message too long", len, sizeof(caps));
> > > -    }
> > > -    int n = read(streamfd, caps, len);
> > > -    if (n != (int) len) {
> > > -        throw MessageDataError("read capabilities from device failed", n, len, errno);
> > > -    }
> > > -
> > > -    // we currently do not support extensions so just reply so
> > > -    send<CapabilitiesMessage>();
> > > -}
> > > -
> > > -void Stream::handle_stream_error(uint32_t len)
> > > -{
> > > -    // TODO read message and use it
> > > -    throw ProtocolError("got an error message from server");
> > > -}
> > > -
> > > -void Stream::read_command_from_device()
> > > -{
> > > -    StreamDevHeader hdr;
> > > -    int n;
> > > -
> > > -    std::lock_guard<std::mutex> stream_guard(mutex);
> > > -    n = read(streamfd, &hdr, sizeof(hdr));
> > > -    if (n != sizeof(hdr)) {
> > > -        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> > > -    }
> > > -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > -        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > -    }
> > > -
> > > -    switch (hdr.type) {
> > > -    case STREAM_TYPE_CAPABILITIES:
> > > -        return handle_stream_capabilities(hdr.size);
> > > -    case STREAM_TYPE_NOTIFY_ERROR:
> > > -        return handle_stream_error(hdr.size);
> > > -    case STREAM_TYPE_START_STOP:
> > > -        return handle_stream_start_stop(hdr.size);
> > > -    }
> > > -    throw MessageDataError("unknown message type", hdr.type, 0);
> > > -}
> > > -
> > > -int Stream::read_command(bool blocking)
> > > -{
> > > -    int timeout = blocking?-1:0;
> > > -    while (!quit_requested) {
> > > -        if (!have_something_to_read(timeout)) {
> > > -            if (!blocking) {
> > > -                return 0;
> > > -            }
> > > -            sleep(1);
> > > -            continue;
> > > -        }
> > > -        read_command_from_device();
> > > -        break;
> > > -    }
> > > -
> > > -    return 1;
> > > -}
> > > -
> > > -void Stream::write_all(const char *operation, const void *buf, const size_t len)
> > > -{
> > > -    size_t written = 0;
> > > -    while (written < len) {
> > > -        int l = write(streamfd, (const char *) buf + written, len - written);
> > > -        if (l < 0) {
> > > -            if (errno == EINTR) {
> > > -                continue;
> > > -            }
> > > -            throw WriteError("write failed", operation, errno).syslog();
> > > -        }
> > > -        written += l;
> > > -    }
> > > -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> > > -}
> > > +bool quit_requested = false;
> > > 
> > > static void handle_interrupt(int intr)
> > > {
> > > diff --git a/src/stream.cpp b/src/stream.cpp
> > > new file mode 100644
> > > index 0000000..f756097
> > > --- /dev/null
> > > +++ b/src/stream.cpp
> > > @@ -0,0 +1,172 @@
> > > +/* Encapsulation of the stream used to communicate between agent and server
> > > + *
> > > + * \copyright
> > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > + */
> > > +
> > > +#include "stream.hpp"
> > > +#include "message.hpp"
> > > +
> > > +#include <spice/stream-device.h>
> > > +
> > > +#include <spice-streaming-agent/errors.hpp>
> > > +
> > > +#include <sys/types.h>
> > > +#include <sys/stat.h>
> > > +#include <fcntl.h>
> > > +#include <poll.h>
> > > +#include <syslog.h>
> > > +#include <unistd.h>
> > > +
> > > +namespace spice
> > > +{
> > > +namespace streaming_agent
> > > +{
> > > +
> > > +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
> > > +                                           STREAM_TYPE_CAPABILITIES>
> > > +{
> > > +public:
> > > +    CapabilitiesMessage() : Message() {}
> > > +    static size_t size()
> > > +    {
> > > +        return sizeof(payload_t);
> > > +    }
> > > +    void write_message_body(Stream &stream)
> > > +    {
> > > +        /* No body for capabilities message */
> > > +    }
> > > +};
> > 
> > Not sure I like scattering the messages across source files that happen
> > to use them, though I suppose you did it because each message (like the
> > X11Cursor) may require different header files included? Perhaps it is
> > the way to go…
> 
> No, it’s really to de-couple things, a good way to check if encapsulation was correct.
> 
> 
> > 
> > > +
> > > +Stream::Stream(const char *name)
> > > +    : codecs()
> > > +{
> > > +    streamfd = open(name, O_RDWR);
> > > +    if (streamfd < 0) {
> > > +        throw IOError("failed to open streaming device", errno);
> > > +    }
> > > +}
> > > +
> > > +Stream::~Stream()
> > > +{
> > > +    close(streamfd);
> > > +}
> > > +
> > > +int Stream::have_something_to_read(int timeout)
> > > +{
> > > +    struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > +
> > > +    if (poll(&pollfd, 1, timeout) < 0) {
> > > +        syslog(LOG_ERR, "poll FAILED\n");
> > > +        return -1;
> > > +    }
> > > +
> > > +    if (pollfd.revents == POLLIN) {
> > > +        return 1;
> > > +    }
> > > +
> > > +    return 0;
> > > +}
> > > +
> > > +void Stream::handle_stream_start_stop(uint32_t len)
> > > +{
> > > +    uint8_t msg[256];
> > > +
> > > +    if (len >= sizeof(msg)) {
> > > +        throw MessageDataError("message is too long", len, sizeof(msg));
> > > +    }
> > > +    int n = read(streamfd, &msg, len);
> > > +    if (n != (int) len) {
> > > +        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> > > +    }
> > > +    is_streaming = (msg[0] != 0); /* num_codecs */
> > > +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> > > +           is_streaming ? "START" : "STOP");
> > > +    codecs.clear();
> > > +    for (int i = 1; i <= msg[0]; ++i) {
> > > +        codecs.insert((SpiceVideoCodecType) msg[i]);
> > > +    }
> > > +}
> > > +
> > > +void Stream::handle_stream_capabilities(uint32_t len)
> > > +{
> > > +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > +
> > > +    if (len > sizeof(caps)) {
> > > +        throw MessageDataError("capability message too long", len, sizeof(caps));
> > > +    }
> > > +    int n = read(streamfd, caps, len);
> > > +    if (n != (int) len) {
> > > +        throw MessageDataError("read capabilities from device failed", n, len, errno);
> > > +    }
> > > +
> > > +    // we currently do not support extensions so just reply so
> > > +    send<CapabilitiesMessage>();
> > > +}
> > > +
> > > +void Stream::handle_stream_error(uint32_t len)
> > > +{
> > > +    // TODO read message and use it
> > > +    throw ProtocolError("got an error message from server");
> > > +}
> > > +
> > > +void Stream::read_command_from_device()
> > > +{
> > > +    StreamDevHeader hdr;
> > > +    int n;
> > > +
> > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > +    n = read(streamfd, &hdr, sizeof(hdr));
> > > +    if (n != sizeof(hdr)) {
> > > +        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> > > +    }
> > > +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > +        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > +    }
> > > +
> > > +    switch (hdr.type) {
> > > +    case STREAM_TYPE_CAPABILITIES:
> > > +        return handle_stream_capabilities(hdr.size);
> > > +    case STREAM_TYPE_NOTIFY_ERROR:
> > > +        return handle_stream_error(hdr.size);
> > > +    case STREAM_TYPE_START_STOP:
> > > +        return handle_stream_start_stop(hdr.size);
> > > +    }
> > > +    throw MessageDataError("unknown message type", hdr.type, 0);
> > > +}
> > > +
> > > +int Stream::read_command(bool blocking)
> > > +{
> > > +    int timeout = blocking?-1:0;
> > > +    while (!quit_requested) {
> > > +        if (!have_something_to_read(timeout)) {
> > > +            if (!blocking) {
> > > +                return 0;
> > > +            }
> > > +            sleep(1);
> > > +            continue;
> > > +        }
> > > +        read_command_from_device();
> > > +        break;
> > > +    }
> > > +
> > > +    return 1;
> > > +}
> > > +
> > > +void Stream::write_all(const char *operation, const void *buf, const size_t len)
> > > +{
> > > +    size_t written = 0;
> > > +    while (written < len) {
> > > +        int l = write(streamfd, (const char *) buf + written, len - written);
> > > +        if (l < 0) {
> > > +            if (errno == EINTR) {
> > > +                continue;
> > > +            }
> > > +            throw WriteError("write failed", operation, errno).syslog();
> > > +        }
> > > +        written += l;
> > > +    }
> > > +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> > > +}
> > > +
> > > +}} // namespace spice::streaming_agent
> > > diff --git a/src/stream.hpp b/src/stream.hpp
> > > new file mode 100644
> > > index 0000000..b689f36
> > > --- /dev/null
> > > +++ b/src/stream.hpp
> > > @@ -0,0 +1,55 @@
> > > +/* Encapsulation of the stream used to communicate between agent and server
> > > + *
> > > + * \copyright
> > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > + */
> > > +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
> > > +#define SPICE_STREAMING_AGENT_STREAM_HPP
> > > +
> > > +#include <spice/enums.h>
> > > +#include <set>
> > > +#include <mutex>
> > > +
> > > +namespace spice {
> > > +namespace streaming_agent {
> > > +
> > > +class Stream
> > > +{
> > > +    typedef std::set<SpiceVideoCodecType> codecs_t;
> > > +
> > > +public:
> > > +    Stream(const char *name);
> > > +    ~Stream();
> > > +
> > > +    const codecs_t &client_codecs() { return codecs; }
> > > +    bool streaming_requested() { return is_streaming; }
> > > +
> > > +    template <typename Message, typename ...PayloadArgs>
> > > +    void send(PayloadArgs... payload)
> > > +    {
> > > +        Message message(payload...);
> > > +        std::lock_guard<std::mutex> stream_guard(mutex);
> > > +        message.write_header(*this);
> > > +        message.write_message_body(*this, payload...);
> > > +    }
> > > +
> > > +    int read_command(bool blocking);
> > > +    void write_all(const char *operation, const void *buf, const size_t len);
> > > +
> > > +private:
> > > +    int have_something_to_read(int timeout);
> > > +    void handle_stream_start_stop(uint32_t len);
> > > +    void handle_stream_capabilities(uint32_t len);
> > > +    void handle_stream_error(uint32_t len);
> > > +    void read_command_from_device(void);
> > > +
> > > +private:
> > > +    std::mutex mutex;
> > > +    codecs_t codecs;
> > > +    int streamfd = -1;
> > > +    bool is_streaming = false;
> > > +};
> > > +
> > > +}} // namespace spice::streaming_agent
> > > +
> > > +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> 
>
> On 2 Mar 2018, at 14:07, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> 
> On Thu, 2018-03-01 at 17:57 +0100, Christophe de Dinechin wrote:
>>> On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky@redhat.com> wrote:
>>> 
>>> On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
>>>> From: Christophe de Dinechin <dinechin@redhat.com>
>>>> 
>>>> Doing this change will make it possible to move the capture loop to the
>>>> concrete-agent.cpp file.
>>>> 
>>>> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
>>>> ---
>>>> include/spice-streaming-agent/errors.hpp |   2 +
>>>> src/Makefile.am                          |   2 +
>>>> src/message.hpp                          |  41 ++++++
>>>> src/spice-streaming-agent.cpp            | 209 +------------------------------
>>>> src/stream.cpp                           | 172 +++++++++++++++++++++++++
>>>> src/stream.hpp                           |  55 ++++++++
>>>> 6 files changed, 276 insertions(+), 205 deletions(-)
>>>> create mode 100644 src/message.hpp
>>>> create mode 100644 src/stream.cpp
>>>> create mode 100644 src/stream.hpp
>>>> 
>>>> diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
>>>> index 870a0fd..62ae010 100644
>>>> --- a/include/spice-streaming-agent/errors.hpp
>>>> +++ b/include/spice-streaming-agent/errors.hpp
>>>> @@ -90,4 +90,6 @@ protected:
>>>> 
>>>> }} // namespace spice::streaming_agent
>>>> 
>>>> +extern bool quit_requested;
>>> 
>>> Putting quit_requested into errors.hpp? Why?
>> 
>> Because errors.hpp deals with error conditions. You need to quit other threads on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.
> 
> I don't think the flag belongs to the errors header at all, let alone a
> public one. It's a generic control flow mechanism to signal the
> termination of the program. The only relation to errors is that in case
> of errors you want to (usually) also quit.

Well, ‘quit_requested’ is set for all “final" errors, whether we detect them using exceptions or signals.


> 
> Therefore the quit flag should not be coupled with errors and instead
> used in the main control flow spot where global error handling is
> taking place (and then in the signal handler). As a static variable it
> should also be proliferating through the program as little as possible.
> 
> I've actually had the following idea for the quit flag, which I think
> promotes the locality of the flag in the class design:
> 
> 
> // file agent.{hpp,cpp}
> class Agent {
> public:
>    bool& quit_flag() {
>        return quit_requested;
>    }
> 
>    void do_capture() {
>        while(!quit_requested) {
>            // ...
>        }
>    }
> private:
>    bool quit_requested = false;
> };
> 
> 
> // file main.cpp
> static bool* quit_requested = nullptr;
> 
> void handle_sigterm() {
>    *quit_requested = true;
> }
> 
> int main() {
>    Agent agent;
>    quit_requested = &agent.quit_flag();
>    ...
> }
> 
> 
> Some corner case handling (the quit_reqested pointer not yet set, etc.)
> was left out for clarity.

I understand what you are trying to do, but you replace one global variable with one (and several if I follow the logic below), so how is helping with the proliferation?

> 
> This keeps the flag local to the loop in which it is relevant and the
> static variable local to the main.cpp file. Thus it increases
> modularity (which arguably we do not need that much here).
> 
> I would also use a different quit flag for the cursor thread (again,
> local to the X11CursorUpdater) and take care of it in main() after the
> Agent::do_capture() loop quits.

What benefits do you see in having multiple flags? Are there signals where we can quit the agent without interrupting the cursor thread or other activities?

To me, quit_requested is the archetypical example of when a global variable should be used. There is only one “quit”, and it’s for all threads and objects in the program. How each one of them deals with it is local, but the “we must quit” request is global.

> 
>> 
>>> 
>>>> +
>>>> #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
>>>> diff --git a/src/Makefile.am b/src/Makefile.am
>>>> index 2507844..923a103 100644
>>>> --- a/src/Makefile.am
>>>> +++ b/src/Makefile.am
>>>> @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
>>>> 	mjpeg-fallback.hpp \
>>>> 	jpeg.cpp \
>>>> 	jpeg.hpp \
>>>> +	stream.cpp \
>>>> +	stream.hpp \
>>>> 	errors.cpp \
>>>> 	$(NULL)
>>>> diff --git a/src/message.hpp b/src/message.hpp
>>>> new file mode 100644
>>>> index 0000000..28b3e28
>>>> --- /dev/null
>>>> +++ b/src/message.hpp
>>>> @@ -0,0 +1,41 @@
>>>> +/* Formatting messages
>>>> + *
>>>> + * \copyright
>>>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>>>> + */
>>>> +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
>>>> +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
>>>> +
>>>> +#include <spice/stream-device.h>
>>>> +
>>>> +namespace spice
>>>> +{
>>>> +namespace streaming_agent
>>>> +{
>>>> +
>>>> +template <typename Payload, typename Info, unsigned Type>
>>>> +class Message
>>>> +{
>>>> +public:
>>>> +    template <typename ...PayloadArgs>
>>>> +    Message(PayloadArgs... payload)
>>>> +        : hdr(StreamDevHeader {
>>>> +              .protocol_version = STREAM_DEVICE_PROTOCOL,
>>>> +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
>>>> +              .type = Type,
>>>> +              .size = (uint32_t) Info::size(payload...)
>>>> +          })
>>>> +    { }
>>>> +    void write_header(Stream &stream)
>>>> +    {
>>>> +        stream.write_all("header", &hdr, sizeof(hdr));
>>>> +    }
>>>> +
>>>> +protected:
>>>> +    StreamDevHeader hdr;
>>>> +    typedef Payload payload_t;
>>>> +};
>>>> +
>>>> +}} // namespace spice::streaming_agent
>>>> +
>>>> +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
>>>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
>>>> index 35e65bb..c401a34 100644
>>>> --- a/src/spice-streaming-agent.cpp
>>>> +++ b/src/spice-streaming-agent.cpp
>>>> @@ -5,6 +5,8 @@
>>>> */
>>>> 
>>>> #include "concrete-agent.hpp"
>>>> +#include "stream.hpp"
>>>> +#include "message.hpp"
>>>> #include "hexdump.h"
>>>> #include "mjpeg-fallback.hpp"
>>>> 
>>>> @@ -21,11 +23,9 @@
>>>> #include <inttypes.h>
>>>> #include <string.h>
>>>> #include <getopt.h>
>>>> -#include <unistd.h>
>>>> #include <errno.h>
>>>> -#include <fcntl.h>
>>>> +#include <unistd.h>
>>>> #include <sys/time.h>
>>>> -#include <poll.h>
>>>> #include <syslog.h>
>>>> #include <signal.h>
>>>> #include <exception>
>>>> @@ -57,76 +57,6 @@ static uint64_t get_time(void)
>>>> 
>>>> }
>>>> 
>>>> -class Stream
>>>> -{
>>>> -    typedef std::set<SpiceVideoCodecType> codecs_t;
>>>> -
>>>> -public:
>>>> -    Stream(const char *name)
>>>> -        : codecs()
>>>> -    {
>>>> -        streamfd = open(name, O_RDWR);
>>>> -        if (streamfd < 0) {
>>>> -            throw IOError("failed to open streaming device", errno);
>>>> -        }
>>>> -    }
>>>> -    ~Stream()
>>>> -    {
>>>> -        close(streamfd);
>>>> -    }
>>>> -
>>>> -    const codecs_t &client_codecs() { return codecs; }
>>>> -    bool streaming_requested() { return is_streaming; }
>>>> -
>>>> -    template <typename Message, typename ...PayloadArgs>
>>>> -    void send(PayloadArgs... payload)
>>>> -    {
>>>> -        Message message(payload...);
>>>> -        std::lock_guard<std::mutex> stream_guard(mutex);
>>>> -        message.write_header(*this);
>>>> -        message.write_message_body(*this, payload...);
>>>> -    }
>>>> -
>>>> -    int read_command(bool blocking);
>>>> -    void write_all(const char *operation, const void *buf, const size_t len);
>>>> -
>>>> -private:
>>>> -    int have_something_to_read(int timeout);
>>>> -    void handle_stream_start_stop(uint32_t len);
>>>> -    void handle_stream_capabilities(uint32_t len);
>>>> -    void handle_stream_error(uint32_t len);
>>>> -    void read_command_from_device(void);
>>>> -
>>>> -private:
>>>> -    std::mutex mutex;
>>>> -    codecs_t codecs;
>>>> -    int streamfd = -1;
>>>> -    bool is_streaming = false;
>>>> -};
>>>> -
>>>> -template <typename Payload, typename Info, unsigned Type>
>>>> -class Message
>>>> -{
>>>> -public:
>>>> -    template <typename ...PayloadArgs>
>>>> -    Message(PayloadArgs... payload)
>>>> -        : hdr(StreamDevHeader {
>>>> -              .protocol_version = STREAM_DEVICE_PROTOCOL,
>>>> -              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
>>>> -              .type = Type,
>>>> -              .size = (uint32_t) Info::size(payload...)
>>>> -          })
>>>> -    { }
>>>> -    void write_header(Stream &stream)
>>>> -    {
>>>> -        stream.write_all("header", &hdr, sizeof(hdr));
>>>> -    }
>>>> -
>>>> -protected:
>>>> -    StreamDevHeader hdr;
>>>> -    typedef Payload payload_t;
>>>> -};
>>>> -
>>>> class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
>>>> {
>>>> public:
>>>> @@ -156,20 +86,6 @@ public:
>>>>    }
>>>> };
>>>> 
>>>> -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
>>>> -{
>>>> -public:
>>>> -    CapabilitiesMessage() : Message() {}
>>>> -    static size_t size()
>>>> -    {
>>>> -        return sizeof(payload_t);
>>>> -    }
>>>> -    void write_message_body(Stream &stream)
>>>> -    {
>>>> -        /* No body for capabilities message */
>>>> -    }
>>>> -};
>>>> -
>>>> class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
>>>> {
>>>> public:
>>>> @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
>>>> 
>>>> }} // namespace spice::streaming_agent
>>>> 
>>>> -static bool quit_requested = false;
>>>> -
>>>> -int Stream::have_something_to_read(int timeout)
>>>> -{
>>>> -    struct pollfd pollfd = {streamfd, POLLIN, 0};
>>>> -
>>>> -    if (poll(&pollfd, 1, timeout) < 0) {
>>>> -        syslog(LOG_ERR, "poll FAILED\n");
>>>> -        return -1;
>>>> -    }
>>>> -
>>>> -    if (pollfd.revents == POLLIN) {
>>>> -        return 1;
>>>> -    }
>>>> -
>>>> -    return 0;
>>>> -}
>>>> -
>>>> -void Stream::handle_stream_start_stop(uint32_t len)
>>>> -{
>>>> -    uint8_t msg[256];
>>>> -
>>>> -    if (len >= sizeof(msg)) {
>>>> -        throw MessageDataError("message is too long", len, sizeof(msg));
>>>> -    }
>>>> -    int n = read(streamfd, &msg, len);
>>>> -    if (n != (int) len) {
>>>> -        throw MessageDataError("read start/stop command from device failed", n, len, errno);
>>>> -    }
>>>> -    is_streaming = (msg[0] != 0); /* num_codecs */
>>>> -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>>>> -           is_streaming ? "START" : "STOP");
>>>> -    codecs.clear();
>>>> -    for (int i = 1; i <= msg[0]; ++i) {
>>>> -        codecs.insert((SpiceVideoCodecType) msg[i]);
>>>> -    }
>>>> -}
>>>> -
>>>> -void Stream::handle_stream_capabilities(uint32_t len)
>>>> -{
>>>> -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>>>> -
>>>> -    if (len > sizeof(caps)) {
>>>> -        throw MessageDataError("capability message too long", len, sizeof(caps));
>>>> -    }
>>>> -    int n = read(streamfd, caps, len);
>>>> -    if (n != (int) len) {
>>>> -        throw MessageDataError("read capabilities from device failed", n, len, errno);
>>>> -    }
>>>> -
>>>> -    // we currently do not support extensions so just reply so
>>>> -    send<CapabilitiesMessage>();
>>>> -}
>>>> -
>>>> -void Stream::handle_stream_error(uint32_t len)
>>>> -{
>>>> -    // TODO read message and use it
>>>> -    throw ProtocolError("got an error message from server");
>>>> -}
>>>> -
>>>> -void Stream::read_command_from_device()
>>>> -{
>>>> -    StreamDevHeader hdr;
>>>> -    int n;
>>>> -
>>>> -    std::lock_guard<std::mutex> stream_guard(mutex);
>>>> -    n = read(streamfd, &hdr, sizeof(hdr));
>>>> -    if (n != sizeof(hdr)) {
>>>> -        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>>>> -    }
>>>> -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>>>> -        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>>>> -    }
>>>> -
>>>> -    switch (hdr.type) {
>>>> -    case STREAM_TYPE_CAPABILITIES:
>>>> -        return handle_stream_capabilities(hdr.size);
>>>> -    case STREAM_TYPE_NOTIFY_ERROR:
>>>> -        return handle_stream_error(hdr.size);
>>>> -    case STREAM_TYPE_START_STOP:
>>>> -        return handle_stream_start_stop(hdr.size);
>>>> -    }
>>>> -    throw MessageDataError("unknown message type", hdr.type, 0);
>>>> -}
>>>> -
>>>> -int Stream::read_command(bool blocking)
>>>> -{
>>>> -    int timeout = blocking?-1:0;
>>>> -    while (!quit_requested) {
>>>> -        if (!have_something_to_read(timeout)) {
>>>> -            if (!blocking) {
>>>> -                return 0;
>>>> -            }
>>>> -            sleep(1);
>>>> -            continue;
>>>> -        }
>>>> -        read_command_from_device();
>>>> -        break;
>>>> -    }
>>>> -
>>>> -    return 1;
>>>> -}
>>>> -
>>>> -void Stream::write_all(const char *operation, const void *buf, const size_t len)
>>>> -{
>>>> -    size_t written = 0;
>>>> -    while (written < len) {
>>>> -        int l = write(streamfd, (const char *) buf + written, len - written);
>>>> -        if (l < 0) {
>>>> -            if (errno == EINTR) {
>>>> -                continue;
>>>> -            }
>>>> -            throw WriteError("write failed", operation, errno).syslog();
>>>> -        }
>>>> -        written += l;
>>>> -    }
>>>> -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>>>> -}
>>>> +bool quit_requested = false;
>>>> 
>>>> static void handle_interrupt(int intr)
>>>> {
>>>> diff --git a/src/stream.cpp b/src/stream.cpp
>>>> new file mode 100644
>>>> index 0000000..f756097
>>>> --- /dev/null
>>>> +++ b/src/stream.cpp
>>>> @@ -0,0 +1,172 @@
>>>> +/* Encapsulation of the stream used to communicate between agent and server
>>>> + *
>>>> + * \copyright
>>>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>>>> + */
>>>> +
>>>> +#include "stream.hpp"
>>>> +#include "message.hpp"
>>>> +
>>>> +#include <spice/stream-device.h>
>>>> +
>>>> +#include <spice-streaming-agent/errors.hpp>
>>>> +
>>>> +#include <sys/types.h>
>>>> +#include <sys/stat.h>
>>>> +#include <fcntl.h>
>>>> +#include <poll.h>
>>>> +#include <syslog.h>
>>>> +#include <unistd.h>
>>>> +
>>>> +namespace spice
>>>> +{
>>>> +namespace streaming_agent
>>>> +{
>>>> +
>>>> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
>>>> +                                           STREAM_TYPE_CAPABILITIES>
>>>> +{
>>>> +public:
>>>> +    CapabilitiesMessage() : Message() {}
>>>> +    static size_t size()
>>>> +    {
>>>> +        return sizeof(payload_t);
>>>> +    }
>>>> +    void write_message_body(Stream &stream)
>>>> +    {
>>>> +        /* No body for capabilities message */
>>>> +    }
>>>> +};
>>> 
>>> Not sure I like scattering the messages across source files that happen
>>> to use them, though I suppose you did it because each message (like the
>>> X11Cursor) may require different header files included? Perhaps it is
>>> the way to go…
>> 
>> No, it’s really to de-couple things, a good way to check if encapsulation was correct.
>> 
>> 
>>> 
>>>> +
>>>> +Stream::Stream(const char *name)
>>>> +    : codecs()
>>>> +{
>>>> +    streamfd = open(name, O_RDWR);
>>>> +    if (streamfd < 0) {
>>>> +        throw IOError("failed to open streaming device", errno);
>>>> +    }
>>>> +}
>>>> +
>>>> +Stream::~Stream()
>>>> +{
>>>> +    close(streamfd);
>>>> +}
>>>> +
>>>> +int Stream::have_something_to_read(int timeout)
>>>> +{
>>>> +    struct pollfd pollfd = {streamfd, POLLIN, 0};
>>>> +
>>>> +    if (poll(&pollfd, 1, timeout) < 0) {
>>>> +        syslog(LOG_ERR, "poll FAILED\n");
>>>> +        return -1;
>>>> +    }
>>>> +
>>>> +    if (pollfd.revents == POLLIN) {
>>>> +        return 1;
>>>> +    }
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +void Stream::handle_stream_start_stop(uint32_t len)
>>>> +{
>>>> +    uint8_t msg[256];
>>>> +
>>>> +    if (len >= sizeof(msg)) {
>>>> +        throw MessageDataError("message is too long", len, sizeof(msg));
>>>> +    }
>>>> +    int n = read(streamfd, &msg, len);
>>>> +    if (n != (int) len) {
>>>> +        throw MessageDataError("read start/stop command from device failed", n, len, errno);
>>>> +    }
>>>> +    is_streaming = (msg[0] != 0); /* num_codecs */
>>>> +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>>>> +           is_streaming ? "START" : "STOP");
>>>> +    codecs.clear();
>>>> +    for (int i = 1; i <= msg[0]; ++i) {
>>>> +        codecs.insert((SpiceVideoCodecType) msg[i]);
>>>> +    }
>>>> +}
>>>> +
>>>> +void Stream::handle_stream_capabilities(uint32_t len)
>>>> +{
>>>> +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>>>> +
>>>> +    if (len > sizeof(caps)) {
>>>> +        throw MessageDataError("capability message too long", len, sizeof(caps));
>>>> +    }
>>>> +    int n = read(streamfd, caps, len);
>>>> +    if (n != (int) len) {
>>>> +        throw MessageDataError("read capabilities from device failed", n, len, errno);
>>>> +    }
>>>> +
>>>> +    // we currently do not support extensions so just reply so
>>>> +    send<CapabilitiesMessage>();
>>>> +}
>>>> +
>>>> +void Stream::handle_stream_error(uint32_t len)
>>>> +{
>>>> +    // TODO read message and use it
>>>> +    throw ProtocolError("got an error message from server");
>>>> +}
>>>> +
>>>> +void Stream::read_command_from_device()
>>>> +{
>>>> +    StreamDevHeader hdr;
>>>> +    int n;
>>>> +
>>>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>>> +    n = read(streamfd, &hdr, sizeof(hdr));
>>>> +    if (n != sizeof(hdr)) {
>>>> +        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>>>> +    }
>>>> +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>>>> +        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>>>> +    }
>>>> +
>>>> +    switch (hdr.type) {
>>>> +    case STREAM_TYPE_CAPABILITIES:
>>>> +        return handle_stream_capabilities(hdr.size);
>>>> +    case STREAM_TYPE_NOTIFY_ERROR:
>>>> +        return handle_stream_error(hdr.size);
>>>> +    case STREAM_TYPE_START_STOP:
>>>> +        return handle_stream_start_stop(hdr.size);
>>>> +    }
>>>> +    throw MessageDataError("unknown message type", hdr.type, 0);
>>>> +}
>>>> +
>>>> +int Stream::read_command(bool blocking)
>>>> +{
>>>> +    int timeout = blocking?-1:0;
>>>> +    while (!quit_requested) {
>>>> +        if (!have_something_to_read(timeout)) {
>>>> +            if (!blocking) {
>>>> +                return 0;
>>>> +            }
>>>> +            sleep(1);
>>>> +            continue;
>>>> +        }
>>>> +        read_command_from_device();
>>>> +        break;
>>>> +    }
>>>> +
>>>> +    return 1;
>>>> +}
>>>> +
>>>> +void Stream::write_all(const char *operation, const void *buf, const size_t len)
>>>> +{
>>>> +    size_t written = 0;
>>>> +    while (written < len) {
>>>> +        int l = write(streamfd, (const char *) buf + written, len - written);
>>>> +        if (l < 0) {
>>>> +            if (errno == EINTR) {
>>>> +                continue;
>>>> +            }
>>>> +            throw WriteError("write failed", operation, errno).syslog();
>>>> +        }
>>>> +        written += l;
>>>> +    }
>>>> +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>>>> +}
>>>> +
>>>> +}} // namespace spice::streaming_agent
>>>> diff --git a/src/stream.hpp b/src/stream.hpp
>>>> new file mode 100644
>>>> index 0000000..b689f36
>>>> --- /dev/null
>>>> +++ b/src/stream.hpp
>>>> @@ -0,0 +1,55 @@
>>>> +/* Encapsulation of the stream used to communicate between agent and server
>>>> + *
>>>> + * \copyright
>>>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>>>> + */
>>>> +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
>>>> +#define SPICE_STREAMING_AGENT_STREAM_HPP
>>>> +
>>>> +#include <spice/enums.h>
>>>> +#include <set>
>>>> +#include <mutex>
>>>> +
>>>> +namespace spice {
>>>> +namespace streaming_agent {
>>>> +
>>>> +class Stream
>>>> +{
>>>> +    typedef std::set<SpiceVideoCodecType> codecs_t;
>>>> +
>>>> +public:
>>>> +    Stream(const char *name);
>>>> +    ~Stream();
>>>> +
>>>> +    const codecs_t &client_codecs() { return codecs; }
>>>> +    bool streaming_requested() { return is_streaming; }
>>>> +
>>>> +    template <typename Message, typename ...PayloadArgs>
>>>> +    void send(PayloadArgs... payload)
>>>> +    {
>>>> +        Message message(payload...);
>>>> +        std::lock_guard<std::mutex> stream_guard(mutex);
>>>> +        message.write_header(*this);
>>>> +        message.write_message_body(*this, payload...);
>>>> +    }
>>>> +
>>>> +    int read_command(bool blocking);
>>>> +    void write_all(const char *operation, const void *buf, const size_t len);
>>>> +
>>>> +private:
>>>> +    int have_something_to_read(int timeout);
>>>> +    void handle_stream_start_stop(uint32_t len);
>>>> +    void handle_stream_capabilities(uint32_t len);
>>>> +    void handle_stream_error(uint32_t len);
>>>> +    void read_command_from_device(void);
>>>> +
>>>> +private:
>>>> +    std::mutex mutex;
>>>> +    codecs_t codecs;
>>>> +    int streamfd = -1;
>>>> +    bool is_streaming = false;
>>>> +};
>>>> +
>>>> +}} // namespace spice::streaming_agent
>>>> +
>>>> +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
On Fri, 2018-03-02 at 14:35 +0100, Christophe de Dinechin wrote:
> > On 2 Mar 2018, at 14:07, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> > 
> > On Thu, 2018-03-01 at 17:57 +0100, Christophe de Dinechin wrote:
> > > > On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> > > > 
> > > > On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
> > > > > From: Christophe de Dinechin <dinechin@redhat.com>
> > > > > 
> > > > > Doing this change will make it possible to move the capture loop to the
> > > > > concrete-agent.cpp file.
> > > > > 
> > > > > Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
> > > > > ---
> > > > > include/spice-streaming-agent/errors.hpp |   2 +
> > > > > src/Makefile.am                          |   2 +
> > > > > src/message.hpp                          |  41 ++++++
> > > > > src/spice-streaming-agent.cpp            | 209 +------------------------------
> > > > > src/stream.cpp                           | 172 +++++++++++++++++++++++++
> > > > > src/stream.hpp                           |  55 ++++++++
> > > > > 6 files changed, 276 insertions(+), 205 deletions(-)
> > > > > create mode 100644 src/message.hpp
> > > > > create mode 100644 src/stream.cpp
> > > > > create mode 100644 src/stream.hpp
> > > > > 
> > > > > diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
> > > > > index 870a0fd..62ae010 100644
> > > > > --- a/include/spice-streaming-agent/errors.hpp
> > > > > +++ b/include/spice-streaming-agent/errors.hpp
> > > > > @@ -90,4 +90,6 @@ protected:
> > > > > 
> > > > > }} // namespace spice::streaming_agent
> > > > > 
> > > > > +extern bool quit_requested;
> > > > 
> > > > Putting quit_requested into errors.hpp? Why?
> > > 
> > > Because errors.hpp deals with error conditions. You need to quit other threads on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.
> > 
> > I don't think the flag belongs to the errors header at all, let alone a
> > public one. It's a generic control flow mechanism to signal the
> > termination of the program. The only relation to errors is that in case
> > of errors you want to (usually) also quit.
> 
> Well, ‘quit_requested’ is set for all “final" errors, whether we detect them using exceptions or signals.

But a termination signal is not an error, it is the natural way to end
the program. Actually, exceptions thrown inside the main program loop
are another, second way to exit the loop besides setting the quit flag.

From another point of view, you include errors.h in each place where
you are either throwing or catching exceptions. But the quit flag is
only needed in the main loop and in the signal handler.

> > 
> > Therefore the quit flag should not be coupled with errors and instead
> > used in the main control flow spot where global error handling is
> > taking place (and then in the signal handler). As a static variable it
> > should also be proliferating through the program as little as possible.
> > 
> > I've actually had the following idea for the quit flag, which I think
> > promotes the locality of the flag in the class design:
> > 
> > 
> > // file agent.{hpp,cpp}
> > class Agent {
> > public:
> >    bool& quit_flag() {
> >        return quit_requested;
> >    }
> > 
> >    void do_capture() {
> >        while(!quit_requested) {
> >            // ...
> >        }
> >    }
> > private:
> >    bool quit_requested = false;
> > };
> > 
> > 
> > // file main.cpp
> > static bool* quit_requested = nullptr;
> > 
> > void handle_sigterm() {
> >    *quit_requested = true;
> > }
> > 
> > int main() {
> >    Agent agent;
> >    quit_requested = &agent.quit_flag();
> >    ...
> > }
> > 
> > 
> > Some corner case handling (the quit_reqested pointer not yet set, etc.)
> > was left out for clarity.
> 
> I understand what you are trying to do, but you replace one global variable with one (and several if I follow the logic below), so how is helping with the proliferation?

Only this one global variable that is in the example. And the only
reason it is global is because of the signal handler, there is no other
way for it work. Therefore, I only make it "global locally" (excuse the
oxymoron :)) for the signal handler and limit what can access it as
much as I can.

(And it's just a pointer, the real flag is local to the loop which uses
it)

> > 
> > This keeps the flag local to the loop in which it is relevant and the
> > static variable local to the main.cpp file. Thus it increases
> > modularity (which arguably we do not need that much here).
> > 
> > I would also use a different quit flag for the cursor thread (again,
> > local to the X11CursorUpdater) and take care of it in main() after the
> > Agent::do_capture() loop quits.
> 
> What benefits do you see in having multiple flags? Are there signals where we can quit the agent without interrupting the cursor thread or other activities?

No, I don't think there are. The reason is different. To me one global
"quit all" flag seems like a not well formed hierarchy of execution,
for one, it could introduce race conditions, if we generalize and say
we have several losely dependant threads/processes and you signal them
all to quit, the order they do so is arbitrary. Of course you can
introduce mechanisms like join() etc. to synchronize.

But what I would consider a better design would be one main loop in the
main thread, that also reacts to the signals. If the main loop exits,
then it would tear down the other threads signalling them in whatever
way is natural to the thread in question and waiting for them to
finish. So you define the hierarchy, there is the main thread taking
responsibility and subthreads that are managed by it.

> To me, quit_requested is the archetypical example of when a global variable should be used. There is only one “quit”, and it’s for all threads and objects in the program. How each one of them deals with it is local, but the “we must quit” request is global.

As I explained. I'm not necessarily saying your approach is wrong, just
that:

1. I prefer to explicitely define a main thread that has the management
responsibility.

2. The static flag shared across classes simply seems to me a thing to
avoid, it breaks modularity and for example makes it a bit more fiddly
to use the modules in tests for example...

> > 
> > > 
> > > > 
> > > > > +
> > > > > #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> > > > > diff --git a/src/Makefile.am b/src/Makefile.am
> > > > > index 2507844..923a103 100644
> > > > > --- a/src/Makefile.am
> > > > > +++ b/src/Makefile.am
> > > > > @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
> > > > > 	mjpeg-fallback.hpp \
> > > > > 	jpeg.cpp \
> > > > > 	jpeg.hpp \
> > > > > +	stream.cpp \
> > > > > +	stream.hpp \
> > > > > 	errors.cpp \
> > > > > 	$(NULL)
> > > > > diff --git a/src/message.hpp b/src/message.hpp
> > > > > new file mode 100644
> > > > > index 0000000..28b3e28
> > > > > --- /dev/null
> > > > > +++ b/src/message.hpp
> > > > > @@ -0,0 +1,41 @@
> > > > > +/* Formatting messages
> > > > > + *
> > > > > + * \copyright
> > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > + */
> > > > > +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > +
> > > > > +#include <spice/stream-device.h>
> > > > > +
> > > > > +namespace spice
> > > > > +{
> > > > > +namespace streaming_agent
> > > > > +{
> > > > > +
> > > > > +template <typename Payload, typename Info, unsigned Type>
> > > > > +class Message
> > > > > +{
> > > > > +public:
> > > > > +    template <typename ...PayloadArgs>
> > > > > +    Message(PayloadArgs... payload)
> > > > > +        : hdr(StreamDevHeader {
> > > > > +              .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > > > +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> > > > > +              .type = Type,
> > > > > +              .size = (uint32_t) Info::size(payload...)
> > > > > +          })
> > > > > +    { }
> > > > > +    void write_header(Stream &stream)
> > > > > +    {
> > > > > +        stream.write_all("header", &hdr, sizeof(hdr));
> > > > > +    }
> > > > > +
> > > > > +protected:
> > > > > +    StreamDevHeader hdr;
> > > > > +    typedef Payload payload_t;
> > > > > +};
> > > > > +
> > > > > +}} // namespace spice::streaming_agent
> > > > > +
> > > > > +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> > > > > index 35e65bb..c401a34 100644
> > > > > --- a/src/spice-streaming-agent.cpp
> > > > > +++ b/src/spice-streaming-agent.cpp
> > > > > @@ -5,6 +5,8 @@
> > > > > */
> > > > > 
> > > > > #include "concrete-agent.hpp"
> > > > > +#include "stream.hpp"
> > > > > +#include "message.hpp"
> > > > > #include "hexdump.h"
> > > > > #include "mjpeg-fallback.hpp"
> > > > > 
> > > > > @@ -21,11 +23,9 @@
> > > > > #include <inttypes.h>
> > > > > #include <string.h>
> > > > > #include <getopt.h>
> > > > > -#include <unistd.h>
> > > > > #include <errno.h>
> > > > > -#include <fcntl.h>
> > > > > +#include <unistd.h>
> > > > > #include <sys/time.h>
> > > > > -#include <poll.h>
> > > > > #include <syslog.h>
> > > > > #include <signal.h>
> > > > > #include <exception>
> > > > > @@ -57,76 +57,6 @@ static uint64_t get_time(void)
> > > > > 
> > > > > }
> > > > > 
> > > > > -class Stream
> > > > > -{
> > > > > -    typedef std::set<SpiceVideoCodecType> codecs_t;
> > > > > -
> > > > > -public:
> > > > > -    Stream(const char *name)
> > > > > -        : codecs()
> > > > > -    {
> > > > > -        streamfd = open(name, O_RDWR);
> > > > > -        if (streamfd < 0) {
> > > > > -            throw IOError("failed to open streaming device", errno);
> > > > > -        }
> > > > > -    }
> > > > > -    ~Stream()
> > > > > -    {
> > > > > -        close(streamfd);
> > > > > -    }
> > > > > -
> > > > > -    const codecs_t &client_codecs() { return codecs; }
> > > > > -    bool streaming_requested() { return is_streaming; }
> > > > > -
> > > > > -    template <typename Message, typename ...PayloadArgs>
> > > > > -    void send(PayloadArgs... payload)
> > > > > -    {
> > > > > -        Message message(payload...);
> > > > > -        std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > -        message.write_header(*this);
> > > > > -        message.write_message_body(*this, payload...);
> > > > > -    }
> > > > > -
> > > > > -    int read_command(bool blocking);
> > > > > -    void write_all(const char *operation, const void *buf, const size_t len);
> > > > > -
> > > > > -private:
> > > > > -    int have_something_to_read(int timeout);
> > > > > -    void handle_stream_start_stop(uint32_t len);
> > > > > -    void handle_stream_capabilities(uint32_t len);
> > > > > -    void handle_stream_error(uint32_t len);
> > > > > -    void read_command_from_device(void);
> > > > > -
> > > > > -private:
> > > > > -    std::mutex mutex;
> > > > > -    codecs_t codecs;
> > > > > -    int streamfd = -1;
> > > > > -    bool is_streaming = false;
> > > > > -};
> > > > > -
> > > > > -template <typename Payload, typename Info, unsigned Type>
> > > > > -class Message
> > > > > -{
> > > > > -public:
> > > > > -    template <typename ...PayloadArgs>
> > > > > -    Message(PayloadArgs... payload)
> > > > > -        : hdr(StreamDevHeader {
> > > > > -              .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > > > -              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> > > > > -              .type = Type,
> > > > > -              .size = (uint32_t) Info::size(payload...)
> > > > > -          })
> > > > > -    { }
> > > > > -    void write_header(Stream &stream)
> > > > > -    {
> > > > > -        stream.write_all("header", &hdr, sizeof(hdr));
> > > > > -    }
> > > > > -
> > > > > -protected:
> > > > > -    StreamDevHeader hdr;
> > > > > -    typedef Payload payload_t;
> > > > > -};
> > > > > -
> > > > > class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
> > > > > {
> > > > > public:
> > > > > @@ -156,20 +86,6 @@ public:
> > > > >    }
> > > > > };
> > > > > 
> > > > > -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
> > > > > -{
> > > > > -public:
> > > > > -    CapabilitiesMessage() : Message() {}
> > > > > -    static size_t size()
> > > > > -    {
> > > > > -        return sizeof(payload_t);
> > > > > -    }
> > > > > -    void write_message_body(Stream &stream)
> > > > > -    {
> > > > > -        /* No body for capabilities message */
> > > > > -    }
> > > > > -};
> > > > > -
> > > > > class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
> > > > > {
> > > > > public:
> > > > > @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
> > > > > 
> > > > > }} // namespace spice::streaming_agent
> > > > > 
> > > > > -static bool quit_requested = false;
> > > > > -
> > > > > -int Stream::have_something_to_read(int timeout)
> > > > > -{
> > > > > -    struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > > > -
> > > > > -    if (poll(&pollfd, 1, timeout) < 0) {
> > > > > -        syslog(LOG_ERR, "poll FAILED\n");
> > > > > -        return -1;
> > > > > -    }
> > > > > -
> > > > > -    if (pollfd.revents == POLLIN) {
> > > > > -        return 1;
> > > > > -    }
> > > > > -
> > > > > -    return 0;
> > > > > -}
> > > > > -
> > > > > -void Stream::handle_stream_start_stop(uint32_t len)
> > > > > -{
> > > > > -    uint8_t msg[256];
> > > > > -
> > > > > -    if (len >= sizeof(msg)) {
> > > > > -        throw MessageDataError("message is too long", len, sizeof(msg));
> > > > > -    }
> > > > > -    int n = read(streamfd, &msg, len);
> > > > > -    if (n != (int) len) {
> > > > > -        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> > > > > -    }
> > > > > -    is_streaming = (msg[0] != 0); /* num_codecs */
> > > > > -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> > > > > -           is_streaming ? "START" : "STOP");
> > > > > -    codecs.clear();
> > > > > -    for (int i = 1; i <= msg[0]; ++i) {
> > > > > -        codecs.insert((SpiceVideoCodecType) msg[i]);
> > > > > -    }
> > > > > -}
> > > > > -
> > > > > -void Stream::handle_stream_capabilities(uint32_t len)
> > > > > -{
> > > > > -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > > > -
> > > > > -    if (len > sizeof(caps)) {
> > > > > -        throw MessageDataError("capability message too long", len, sizeof(caps));
> > > > > -    }
> > > > > -    int n = read(streamfd, caps, len);
> > > > > -    if (n != (int) len) {
> > > > > -        throw MessageDataError("read capabilities from device failed", n, len, errno);
> > > > > -    }
> > > > > -
> > > > > -    // we currently do not support extensions so just reply so
> > > > > -    send<CapabilitiesMessage>();
> > > > > -}
> > > > > -
> > > > > -void Stream::handle_stream_error(uint32_t len)
> > > > > -{
> > > > > -    // TODO read message and use it
> > > > > -    throw ProtocolError("got an error message from server");
> > > > > -}
> > > > > -
> > > > > -void Stream::read_command_from_device()
> > > > > -{
> > > > > -    StreamDevHeader hdr;
> > > > > -    int n;
> > > > > -
> > > > > -    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > -    n = read(streamfd, &hdr, sizeof(hdr));
> > > > > -    if (n != sizeof(hdr)) {
> > > > > -        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> > > > > -    }
> > > > > -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > > > -        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > > > -    }
> > > > > -
> > > > > -    switch (hdr.type) {
> > > > > -    case STREAM_TYPE_CAPABILITIES:
> > > > > -        return handle_stream_capabilities(hdr.size);
> > > > > -    case STREAM_TYPE_NOTIFY_ERROR:
> > > > > -        return handle_stream_error(hdr.size);
> > > > > -    case STREAM_TYPE_START_STOP:
> > > > > -        return handle_stream_start_stop(hdr.size);
> > > > > -    }
> > > > > -    throw MessageDataError("unknown message type", hdr.type, 0);
> > > > > -}
> > > > > -
> > > > > -int Stream::read_command(bool blocking)
> > > > > -{
> > > > > -    int timeout = blocking?-1:0;
> > > > > -    while (!quit_requested) {
> > > > > -        if (!have_something_to_read(timeout)) {
> > > > > -            if (!blocking) {
> > > > > -                return 0;
> > > > > -            }
> > > > > -            sleep(1);
> > > > > -            continue;
> > > > > -        }
> > > > > -        read_command_from_device();
> > > > > -        break;
> > > > > -    }
> > > > > -
> > > > > -    return 1;
> > > > > -}
> > > > > -
> > > > > -void Stream::write_all(const char *operation, const void *buf, const size_t len)
> > > > > -{
> > > > > -    size_t written = 0;
> > > > > -    while (written < len) {
> > > > > -        int l = write(streamfd, (const char *) buf + written, len - written);
> > > > > -        if (l < 0) {
> > > > > -            if (errno == EINTR) {
> > > > > -                continue;
> > > > > -            }
> > > > > -            throw WriteError("write failed", operation, errno).syslog();
> > > > > -        }
> > > > > -        written += l;
> > > > > -    }
> > > > > -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> > > > > -}
> > > > > +bool quit_requested = false;
> > > > > 
> > > > > static void handle_interrupt(int intr)
> > > > > {
> > > > > diff --git a/src/stream.cpp b/src/stream.cpp
> > > > > new file mode 100644
> > > > > index 0000000..f756097
> > > > > --- /dev/null
> > > > > +++ b/src/stream.cpp
> > > > > @@ -0,0 +1,172 @@
> > > > > +/* Encapsulation of the stream used to communicate between agent and server
> > > > > + *
> > > > > + * \copyright
> > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > + */
> > > > > +
> > > > > +#include "stream.hpp"
> > > > > +#include "message.hpp"
> > > > > +
> > > > > +#include <spice/stream-device.h>
> > > > > +
> > > > > +#include <spice-streaming-agent/errors.hpp>
> > > > > +
> > > > > +#include <sys/types.h>
> > > > > +#include <sys/stat.h>
> > > > > +#include <fcntl.h>
> > > > > +#include <poll.h>
> > > > > +#include <syslog.h>
> > > > > +#include <unistd.h>
> > > > > +
> > > > > +namespace spice
> > > > > +{
> > > > > +namespace streaming_agent
> > > > > +{
> > > > > +
> > > > > +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
> > > > > +                                           STREAM_TYPE_CAPABILITIES>
> > > > > +{
> > > > > +public:
> > > > > +    CapabilitiesMessage() : Message() {}
> > > > > +    static size_t size()
> > > > > +    {
> > > > > +        return sizeof(payload_t);
> > > > > +    }
> > > > > +    void write_message_body(Stream &stream)
> > > > > +    {
> > > > > +        /* No body for capabilities message */
> > > > > +    }
> > > > > +};
> > > > 
> > > > Not sure I like scattering the messages across source files that happen
> > > > to use them, though I suppose you did it because each message (like the
> > > > X11Cursor) may require different header files included? Perhaps it is
> > > > the way to go…
> > > 
> > > No, it’s really to de-couple things, a good way to check if encapsulation was correct.
> > > 
> > > 
> > > > 
> > > > > +
> > > > > +Stream::Stream(const char *name)
> > > > > +    : codecs()
> > > > > +{
> > > > > +    streamfd = open(name, O_RDWR);
> > > > > +    if (streamfd < 0) {
> > > > > +        throw IOError("failed to open streaming device", errno);
> > > > > +    }
> > > > > +}
> > > > > +
> > > > > +Stream::~Stream()
> > > > > +{
> > > > > +    close(streamfd);
> > > > > +}
> > > > > +
> > > > > +int Stream::have_something_to_read(int timeout)
> > > > > +{
> > > > > +    struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > > > +
> > > > > +    if (poll(&pollfd, 1, timeout) < 0) {
> > > > > +        syslog(LOG_ERR, "poll FAILED\n");
> > > > > +        return -1;
> > > > > +    }
> > > > > +
> > > > > +    if (pollfd.revents == POLLIN) {
> > > > > +        return 1;
> > > > > +    }
> > > > > +
> > > > > +    return 0;
> > > > > +}
> > > > > +
> > > > > +void Stream::handle_stream_start_stop(uint32_t len)
> > > > > +{
> > > > > +    uint8_t msg[256];
> > > > > +
> > > > > +    if (len >= sizeof(msg)) {
> > > > > +        throw MessageDataError("message is too long", len, sizeof(msg));
> > > > > +    }
> > > > > +    int n = read(streamfd, &msg, len);
> > > > > +    if (n != (int) len) {
> > > > > +        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> > > > > +    }
> > > > > +    is_streaming = (msg[0] != 0); /* num_codecs */
> > > > > +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> > > > > +           is_streaming ? "START" : "STOP");
> > > > > +    codecs.clear();
> > > > > +    for (int i = 1; i <= msg[0]; ++i) {
> > > > > +        codecs.insert((SpiceVideoCodecType) msg[i]);
> > > > > +    }
> > > > > +}
> > > > > +
> > > > > +void Stream::handle_stream_capabilities(uint32_t len)
> > > > > +{
> > > > > +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > > > +
> > > > > +    if (len > sizeof(caps)) {
> > > > > +        throw MessageDataError("capability message too long", len, sizeof(caps));
> > > > > +    }
> > > > > +    int n = read(streamfd, caps, len);
> > > > > +    if (n != (int) len) {
> > > > > +        throw MessageDataError("read capabilities from device failed", n, len, errno);
> > > > > +    }
> > > > > +
> > > > > +    // we currently do not support extensions so just reply so
> > > > > +    send<CapabilitiesMessage>();
> > > > > +}
> > > > > +
> > > > > +void Stream::handle_stream_error(uint32_t len)
> > > > > +{
> > > > > +    // TODO read message and use it
> > > > > +    throw ProtocolError("got an error message from server");
> > > > > +}
> > > > > +
> > > > > +void Stream::read_command_from_device()
> > > > > +{
> > > > > +    StreamDevHeader hdr;
> > > > > +    int n;
> > > > > +
> > > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > +    n = read(streamfd, &hdr, sizeof(hdr));
> > > > > +    if (n != sizeof(hdr)) {
> > > > > +        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> > > > > +    }
> > > > > +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > > > +        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > > > +    }
> > > > > +
> > > > > +    switch (hdr.type) {
> > > > > +    case STREAM_TYPE_CAPABILITIES:
> > > > > +        return handle_stream_capabilities(hdr.size);
> > > > > +    case STREAM_TYPE_NOTIFY_ERROR:
> > > > > +        return handle_stream_error(hdr.size);
> > > > > +    case STREAM_TYPE_START_STOP:
> > > > > +        return handle_stream_start_stop(hdr.size);
> > > > > +    }
> > > > > +    throw MessageDataError("unknown message type", hdr.type, 0);
> > > > > +}
> > > > > +
> > > > > +int Stream::read_command(bool blocking)
> > > > > +{
> > > > > +    int timeout = blocking?-1:0;
> > > > > +    while (!quit_requested) {
> > > > > +        if (!have_something_to_read(timeout)) {
> > > > > +            if (!blocking) {
> > > > > +                return 0;
> > > > > +            }
> > > > > +            sleep(1);
> > > > > +            continue;
> > > > > +        }
> > > > > +        read_command_from_device();
> > > > > +        break;
> > > > > +    }
> > > > > +
> > > > > +    return 1;
> > > > > +}
> > > > > +
> > > > > +void Stream::write_all(const char *operation, const void *buf, const size_t len)
> > > > > +{
> > > > > +    size_t written = 0;
> > > > > +    while (written < len) {
> > > > > +        int l = write(streamfd, (const char *) buf + written, len - written);
> > > > > +        if (l < 0) {
> > > > > +            if (errno == EINTR) {
> > > > > +                continue;
> > > > > +            }
> > > > > +            throw WriteError("write failed", operation, errno).syslog();
> > > > > +        }
> > > > > +        written += l;
> > > > > +    }
> > > > > +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> > > > > +}
> > > > > +
> > > > > +}} // namespace spice::streaming_agent
> > > > > diff --git a/src/stream.hpp b/src/stream.hpp
> > > > > new file mode 100644
> > > > > index 0000000..b689f36
> > > > > --- /dev/null
> > > > > +++ b/src/stream.hpp
> > > > > @@ -0,0 +1,55 @@
> > > > > +/* Encapsulation of the stream used to communicate between agent and server
> > > > > + *
> > > > > + * \copyright
> > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > + */
> > > > > +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
> > > > > +#define SPICE_STREAMING_AGENT_STREAM_HPP
> > > > > +
> > > > > +#include <spice/enums.h>
> > > > > +#include <set>
> > > > > +#include <mutex>
> > > > > +
> > > > > +namespace spice {
> > > > > +namespace streaming_agent {
> > > > > +
> > > > > +class Stream
> > > > > +{
> > > > > +    typedef std::set<SpiceVideoCodecType> codecs_t;
> > > > > +
> > > > > +public:
> > > > > +    Stream(const char *name);
> > > > > +    ~Stream();
> > > > > +
> > > > > +    const codecs_t &client_codecs() { return codecs; }
> > > > > +    bool streaming_requested() { return is_streaming; }
> > > > > +
> > > > > +    template <typename Message, typename ...PayloadArgs>
> > > > > +    void send(PayloadArgs... payload)
> > > > > +    {
> > > > > +        Message message(payload...);
> > > > > +        std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > +        message.write_header(*this);
> > > > > +        message.write_message_body(*this, payload...);
> > > > > +    }
> > > > > +
> > > > > +    int read_command(bool blocking);
> > > > > +    void write_all(const char *operation, const void *buf, const size_t len);
> > > > > +
> > > > > +private:
> > > > > +    int have_something_to_read(int timeout);
> > > > > +    void handle_stream_start_stop(uint32_t len);
> > > > > +    void handle_stream_capabilities(uint32_t len);
> > > > > +    void handle_stream_error(uint32_t len);
> > > > > +    void read_command_from_device(void);
> > > > > +
> > > > > +private:
> > > > > +    std::mutex mutex;
> > > > > +    codecs_t codecs;
> > > > > +    int streamfd = -1;
> > > > > +    bool is_streaming = false;
> > > > > +};
> > > > > +
> > > > > +}} // namespace spice::streaming_agent
> > > > > +
> > > > > +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> 
>
> On 2 Mar 2018, at 16:46, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> 
> On Fri, 2018-03-02 at 14:35 +0100, Christophe de Dinechin wrote:
>>> On 2 Mar 2018, at 14:07, Lukáš Hrázký <lhrazky@redhat.com> wrote:
>>> 
>>> On Thu, 2018-03-01 at 17:57 +0100, Christophe de Dinechin wrote:
>>>>> On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky@redhat.com> wrote:
>>>>> 
>>>>> On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
>>>>>> From: Christophe de Dinechin <dinechin@redhat.com>
>>>>>> 
>>>>>> Doing this change will make it possible to move the capture loop to the
>>>>>> concrete-agent.cpp file.
>>>>>> 
>>>>>> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
>>>>>> ---
>>>>>> include/spice-streaming-agent/errors.hpp |   2 +
>>>>>> src/Makefile.am                          |   2 +
>>>>>> src/message.hpp                          |  41 ++++++
>>>>>> src/spice-streaming-agent.cpp            | 209 +------------------------------
>>>>>> src/stream.cpp                           | 172 +++++++++++++++++++++++++
>>>>>> src/stream.hpp                           |  55 ++++++++
>>>>>> 6 files changed, 276 insertions(+), 205 deletions(-)
>>>>>> create mode 100644 src/message.hpp
>>>>>> create mode 100644 src/stream.cpp
>>>>>> create mode 100644 src/stream.hpp
>>>>>> 
>>>>>> diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
>>>>>> index 870a0fd..62ae010 100644
>>>>>> --- a/include/spice-streaming-agent/errors.hpp
>>>>>> +++ b/include/spice-streaming-agent/errors.hpp
>>>>>> @@ -90,4 +90,6 @@ protected:
>>>>>> 
>>>>>> }} // namespace spice::streaming_agent
>>>>>> 
>>>>>> +extern bool quit_requested;
>>>>> 
>>>>> Putting quit_requested into errors.hpp? Why?
>>>> 
>>>> Because errors.hpp deals with error conditions. You need to quit other threads on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.
>>> 
>>> I don't think the flag belongs to the errors header at all, let alone a
>>> public one. It's a generic control flow mechanism to signal the
>>> termination of the program. The only relation to errors is that in case
>>> of errors you want to (usually) also quit.
>> 
>> Well, ‘quit_requested’ is set for all “final" errors, whether we detect them using exceptions or signals.
> 
> But a termination signal is not an error, it is the natural way to end
> the program. Actually, exceptions thrown inside the main program loop
> are another, second way to exit the loop besides setting the quit flag.

The termination signal in itself is not an error. But it leads to either an error in one of the system calls, or a graceful interruption if we are elsewhere, see below.

> 
> From another point of view, you include errors.h in each place where
> you are either throwing or catching exceptions. But the quit flag is
> only needed in the main loop and in the signal handler.

Actually, for correct operation, it’s also needed in read_all and poll, as these system calls may be interrupted. The current code is broken, I have another fix but that was not submitted yet. Something like: https://github.com/c3d/spice-streaming-agent/commit/de11d77e9b6fcb92f29eacc7da178624783aea6b (similar tests are needed in read_all and write_all, I believe). This is what fixes the “Control-C” problem.

Now, I see these as “OS errors” that are not reported through exception but using errno. The proper handling of this kind of OS errors requires a check of quit_requested.

Obviously, we can put this flag somewhere else if that really annoys you, but frankly, I think that errors.hpp is as good a place as any.

> 
>>> 
>>> Therefore the quit flag should not be coupled with errors and instead
>>> used in the main control flow spot where global error handling is
>>> taking place (and then in the signal handler). As a static variable it
>>> should also be proliferating through the program as little as possible.
>>> 
>>> I've actually had the following idea for the quit flag, which I think
>>> promotes the locality of the flag in the class design:
>>> 
>>> 
>>> // file agent.{hpp,cpp}
>>> class Agent {
>>> public:
>>>   bool& quit_flag() {
>>>       return quit_requested;
>>>   }
>>> 
>>>   void do_capture() {
>>>       while(!quit_requested) {
>>>           // ...
>>>       }
>>>   }
>>> private:
>>>   bool quit_requested = false;
>>> };
>>> 
>>> 
>>> // file main.cpp
>>> static bool* quit_requested = nullptr;
>>> 
>>> void handle_sigterm() {
>>>   *quit_requested = true;
>>> }
>>> 
>>> int main() {
>>>   Agent agent;
>>>   quit_requested = &agent.quit_flag();
>>>   ...
>>> }
>>> 
>>> 
>>> Some corner case handling (the quit_reqested pointer not yet set, etc.)
>>> was left out for clarity.
>> 
>> I understand what you are trying to do, but you replace one global variable with one (and several if I follow the logic below), so how is helping with the proliferation?
> 
> Only this one global variable that is in the example. And the only
> reason it is global is because of the signal handler, there is no other
> way for it work. Therefore, I only make it "global locally" (excuse the
> oxymoron :)) for the signal handler and limit what can access it as
> much as I can.
> 
> (And it's just a pointer, the real flag is local to the loop which uses
> it)

Adding one level of indirection here does not look like an improvement to me.

If you want some kind of encapsulation, why not make the signal handler and the quit flag static members of the agent instead? Would that work for you? Something like this: https://github.com/c3d/spice-streaming-agent/commit/077ad90ad2f923b90546a3be99988ddf45746ea7 ?

> 
>>> 
>>> This keeps the flag local to the loop in which it is relevant and the
>>> static variable local to the main.cpp file. Thus it increases
>>> modularity (which arguably we do not need that much here).
>>> 
>>> I would also use a different quit flag for the cursor thread (again,
>>> local to the X11CursorUpdater) and take care of it in main() after the
>>> Agent::do_capture() loop quits.
>> 
>> What benefits do you see in having multiple flags? Are there signals where we can quit the agent without interrupting the cursor thread or other activities?
> 
> No, I don't think there are. The reason is different. To me one global
> "quit all" flag seems like a not well formed hierarchy of execution,
> for one, it could introduce race conditions, if we generalize and say
> we have several losely dependant threads/processes and you signal them
> all to quit, the order they do so is arbitrary. Of course you can
> introduce mechanisms like join() etc. to synchronize.
 
We have such issues presently with the cursor thread. This is fixed in my series, it now exits cleanly in all test cases I threw at it instead of aborting or terminating.

> 
> But what I would consider a better design would be one main loop in the
> main thread, that also reacts to the signals. If the main loop exits,
> then it would tear down the other threads signalling them in whatever
> way is natural to the thread in question and waiting for them to
> finish. So you define the hierarchy, there is the main thread taking
> responsibility and subthreads that are managed by it.

This is more or less what I was describing with the quit flag and signal handler being static members of the agent. I’m OK with that.

> 
>> To me, quit_requested is the archetypical example of when a global variable should be used. There is only one “quit”, and it’s for all threads and objects in the program. How each one of them deals with it is local, but the “we must quit” request is global.
> 
> As I explained. I'm not necessarily saying your approach is wrong, just that:
> 
> 1. I prefer to explicitely define a main thread that has the management responsibility.

As I see it, that’s how the code after refactoring works.

> 2. The static flag shared across classes simply seems to me a thing to
> avoid, it breaks modularity and for example makes it a bit more fiddly
> to use the modules in tests for example…

Except for something that is truly global.

I don’t mind making it a static member of the agent.

I do mind having multiple “quit” flags, or a pointer to the quit flag. What happens if the signal handler is invoked before the quit flag pointer is initialized, for example?


> 
>>> 
>>>> 
>>>>> 
>>>>>> +
>>>>>> #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
>>>>>> diff --git a/src/Makefile.am b/src/Makefile.am
>>>>>> index 2507844..923a103 100644
>>>>>> --- a/src/Makefile.am
>>>>>> +++ b/src/Makefile.am
>>>>>> @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
>>>>>> 	mjpeg-fallback.hpp \
>>>>>> 	jpeg.cpp \
>>>>>> 	jpeg.hpp \
>>>>>> +	stream.cpp \
>>>>>> +	stream.hpp \
>>>>>> 	errors.cpp \
>>>>>> 	$(NULL)
>>>>>> diff --git a/src/message.hpp b/src/message.hpp
>>>>>> new file mode 100644
>>>>>> index 0000000..28b3e28
>>>>>> --- /dev/null
>>>>>> +++ b/src/message.hpp
>>>>>> @@ -0,0 +1,41 @@
>>>>>> +/* Formatting messages
>>>>>> + *
>>>>>> + * \copyright
>>>>>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>>>>>> + */
>>>>>> +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
>>>>>> +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
>>>>>> +
>>>>>> +#include <spice/stream-device.h>
>>>>>> +
>>>>>> +namespace spice
>>>>>> +{
>>>>>> +namespace streaming_agent
>>>>>> +{
>>>>>> +
>>>>>> +template <typename Payload, typename Info, unsigned Type>
>>>>>> +class Message
>>>>>> +{
>>>>>> +public:
>>>>>> +    template <typename ...PayloadArgs>
>>>>>> +    Message(PayloadArgs... payload)
>>>>>> +        : hdr(StreamDevHeader {
>>>>>> +              .protocol_version = STREAM_DEVICE_PROTOCOL,
>>>>>> +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
>>>>>> +              .type = Type,
>>>>>> +              .size = (uint32_t) Info::size(payload...)
>>>>>> +          })
>>>>>> +    { }
>>>>>> +    void write_header(Stream &stream)
>>>>>> +    {
>>>>>> +        stream.write_all("header", &hdr, sizeof(hdr));
>>>>>> +    }
>>>>>> +
>>>>>> +protected:
>>>>>> +    StreamDevHeader hdr;
>>>>>> +    typedef Payload payload_t;
>>>>>> +};
>>>>>> +
>>>>>> +}} // namespace spice::streaming_agent
>>>>>> +
>>>>>> +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
>>>>>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
>>>>>> index 35e65bb..c401a34 100644
>>>>>> --- a/src/spice-streaming-agent.cpp
>>>>>> +++ b/src/spice-streaming-agent.cpp
>>>>>> @@ -5,6 +5,8 @@
>>>>>> */
>>>>>> 
>>>>>> #include "concrete-agent.hpp"
>>>>>> +#include "stream.hpp"
>>>>>> +#include "message.hpp"
>>>>>> #include "hexdump.h"
>>>>>> #include "mjpeg-fallback.hpp"
>>>>>> 
>>>>>> @@ -21,11 +23,9 @@
>>>>>> #include <inttypes.h>
>>>>>> #include <string.h>
>>>>>> #include <getopt.h>
>>>>>> -#include <unistd.h>
>>>>>> #include <errno.h>
>>>>>> -#include <fcntl.h>
>>>>>> +#include <unistd.h>
>>>>>> #include <sys/time.h>
>>>>>> -#include <poll.h>
>>>>>> #include <syslog.h>
>>>>>> #include <signal.h>
>>>>>> #include <exception>
>>>>>> @@ -57,76 +57,6 @@ static uint64_t get_time(void)
>>>>>> 
>>>>>> }
>>>>>> 
>>>>>> -class Stream
>>>>>> -{
>>>>>> -    typedef std::set<SpiceVideoCodecType> codecs_t;
>>>>>> -
>>>>>> -public:
>>>>>> -    Stream(const char *name)
>>>>>> -        : codecs()
>>>>>> -    {
>>>>>> -        streamfd = open(name, O_RDWR);
>>>>>> -        if (streamfd < 0) {
>>>>>> -            throw IOError("failed to open streaming device", errno);
>>>>>> -        }
>>>>>> -    }
>>>>>> -    ~Stream()
>>>>>> -    {
>>>>>> -        close(streamfd);
>>>>>> -    }
>>>>>> -
>>>>>> -    const codecs_t &client_codecs() { return codecs; }
>>>>>> -    bool streaming_requested() { return is_streaming; }
>>>>>> -
>>>>>> -    template <typename Message, typename ...PayloadArgs>
>>>>>> -    void send(PayloadArgs... payload)
>>>>>> -    {
>>>>>> -        Message message(payload...);
>>>>>> -        std::lock_guard<std::mutex> stream_guard(mutex);
>>>>>> -        message.write_header(*this);
>>>>>> -        message.write_message_body(*this, payload...);
>>>>>> -    }
>>>>>> -
>>>>>> -    int read_command(bool blocking);
>>>>>> -    void write_all(const char *operation, const void *buf, const size_t len);
>>>>>> -
>>>>>> -private:
>>>>>> -    int have_something_to_read(int timeout);
>>>>>> -    void handle_stream_start_stop(uint32_t len);
>>>>>> -    void handle_stream_capabilities(uint32_t len);
>>>>>> -    void handle_stream_error(uint32_t len);
>>>>>> -    void read_command_from_device(void);
>>>>>> -
>>>>>> -private:
>>>>>> -    std::mutex mutex;
>>>>>> -    codecs_t codecs;
>>>>>> -    int streamfd = -1;
>>>>>> -    bool is_streaming = false;
>>>>>> -};
>>>>>> -
>>>>>> -template <typename Payload, typename Info, unsigned Type>
>>>>>> -class Message
>>>>>> -{
>>>>>> -public:
>>>>>> -    template <typename ...PayloadArgs>
>>>>>> -    Message(PayloadArgs... payload)
>>>>>> -        : hdr(StreamDevHeader {
>>>>>> -              .protocol_version = STREAM_DEVICE_PROTOCOL,
>>>>>> -              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
>>>>>> -              .type = Type,
>>>>>> -              .size = (uint32_t) Info::size(payload...)
>>>>>> -          })
>>>>>> -    { }
>>>>>> -    void write_header(Stream &stream)
>>>>>> -    {
>>>>>> -        stream.write_all("header", &hdr, sizeof(hdr));
>>>>>> -    }
>>>>>> -
>>>>>> -protected:
>>>>>> -    StreamDevHeader hdr;
>>>>>> -    typedef Payload payload_t;
>>>>>> -};
>>>>>> -
>>>>>> class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
>>>>>> {
>>>>>> public:
>>>>>> @@ -156,20 +86,6 @@ public:
>>>>>>   }
>>>>>> };
>>>>>> 
>>>>>> -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
>>>>>> -{
>>>>>> -public:
>>>>>> -    CapabilitiesMessage() : Message() {}
>>>>>> -    static size_t size()
>>>>>> -    {
>>>>>> -        return sizeof(payload_t);
>>>>>> -    }
>>>>>> -    void write_message_body(Stream &stream)
>>>>>> -    {
>>>>>> -        /* No body for capabilities message */
>>>>>> -    }
>>>>>> -};
>>>>>> -
>>>>>> class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
>>>>>> {
>>>>>> public:
>>>>>> @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
>>>>>> 
>>>>>> }} // namespace spice::streaming_agent
>>>>>> 
>>>>>> -static bool quit_requested = false;
>>>>>> -
>>>>>> -int Stream::have_something_to_read(int timeout)
>>>>>> -{
>>>>>> -    struct pollfd pollfd = {streamfd, POLLIN, 0};
>>>>>> -
>>>>>> -    if (poll(&pollfd, 1, timeout) < 0) {
>>>>>> -        syslog(LOG_ERR, "poll FAILED\n");
>>>>>> -        return -1;
>>>>>> -    }
>>>>>> -
>>>>>> -    if (pollfd.revents == POLLIN) {
>>>>>> -        return 1;
>>>>>> -    }
>>>>>> -
>>>>>> -    return 0;
>>>>>> -}
>>>>>> -
>>>>>> -void Stream::handle_stream_start_stop(uint32_t len)
>>>>>> -{
>>>>>> -    uint8_t msg[256];
>>>>>> -
>>>>>> -    if (len >= sizeof(msg)) {
>>>>>> -        throw MessageDataError("message is too long", len, sizeof(msg));
>>>>>> -    }
>>>>>> -    int n = read(streamfd, &msg, len);
>>>>>> -    if (n != (int) len) {
>>>>>> -        throw MessageDataError("read start/stop command from device failed", n, len, errno);
>>>>>> -    }
>>>>>> -    is_streaming = (msg[0] != 0); /* num_codecs */
>>>>>> -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>>>>>> -           is_streaming ? "START" : "STOP");
>>>>>> -    codecs.clear();
>>>>>> -    for (int i = 1; i <= msg[0]; ++i) {
>>>>>> -        codecs.insert((SpiceVideoCodecType) msg[i]);
>>>>>> -    }
>>>>>> -}
>>>>>> -
>>>>>> -void Stream::handle_stream_capabilities(uint32_t len)
>>>>>> -{
>>>>>> -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>>>>>> -
>>>>>> -    if (len > sizeof(caps)) {
>>>>>> -        throw MessageDataError("capability message too long", len, sizeof(caps));
>>>>>> -    }
>>>>>> -    int n = read(streamfd, caps, len);
>>>>>> -    if (n != (int) len) {
>>>>>> -        throw MessageDataError("read capabilities from device failed", n, len, errno);
>>>>>> -    }
>>>>>> -
>>>>>> -    // we currently do not support extensions so just reply so
>>>>>> -    send<CapabilitiesMessage>();
>>>>>> -}
>>>>>> -
>>>>>> -void Stream::handle_stream_error(uint32_t len)
>>>>>> -{
>>>>>> -    // TODO read message and use it
>>>>>> -    throw ProtocolError("got an error message from server");
>>>>>> -}
>>>>>> -
>>>>>> -void Stream::read_command_from_device()
>>>>>> -{
>>>>>> -    StreamDevHeader hdr;
>>>>>> -    int n;
>>>>>> -
>>>>>> -    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>>> -    n = read(streamfd, &hdr, sizeof(hdr));
>>>>>> -    if (n != sizeof(hdr)) {
>>>>>> -        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>>>>>> -    }
>>>>>> -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>>>>>> -        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>>>>>> -    }
>>>>>> -
>>>>>> -    switch (hdr.type) {
>>>>>> -    case STREAM_TYPE_CAPABILITIES:
>>>>>> -        return handle_stream_capabilities(hdr.size);
>>>>>> -    case STREAM_TYPE_NOTIFY_ERROR:
>>>>>> -        return handle_stream_error(hdr.size);
>>>>>> -    case STREAM_TYPE_START_STOP:
>>>>>> -        return handle_stream_start_stop(hdr.size);
>>>>>> -    }
>>>>>> -    throw MessageDataError("unknown message type", hdr.type, 0);
>>>>>> -}
>>>>>> -
>>>>>> -int Stream::read_command(bool blocking)
>>>>>> -{
>>>>>> -    int timeout = blocking?-1:0;
>>>>>> -    while (!quit_requested) {
>>>>>> -        if (!have_something_to_read(timeout)) {
>>>>>> -            if (!blocking) {
>>>>>> -                return 0;
>>>>>> -            }
>>>>>> -            sleep(1);
>>>>>> -            continue;
>>>>>> -        }
>>>>>> -        read_command_from_device();
>>>>>> -        break;
>>>>>> -    }
>>>>>> -
>>>>>> -    return 1;
>>>>>> -}
>>>>>> -
>>>>>> -void Stream::write_all(const char *operation, const void *buf, const size_t len)
>>>>>> -{
>>>>>> -    size_t written = 0;
>>>>>> -    while (written < len) {
>>>>>> -        int l = write(streamfd, (const char *) buf + written, len - written);
>>>>>> -        if (l < 0) {
>>>>>> -            if (errno == EINTR) {
>>>>>> -                continue;
>>>>>> -            }
>>>>>> -            throw WriteError("write failed", operation, errno).syslog();
>>>>>> -        }
>>>>>> -        written += l;
>>>>>> -    }
>>>>>> -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>>>>>> -}
>>>>>> +bool quit_requested = false;
>>>>>> 
>>>>>> static void handle_interrupt(int intr)
>>>>>> {
>>>>>> diff --git a/src/stream.cpp b/src/stream.cpp
>>>>>> new file mode 100644
>>>>>> index 0000000..f756097
>>>>>> --- /dev/null
>>>>>> +++ b/src/stream.cpp
>>>>>> @@ -0,0 +1,172 @@
>>>>>> +/* Encapsulation of the stream used to communicate between agent and server
>>>>>> + *
>>>>>> + * \copyright
>>>>>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>>>>>> + */
>>>>>> +
>>>>>> +#include "stream.hpp"
>>>>>> +#include "message.hpp"
>>>>>> +
>>>>>> +#include <spice/stream-device.h>
>>>>>> +
>>>>>> +#include <spice-streaming-agent/errors.hpp>
>>>>>> +
>>>>>> +#include <sys/types.h>
>>>>>> +#include <sys/stat.h>
>>>>>> +#include <fcntl.h>
>>>>>> +#include <poll.h>
>>>>>> +#include <syslog.h>
>>>>>> +#include <unistd.h>
>>>>>> +
>>>>>> +namespace spice
>>>>>> +{
>>>>>> +namespace streaming_agent
>>>>>> +{
>>>>>> +
>>>>>> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
>>>>>> +                                           STREAM_TYPE_CAPABILITIES>
>>>>>> +{
>>>>>> +public:
>>>>>> +    CapabilitiesMessage() : Message() {}
>>>>>> +    static size_t size()
>>>>>> +    {
>>>>>> +        return sizeof(payload_t);
>>>>>> +    }
>>>>>> +    void write_message_body(Stream &stream)
>>>>>> +    {
>>>>>> +        /* No body for capabilities message */
>>>>>> +    }
>>>>>> +};
>>>>> 
>>>>> Not sure I like scattering the messages across source files that happen
>>>>> to use them, though I suppose you did it because each message (like the
>>>>> X11Cursor) may require different header files included? Perhaps it is
>>>>> the way to go…
>>>> 
>>>> No, it’s really to de-couple things, a good way to check if encapsulation was correct.
>>>> 
>>>> 
>>>>> 
>>>>>> +
>>>>>> +Stream::Stream(const char *name)
>>>>>> +    : codecs()
>>>>>> +{
>>>>>> +    streamfd = open(name, O_RDWR);
>>>>>> +    if (streamfd < 0) {
>>>>>> +        throw IOError("failed to open streaming device", errno);
>>>>>> +    }
>>>>>> +}
>>>>>> +
>>>>>> +Stream::~Stream()
>>>>>> +{
>>>>>> +    close(streamfd);
>>>>>> +}
>>>>>> +
>>>>>> +int Stream::have_something_to_read(int timeout)
>>>>>> +{
>>>>>> +    struct pollfd pollfd = {streamfd, POLLIN, 0};
>>>>>> +
>>>>>> +    if (poll(&pollfd, 1, timeout) < 0) {
>>>>>> +        syslog(LOG_ERR, "poll FAILED\n");
>>>>>> +        return -1;
>>>>>> +    }
>>>>>> +
>>>>>> +    if (pollfd.revents == POLLIN) {
>>>>>> +        return 1;
>>>>>> +    }
>>>>>> +
>>>>>> +    return 0;
>>>>>> +}
>>>>>> +
>>>>>> +void Stream::handle_stream_start_stop(uint32_t len)
>>>>>> +{
>>>>>> +    uint8_t msg[256];
>>>>>> +
>>>>>> +    if (len >= sizeof(msg)) {
>>>>>> +        throw MessageDataError("message is too long", len, sizeof(msg));
>>>>>> +    }
>>>>>> +    int n = read(streamfd, &msg, len);
>>>>>> +    if (n != (int) len) {
>>>>>> +        throw MessageDataError("read start/stop command from device failed", n, len, errno);
>>>>>> +    }
>>>>>> +    is_streaming = (msg[0] != 0); /* num_codecs */
>>>>>> +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
>>>>>> +           is_streaming ? "START" : "STOP");
>>>>>> +    codecs.clear();
>>>>>> +    for (int i = 1; i <= msg[0]; ++i) {
>>>>>> +        codecs.insert((SpiceVideoCodecType) msg[i]);
>>>>>> +    }
>>>>>> +}
>>>>>> +
>>>>>> +void Stream::handle_stream_capabilities(uint32_t len)
>>>>>> +{
>>>>>> +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>>>>>> +
>>>>>> +    if (len > sizeof(caps)) {
>>>>>> +        throw MessageDataError("capability message too long", len, sizeof(caps));
>>>>>> +    }
>>>>>> +    int n = read(streamfd, caps, len);
>>>>>> +    if (n != (int) len) {
>>>>>> +        throw MessageDataError("read capabilities from device failed", n, len, errno);
>>>>>> +    }
>>>>>> +
>>>>>> +    // we currently do not support extensions so just reply so
>>>>>> +    send<CapabilitiesMessage>();
>>>>>> +}
>>>>>> +
>>>>>> +void Stream::handle_stream_error(uint32_t len)
>>>>>> +{
>>>>>> +    // TODO read message and use it
>>>>>> +    throw ProtocolError("got an error message from server");
>>>>>> +}
>>>>>> +
>>>>>> +void Stream::read_command_from_device()
>>>>>> +{
>>>>>> +    StreamDevHeader hdr;
>>>>>> +    int n;
>>>>>> +
>>>>>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>>> +    n = read(streamfd, &hdr, sizeof(hdr));
>>>>>> +    if (n != sizeof(hdr)) {
>>>>>> +        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
>>>>>> +    }
>>>>>> +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
>>>>>> +        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
>>>>>> +    }
>>>>>> +
>>>>>> +    switch (hdr.type) {
>>>>>> +    case STREAM_TYPE_CAPABILITIES:
>>>>>> +        return handle_stream_capabilities(hdr.size);
>>>>>> +    case STREAM_TYPE_NOTIFY_ERROR:
>>>>>> +        return handle_stream_error(hdr.size);
>>>>>> +    case STREAM_TYPE_START_STOP:
>>>>>> +        return handle_stream_start_stop(hdr.size);
>>>>>> +    }
>>>>>> +    throw MessageDataError("unknown message type", hdr.type, 0);
>>>>>> +}
>>>>>> +
>>>>>> +int Stream::read_command(bool blocking)
>>>>>> +{
>>>>>> +    int timeout = blocking?-1:0;
>>>>>> +    while (!quit_requested) {
>>>>>> +        if (!have_something_to_read(timeout)) {
>>>>>> +            if (!blocking) {
>>>>>> +                return 0;
>>>>>> +            }
>>>>>> +            sleep(1);
>>>>>> +            continue;
>>>>>> +        }
>>>>>> +        read_command_from_device();
>>>>>> +        break;
>>>>>> +    }
>>>>>> +
>>>>>> +    return 1;
>>>>>> +}
>>>>>> +
>>>>>> +void Stream::write_all(const char *operation, const void *buf, const size_t len)
>>>>>> +{
>>>>>> +    size_t written = 0;
>>>>>> +    while (written < len) {
>>>>>> +        int l = write(streamfd, (const char *) buf + written, len - written);
>>>>>> +        if (l < 0) {
>>>>>> +            if (errno == EINTR) {
>>>>>> +                continue;
>>>>>> +            }
>>>>>> +            throw WriteError("write failed", operation, errno).syslog();
>>>>>> +        }
>>>>>> +        written += l;
>>>>>> +    }
>>>>>> +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>>>>>> +}
>>>>>> +
>>>>>> +}} // namespace spice::streaming_agent
>>>>>> diff --git a/src/stream.hpp b/src/stream.hpp
>>>>>> new file mode 100644
>>>>>> index 0000000..b689f36
>>>>>> --- /dev/null
>>>>>> +++ b/src/stream.hpp
>>>>>> @@ -0,0 +1,55 @@
>>>>>> +/* Encapsulation of the stream used to communicate between agent and server
>>>>>> + *
>>>>>> + * \copyright
>>>>>> + * Copyright 2018 Red Hat Inc. All rights reserved.
>>>>>> + */
>>>>>> +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
>>>>>> +#define SPICE_STREAMING_AGENT_STREAM_HPP
>>>>>> +
>>>>>> +#include <spice/enums.h>
>>>>>> +#include <set>
>>>>>> +#include <mutex>
>>>>>> +
>>>>>> +namespace spice {
>>>>>> +namespace streaming_agent {
>>>>>> +
>>>>>> +class Stream
>>>>>> +{
>>>>>> +    typedef std::set<SpiceVideoCodecType> codecs_t;
>>>>>> +
>>>>>> +public:
>>>>>> +    Stream(const char *name);
>>>>>> +    ~Stream();
>>>>>> +
>>>>>> +    const codecs_t &client_codecs() { return codecs; }
>>>>>> +    bool streaming_requested() { return is_streaming; }
>>>>>> +
>>>>>> +    template <typename Message, typename ...PayloadArgs>
>>>>>> +    void send(PayloadArgs... payload)
>>>>>> +    {
>>>>>> +        Message message(payload...);
>>>>>> +        std::lock_guard<std::mutex> stream_guard(mutex);
>>>>>> +        message.write_header(*this);
>>>>>> +        message.write_message_body(*this, payload...);
>>>>>> +    }
>>>>>> +
>>>>>> +    int read_command(bool blocking);
>>>>>> +    void write_all(const char *operation, const void *buf, const size_t len);
>>>>>> +
>>>>>> +private:
>>>>>> +    int have_something_to_read(int timeout);
>>>>>> +    void handle_stream_start_stop(uint32_t len);
>>>>>> +    void handle_stream_capabilities(uint32_t len);
>>>>>> +    void handle_stream_error(uint32_t len);
>>>>>> +    void read_command_from_device(void);
>>>>>> +
>>>>>> +private:
>>>>>> +    std::mutex mutex;
>>>>>> +    codecs_t codecs;
>>>>>> +    int streamfd = -1;
>>>>>> +    bool is_streaming = false;
>>>>>> +};
>>>>>> +
>>>>>> +}} // namespace spice::streaming_agent
>>>>>> +
>>>>>> +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
On Fri, 2018-03-02 at 19:25 +0100, Christophe de Dinechin wrote:
> > On 2 Mar 2018, at 16:46, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> > 
> > On Fri, 2018-03-02 at 14:35 +0100, Christophe de Dinechin wrote:
> > > > On 2 Mar 2018, at 14:07, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> > > > 
> > > > On Thu, 2018-03-01 at 17:57 +0100, Christophe de Dinechin wrote:
> > > > > > On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> > > > > > 
> > > > > > On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
> > > > > > > From: Christophe de Dinechin <dinechin@redhat.com>
> > > > > > > 
> > > > > > > Doing this change will make it possible to move the capture loop to the
> > > > > > > concrete-agent.cpp file.
> > > > > > > 
> > > > > > > Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
> > > > > > > ---
> > > > > > > include/spice-streaming-agent/errors.hpp |   2 +
> > > > > > > src/Makefile.am                          |   2 +
> > > > > > > src/message.hpp                          |  41 ++++++
> > > > > > > src/spice-streaming-agent.cpp            | 209 +------------------------------
> > > > > > > src/stream.cpp                           | 172 +++++++++++++++++++++++++
> > > > > > > src/stream.hpp                           |  55 ++++++++
> > > > > > > 6 files changed, 276 insertions(+), 205 deletions(-)
> > > > > > > create mode 100644 src/message.hpp
> > > > > > > create mode 100644 src/stream.cpp
> > > > > > > create mode 100644 src/stream.hpp
> > > > > > > 
> > > > > > > diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
> > > > > > > index 870a0fd..62ae010 100644
> > > > > > > --- a/include/spice-streaming-agent/errors.hpp
> > > > > > > +++ b/include/spice-streaming-agent/errors.hpp
> > > > > > > @@ -90,4 +90,6 @@ protected:
> > > > > > > 
> > > > > > > }} // namespace spice::streaming_agent
> > > > > > > 
> > > > > > > +extern bool quit_requested;
> > > > > > 
> > > > > > Putting quit_requested into errors.hpp? Why?
> > > > > 
> > > > > Because errors.hpp deals with error conditions. You need to quit other threads on signals or exceptions. See https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.
> > > > 
> > > > I don't think the flag belongs to the errors header at all, let alone a
> > > > public one. It's a generic control flow mechanism to signal the
> > > > termination of the program. The only relation to errors is that in case
> > > > of errors you want to (usually) also quit.
> > > 
> > > Well, ‘quit_requested’ is set for all “final" errors, whether we detect them using exceptions or signals.
> > 
> > But a termination signal is not an error, it is the natural way to end
> > the program. Actually, exceptions thrown inside the main program loop
> > are another, second way to exit the loop besides setting the quit flag.
> 
> The termination signal in itself is not an error. But it leads to either an error in one of the system calls, or a graceful interruption if we are elsewhere, see below.

Ok, but I still don't think this warrants bundling the quit flag with
the exceptions...

> > 
> > From another point of view, you include errors.h in each place where
> > you are either throwing or catching exceptions. But the quit flag is
> > only needed in the main loop and in the signal handler.
> 
> Actually, for correct operation, it’s also needed in read_all and poll, as these system calls may be interrupted. The current code is broken, I have another fix but that was not submitted yet. Something like: https://github.com/c3d/spice-streaming-agent/commit/de11d77e9b6fcb92f29eacc7da178624783aea6b (similar tests are needed in read_all and write_all, I believe). This is what fixes the “Control-C” problem.
> 
> Now, I see these as “OS errors” that are not reported through exception but using errno. The proper handling of this kind of OS errors requires a check of quit_requested.

I see. In one of my experiments, I managed to factor out the
quit_requested flag out of the Stream functions. But that did not take
into acount this addition, so it may not be possible.

> Obviously, we can put this flag somewhere else if that really annoys you, but frankly, I think that errors.hpp is as good a place as any.

I would be glad if we did :)

> > 
> > > > 
> > > > Therefore the quit flag should not be coupled with errors and instead
> > > > used in the main control flow spot where global error handling is
> > > > taking place (and then in the signal handler). As a static variable it
> > > > should also be proliferating through the program as little as possible.
> > > > 
> > > > I've actually had the following idea for the quit flag, which I think
> > > > promotes the locality of the flag in the class design:
> > > > 
> > > > 
> > > > // file agent.{hpp,cpp}
> > > > class Agent {
> > > > public:
> > > >   bool& quit_flag() {
> > > >       return quit_requested;
> > > >   }
> > > > 
> > > >   void do_capture() {
> > > >       while(!quit_requested) {
> > > >           // ...
> > > >       }
> > > >   }
> > > > private:
> > > >   bool quit_requested = false;
> > > > };
> > > > 
> > > > 
> > > > // file main.cpp
> > > > static bool* quit_requested = nullptr;
> > > > 
> > > > void handle_sigterm() {
> > > >   *quit_requested = true;
> > > > }
> > > > 
> > > > int main() {
> > > >   Agent agent;
> > > >   quit_requested = &agent.quit_flag();
> > > >   ...
> > > > }
> > > > 
> > > > 
> > > > Some corner case handling (the quit_reqested pointer not yet set, etc.)
> > > > was left out for clarity.
> > > 
> > > I understand what you are trying to do, but you replace one global variable with one (and several if I follow the logic below), so how is helping with the proliferation?
> > 
> > Only this one global variable that is in the example. And the only
> > reason it is global is because of the signal handler, there is no other
> > way for it work. Therefore, I only make it "global locally" (excuse the
> > oxymoron :)) for the signal handler and limit what can access it as
> > much as I can.
> > 
> > (And it's just a pointer, the real flag is local to the loop which uses
> > it)
> 
> Adding one level of indirection here does not look like an improvement to me.

A level of indirection is never an improvement in itself :) the
improvement should have been no global variable shared across source
modules.

> If you want some kind of encapsulation, why not make the signal handler and the quit flag static members of the agent instead? Would that work for you? Something like this: https://github.com/c3d/spice-streaming-agent/commit/077ad90ad2f923b90546a3be99988ddf45746ea7 ?

Yes, if it has to be a global quit flag, something like this is fine by
me.

> > 
> > > > 
> > > > This keeps the flag local to the loop in which it is relevant and the
> > > > static variable local to the main.cpp file. Thus it increases
> > > > modularity (which arguably we do not need that much here).
> > > > 
> > > > I would also use a different quit flag for the cursor thread (again,
> > > > local to the X11CursorUpdater) and take care of it in main() after the
> > > > Agent::do_capture() loop quits.
> > > 
> > > What benefits do you see in having multiple flags? Are there signals where we can quit the agent without interrupting the cursor thread or other activities?
> > 
> > No, I don't think there are. The reason is different. To me one global
> > "quit all" flag seems like a not well formed hierarchy of execution,
> > for one, it could introduce race conditions, if we generalize and say
> > we have several losely dependant threads/processes and you signal them
> > all to quit, the order they do so is arbitrary. Of course you can
> > introduce mechanisms like join() etc. to synchronize.
> 
>  
> We have such issues presently with the cursor thread. This is fixed in my series, it now exits cleanly in all test cases I threw at it instead of aborting or terminating.
> 
> > 
> > But what I would consider a better design would be one main loop in the
> > main thread, that also reacts to the signals. If the main loop exits,
> > then it would tear down the other threads signalling them in whatever
> > way is natural to the thread in question and waiting for them to
> > finish. So you define the hierarchy, there is the main thread taking
> > responsibility and subthreads that are managed by it.
> 
> This is more or less what I was describing with the quit flag and signal handler being static members of the agent. I’m OK with that.
> 
> > 
> > > To me, quit_requested is the archetypical example of when a global variable should be used. There is only one “quit”, and it’s for all threads and objects in the program. How each one of them deals with it is local, but the “we must quit” request is global.
> > 
> > As I explained. I'm not necessarily saying your approach is wrong, just that:
> > 
> > 1. I prefer to explicitely define a main thread that has the management responsibility.
> 
> As I see it, that’s how the code after refactoring works.
> 
> > 2. The static flag shared across classes simply seems to me a thing to
> > avoid, it breaks modularity and for example makes it a bit more fiddly
> > to use the modules in tests for example…
> 
> Except for something that is truly global.
> 
> I don’t mind making it a static member of the agent.
> 
> I do mind having multiple “quit” flags, or a pointer to the quit flag.

Sure, it's not a big issue either, let's do it your way.

> What happens if the signal handler is invoked before the quit flag pointer is initialized, for example?

That's one of the corner cases I mentioned, you'd have to register the
signal handler only after you initialize the flag pointer. Can be a
disadvantage, I suppose.

> > 
> > > > 
> > > > > 
> > > > > > 
> > > > > > > +
> > > > > > > #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> > > > > > > diff --git a/src/Makefile.am b/src/Makefile.am
> > > > > > > index 2507844..923a103 100644
> > > > > > > --- a/src/Makefile.am
> > > > > > > +++ b/src/Makefile.am
> > > > > > > @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
> > > > > > > 	mjpeg-fallback.hpp \
> > > > > > > 	jpeg.cpp \
> > > > > > > 	jpeg.hpp \
> > > > > > > +	stream.cpp \
> > > > > > > +	stream.hpp \
> > > > > > > 	errors.cpp \
> > > > > > > 	$(NULL)
> > > > > > > diff --git a/src/message.hpp b/src/message.hpp
> > > > > > > new file mode 100644
> > > > > > > index 0000000..28b3e28
> > > > > > > --- /dev/null
> > > > > > > +++ b/src/message.hpp
> > > > > > > @@ -0,0 +1,41 @@
> > > > > > > +/* Formatting messages
> > > > > > > + *
> > > > > > > + * \copyright
> > > > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > > > + */
> > > > > > > +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > > > +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > > > +
> > > > > > > +#include <spice/stream-device.h>
> > > > > > > +
> > > > > > > +namespace spice
> > > > > > > +{
> > > > > > > +namespace streaming_agent
> > > > > > > +{
> > > > > > > +
> > > > > > > +template <typename Payload, typename Info, unsigned Type>
> > > > > > > +class Message
> > > > > > > +{
> > > > > > > +public:
> > > > > > > +    template <typename ...PayloadArgs>
> > > > > > > +    Message(PayloadArgs... payload)
> > > > > > > +        : hdr(StreamDevHeader {
> > > > > > > +              .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > > > > > +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> > > > > > > +              .type = Type,
> > > > > > > +              .size = (uint32_t) Info::size(payload...)
> > > > > > > +          })
> > > > > > > +    { }
> > > > > > > +    void write_header(Stream &stream)
> > > > > > > +    {
> > > > > > > +        stream.write_all("header", &hdr, sizeof(hdr));
> > > > > > > +    }
> > > > > > > +
> > > > > > > +protected:
> > > > > > > +    StreamDevHeader hdr;
> > > > > > > +    typedef Payload payload_t;
> > > > > > > +};
> > > > > > > +
> > > > > > > +}} // namespace spice::streaming_agent
> > > > > > > +
> > > > > > > +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > > > diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> > > > > > > index 35e65bb..c401a34 100644
> > > > > > > --- a/src/spice-streaming-agent.cpp
> > > > > > > +++ b/src/spice-streaming-agent.cpp
> > > > > > > @@ -5,6 +5,8 @@
> > > > > > > */
> > > > > > > 
> > > > > > > #include "concrete-agent.hpp"
> > > > > > > +#include "stream.hpp"
> > > > > > > +#include "message.hpp"
> > > > > > > #include "hexdump.h"
> > > > > > > #include "mjpeg-fallback.hpp"
> > > > > > > 
> > > > > > > @@ -21,11 +23,9 @@
> > > > > > > #include <inttypes.h>
> > > > > > > #include <string.h>
> > > > > > > #include <getopt.h>
> > > > > > > -#include <unistd.h>
> > > > > > > #include <errno.h>
> > > > > > > -#include <fcntl.h>
> > > > > > > +#include <unistd.h>
> > > > > > > #include <sys/time.h>
> > > > > > > -#include <poll.h>
> > > > > > > #include <syslog.h>
> > > > > > > #include <signal.h>
> > > > > > > #include <exception>
> > > > > > > @@ -57,76 +57,6 @@ static uint64_t get_time(void)
> > > > > > > 
> > > > > > > }
> > > > > > > 
> > > > > > > -class Stream
> > > > > > > -{
> > > > > > > -    typedef std::set<SpiceVideoCodecType> codecs_t;
> > > > > > > -
> > > > > > > -public:
> > > > > > > -    Stream(const char *name)
> > > > > > > -        : codecs()
> > > > > > > -    {
> > > > > > > -        streamfd = open(name, O_RDWR);
> > > > > > > -        if (streamfd < 0) {
> > > > > > > -            throw IOError("failed to open streaming device", errno);
> > > > > > > -        }
> > > > > > > -    }
> > > > > > > -    ~Stream()
> > > > > > > -    {
> > > > > > > -        close(streamfd);
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    const codecs_t &client_codecs() { return codecs; }
> > > > > > > -    bool streaming_requested() { return is_streaming; }
> > > > > > > -
> > > > > > > -    template <typename Message, typename ...PayloadArgs>
> > > > > > > -    void send(PayloadArgs... payload)
> > > > > > > -    {
> > > > > > > -        Message message(payload...);
> > > > > > > -        std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > > > -        message.write_header(*this);
> > > > > > > -        message.write_message_body(*this, payload...);
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    int read_command(bool blocking);
> > > > > > > -    void write_all(const char *operation, const void *buf, const size_t len);
> > > > > > > -
> > > > > > > -private:
> > > > > > > -    int have_something_to_read(int timeout);
> > > > > > > -    void handle_stream_start_stop(uint32_t len);
> > > > > > > -    void handle_stream_capabilities(uint32_t len);
> > > > > > > -    void handle_stream_error(uint32_t len);
> > > > > > > -    void read_command_from_device(void);
> > > > > > > -
> > > > > > > -private:
> > > > > > > -    std::mutex mutex;
> > > > > > > -    codecs_t codecs;
> > > > > > > -    int streamfd = -1;
> > > > > > > -    bool is_streaming = false;
> > > > > > > -};
> > > > > > > -
> > > > > > > -template <typename Payload, typename Info, unsigned Type>
> > > > > > > -class Message
> > > > > > > -{
> > > > > > > -public:
> > > > > > > -    template <typename ...PayloadArgs>
> > > > > > > -    Message(PayloadArgs... payload)
> > > > > > > -        : hdr(StreamDevHeader {
> > > > > > > -              .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > > > > > -              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> > > > > > > -              .type = Type,
> > > > > > > -              .size = (uint32_t) Info::size(payload...)
> > > > > > > -          })
> > > > > > > -    { }
> > > > > > > -    void write_header(Stream &stream)
> > > > > > > -    {
> > > > > > > -        stream.write_all("header", &hdr, sizeof(hdr));
> > > > > > > -    }
> > > > > > > -
> > > > > > > -protected:
> > > > > > > -    StreamDevHeader hdr;
> > > > > > > -    typedef Payload payload_t;
> > > > > > > -};
> > > > > > > -
> > > > > > > class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
> > > > > > > {
> > > > > > > public:
> > > > > > > @@ -156,20 +86,6 @@ public:
> > > > > > >   }
> > > > > > > };
> > > > > > > 
> > > > > > > -class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
> > > > > > > -{
> > > > > > > -public:
> > > > > > > -    CapabilitiesMessage() : Message() {}
> > > > > > > -    static size_t size()
> > > > > > > -    {
> > > > > > > -        return sizeof(payload_t);
> > > > > > > -    }
> > > > > > > -    void write_message_body(Stream &stream)
> > > > > > > -    {
> > > > > > > -        /* No body for capabilities message */
> > > > > > > -    }
> > > > > > > -};
> > > > > > > -
> > > > > > > class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
> > > > > > > {
> > > > > > > public:
> > > > > > > @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream &stream)
> > > > > > > 
> > > > > > > }} // namespace spice::streaming_agent
> > > > > > > 
> > > > > > > -static bool quit_requested = false;
> > > > > > > -
> > > > > > > -int Stream::have_something_to_read(int timeout)
> > > > > > > -{
> > > > > > > -    struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > > > > > -
> > > > > > > -    if (poll(&pollfd, 1, timeout) < 0) {
> > > > > > > -        syslog(LOG_ERR, "poll FAILED\n");
> > > > > > > -        return -1;
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    if (pollfd.revents == POLLIN) {
> > > > > > > -        return 1;
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    return 0;
> > > > > > > -}
> > > > > > > -
> > > > > > > -void Stream::handle_stream_start_stop(uint32_t len)
> > > > > > > -{
> > > > > > > -    uint8_t msg[256];
> > > > > > > -
> > > > > > > -    if (len >= sizeof(msg)) {
> > > > > > > -        throw MessageDataError("message is too long", len, sizeof(msg));
> > > > > > > -    }
> > > > > > > -    int n = read(streamfd, &msg, len);
> > > > > > > -    if (n != (int) len) {
> > > > > > > -        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> > > > > > > -    }
> > > > > > > -    is_streaming = (msg[0] != 0); /* num_codecs */
> > > > > > > -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> > > > > > > -           is_streaming ? "START" : "STOP");
> > > > > > > -    codecs.clear();
> > > > > > > -    for (int i = 1; i <= msg[0]; ++i) {
> > > > > > > -        codecs.insert((SpiceVideoCodecType) msg[i]);
> > > > > > > -    }
> > > > > > > -}
> > > > > > > -
> > > > > > > -void Stream::handle_stream_capabilities(uint32_t len)
> > > > > > > -{
> > > > > > > -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > > > > > -
> > > > > > > -    if (len > sizeof(caps)) {
> > > > > > > -        throw MessageDataError("capability message too long", len, sizeof(caps));
> > > > > > > -    }
> > > > > > > -    int n = read(streamfd, caps, len);
> > > > > > > -    if (n != (int) len) {
> > > > > > > -        throw MessageDataError("read capabilities from device failed", n, len, errno);
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    // we currently do not support extensions so just reply so
> > > > > > > -    send<CapabilitiesMessage>();
> > > > > > > -}
> > > > > > > -
> > > > > > > -void Stream::handle_stream_error(uint32_t len)
> > > > > > > -{
> > > > > > > -    // TODO read message and use it
> > > > > > > -    throw ProtocolError("got an error message from server");
> > > > > > > -}
> > > > > > > -
> > > > > > > -void Stream::read_command_from_device()
> > > > > > > -{
> > > > > > > -    StreamDevHeader hdr;
> > > > > > > -    int n;
> > > > > > > -
> > > > > > > -    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > > > -    n = read(streamfd, &hdr, sizeof(hdr));
> > > > > > > -    if (n != sizeof(hdr)) {
> > > > > > > -        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> > > > > > > -    }
> > > > > > > -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > > > > > -        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    switch (hdr.type) {
> > > > > > > -    case STREAM_TYPE_CAPABILITIES:
> > > > > > > -        return handle_stream_capabilities(hdr.size);
> > > > > > > -    case STREAM_TYPE_NOTIFY_ERROR:
> > > > > > > -        return handle_stream_error(hdr.size);
> > > > > > > -    case STREAM_TYPE_START_STOP:
> > > > > > > -        return handle_stream_start_stop(hdr.size);
> > > > > > > -    }
> > > > > > > -    throw MessageDataError("unknown message type", hdr.type, 0);
> > > > > > > -}
> > > > > > > -
> > > > > > > -int Stream::read_command(bool blocking)
> > > > > > > -{
> > > > > > > -    int timeout = blocking?-1:0;
> > > > > > > -    while (!quit_requested) {
> > > > > > > -        if (!have_something_to_read(timeout)) {
> > > > > > > -            if (!blocking) {
> > > > > > > -                return 0;
> > > > > > > -            }
> > > > > > > -            sleep(1);
> > > > > > > -            continue;
> > > > > > > -        }
> > > > > > > -        read_command_from_device();
> > > > > > > -        break;
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    return 1;
> > > > > > > -}
> > > > > > > -
> > > > > > > -void Stream::write_all(const char *operation, const void *buf, const size_t len)
> > > > > > > -{
> > > > > > > -    size_t written = 0;
> > > > > > > -    while (written < len) {
> > > > > > > -        int l = write(streamfd, (const char *) buf + written, len - written);
> > > > > > > -        if (l < 0) {
> > > > > > > -            if (errno == EINTR) {
> > > > > > > -                continue;
> > > > > > > -            }
> > > > > > > -            throw WriteError("write failed", operation, errno).syslog();
> > > > > > > -        }
> > > > > > > -        written += l;
> > > > > > > -    }
> > > > > > > -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> > > > > > > -}
> > > > > > > +bool quit_requested = false;
> > > > > > > 
> > > > > > > static void handle_interrupt(int intr)
> > > > > > > {
> > > > > > > diff --git a/src/stream.cpp b/src/stream.cpp
> > > > > > > new file mode 100644
> > > > > > > index 0000000..f756097
> > > > > > > --- /dev/null
> > > > > > > +++ b/src/stream.cpp
> > > > > > > @@ -0,0 +1,172 @@
> > > > > > > +/* Encapsulation of the stream used to communicate between agent and server
> > > > > > > + *
> > > > > > > + * \copyright
> > > > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > > > + */
> > > > > > > +
> > > > > > > +#include "stream.hpp"
> > > > > > > +#include "message.hpp"
> > > > > > > +
> > > > > > > +#include <spice/stream-device.h>
> > > > > > > +
> > > > > > > +#include <spice-streaming-agent/errors.hpp>
> > > > > > > +
> > > > > > > +#include <sys/types.h>
> > > > > > > +#include <sys/stat.h>
> > > > > > > +#include <fcntl.h>
> > > > > > > +#include <poll.h>
> > > > > > > +#include <syslog.h>
> > > > > > > +#include <unistd.h>
> > > > > > > +
> > > > > > > +namespace spice
> > > > > > > +{
> > > > > > > +namespace streaming_agent
> > > > > > > +{
> > > > > > > +
> > > > > > > +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage,
> > > > > > > +                                           STREAM_TYPE_CAPABILITIES>
> > > > > > > +{
> > > > > > > +public:
> > > > > > > +    CapabilitiesMessage() : Message() {}
> > > > > > > +    static size_t size()
> > > > > > > +    {
> > > > > > > +        return sizeof(payload_t);
> > > > > > > +    }
> > > > > > > +    void write_message_body(Stream &stream)
> > > > > > > +    {
> > > > > > > +        /* No body for capabilities message */
> > > > > > > +    }
> > > > > > > +};
> > > > > > 
> > > > > > Not sure I like scattering the messages across source files that happen
> > > > > > to use them, though I suppose you did it because each message (like the
> > > > > > X11Cursor) may require different header files included? Perhaps it is
> > > > > > the way to go…
> > > > > 
> > > > > No, it’s really to de-couple things, a good way to check if encapsulation was correct.
> > > > > 
> > > > > 
> > > > > > 
> > > > > > > +
> > > > > > > +Stream::Stream(const char *name)
> > > > > > > +    : codecs()
> > > > > > > +{
> > > > > > > +    streamfd = open(name, O_RDWR);
> > > > > > > +    if (streamfd < 0) {
> > > > > > > +        throw IOError("failed to open streaming device", errno);
> > > > > > > +    }
> > > > > > > +}
> > > > > > > +
> > > > > > > +Stream::~Stream()
> > > > > > > +{
> > > > > > > +    close(streamfd);
> > > > > > > +}
> > > > > > > +
> > > > > > > +int Stream::have_something_to_read(int timeout)
> > > > > > > +{
> > > > > > > +    struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > > > > > +
> > > > > > > +    if (poll(&pollfd, 1, timeout) < 0) {
> > > > > > > +        syslog(LOG_ERR, "poll FAILED\n");
> > > > > > > +        return -1;
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    if (pollfd.revents == POLLIN) {
> > > > > > > +        return 1;
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    return 0;
> > > > > > > +}
> > > > > > > +
> > > > > > > +void Stream::handle_stream_start_stop(uint32_t len)
> > > > > > > +{
> > > > > > > +    uint8_t msg[256];
> > > > > > > +
> > > > > > > +    if (len >= sizeof(msg)) {
> > > > > > > +        throw MessageDataError("message is too long", len, sizeof(msg));
> > > > > > > +    }
> > > > > > > +    int n = read(streamfd, &msg, len);
> > > > > > > +    if (n != (int) len) {
> > > > > > > +        throw MessageDataError("read start/stop command from device failed", n, len, errno);
> > > > > > > +    }
> > > > > > > +    is_streaming = (msg[0] != 0); /* num_codecs */
> > > > > > > +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s streaming\n",
> > > > > > > +           is_streaming ? "START" : "STOP");
> > > > > > > +    codecs.clear();
> > > > > > > +    for (int i = 1; i <= msg[0]; ++i) {
> > > > > > > +        codecs.insert((SpiceVideoCodecType) msg[i]);
> > > > > > > +    }
> > > > > > > +}
> > > > > > > +
> > > > > > > +void Stream::handle_stream_capabilities(uint32_t len)
> > > > > > > +{
> > > > > > > +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > > > > > +
> > > > > > > +    if (len > sizeof(caps)) {
> > > > > > > +        throw MessageDataError("capability message too long", len, sizeof(caps));
> > > > > > > +    }
> > > > > > > +    int n = read(streamfd, caps, len);
> > > > > > > +    if (n != (int) len) {
> > > > > > > +        throw MessageDataError("read capabilities from device failed", n, len, errno);
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    // we currently do not support extensions so just reply so
> > > > > > > +    send<CapabilitiesMessage>();
> > > > > > > +}
> > > > > > > +
> > > > > > > +void Stream::handle_stream_error(uint32_t len)
> > > > > > > +{
> > > > > > > +    // TODO read message and use it
> > > > > > > +    throw ProtocolError("got an error message from server");
> > > > > > > +}
> > > > > > > +
> > > > > > > +void Stream::read_command_from_device()
> > > > > > > +{
> > > > > > > +    StreamDevHeader hdr;
> > > > > > > +    int n;
> > > > > > > +
> > > > > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > > > +    n = read(streamfd, &hdr, sizeof(hdr));
> > > > > > > +    if (n != sizeof(hdr)) {
> > > > > > > +        throw MessageDataError("read command from device failed", n, sizeof(hdr), errno);
> > > > > > > +    }
> > > > > > > +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > > > > > +        throw MessageDataError("bad protocol version", hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    switch (hdr.type) {
> > > > > > > +    case STREAM_TYPE_CAPABILITIES:
> > > > > > > +        return handle_stream_capabilities(hdr.size);
> > > > > > > +    case STREAM_TYPE_NOTIFY_ERROR:
> > > > > > > +        return handle_stream_error(hdr.size);
> > > > > > > +    case STREAM_TYPE_START_STOP:
> > > > > > > +        return handle_stream_start_stop(hdr.size);
> > > > > > > +    }
> > > > > > > +    throw MessageDataError("unknown message type", hdr.type, 0);
> > > > > > > +}
> > > > > > > +
> > > > > > > +int Stream::read_command(bool blocking)
> > > > > > > +{
> > > > > > > +    int timeout = blocking?-1:0;
> > > > > > > +    while (!quit_requested) {
> > > > > > > +        if (!have_something_to_read(timeout)) {
> > > > > > > +            if (!blocking) {
> > > > > > > +                return 0;
> > > > > > > +            }
> > > > > > > +            sleep(1);
> > > > > > > +            continue;
> > > > > > > +        }
> > > > > > > +        read_command_from_device();
> > > > > > > +        break;
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    return 1;
> > > > > > > +}
> > > > > > > +
> > > > > > > +void Stream::write_all(const char *operation, const void *buf, const size_t len)
> > > > > > > +{
> > > > > > > +    size_t written = 0;
> > > > > > > +    while (written < len) {
> > > > > > > +        int l = write(streamfd, (const char *) buf + written, len - written);
> > > > > > > +        if (l < 0) {
> > > > > > > +            if (errno == EINTR) {
> > > > > > > +                continue;
> > > > > > > +            }
> > > > > > > +            throw WriteError("write failed", operation, errno).syslog();
> > > > > > > +        }
> > > > > > > +        written += l;
> > > > > > > +    }
> > > > > > > +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
> > > > > > > +}
> > > > > > > +
> > > > > > > +}} // namespace spice::streaming_agent
> > > > > > > diff --git a/src/stream.hpp b/src/stream.hpp
> > > > > > > new file mode 100644
> > > > > > > index 0000000..b689f36
> > > > > > > --- /dev/null
> > > > > > > +++ b/src/stream.hpp
> > > > > > > @@ -0,0 +1,55 @@
> > > > > > > +/* Encapsulation of the stream used to communicate between agent and server
> > > > > > > + *
> > > > > > > + * \copyright
> > > > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > > > + */
> > > > > > > +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
> > > > > > > +#define SPICE_STREAMING_AGENT_STREAM_HPP
> > > > > > > +
> > > > > > > +#include <spice/enums.h>
> > > > > > > +#include <set>
> > > > > > > +#include <mutex>
> > > > > > > +
> > > > > > > +namespace spice {
> > > > > > > +namespace streaming_agent {
> > > > > > > +
> > > > > > > +class Stream
> > > > > > > +{
> > > > > > > +    typedef std::set<SpiceVideoCodecType> codecs_t;
> > > > > > > +
> > > > > > > +public:
> > > > > > > +    Stream(const char *name);
> > > > > > > +    ~Stream();
> > > > > > > +
> > > > > > > +    const codecs_t &client_codecs() { return codecs; }
> > > > > > > +    bool streaming_requested() { return is_streaming; }
> > > > > > > +
> > > > > > > +    template <typename Message, typename ...PayloadArgs>
> > > > > > > +    void send(PayloadArgs... payload)
> > > > > > > +    {
> > > > > > > +        Message message(payload...);
> > > > > > > +        std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > > > +        message.write_header(*this);
> > > > > > > +        message.write_message_body(*this, payload...);
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    int read_command(bool blocking);
> > > > > > > +    void write_all(const char *operation, const void *buf, const size_t len);
> > > > > > > +
> > > > > > > +private:
> > > > > > > +    int have_something_to_read(int timeout);
> > > > > > > +    void handle_stream_start_stop(uint32_t len);
> > > > > > > +    void handle_stream_capabilities(uint32_t len);
> > > > > > > +    void handle_stream_error(uint32_t len);
> > > > > > > +    void read_command_from_device(void);
> > > > > > > +
> > > > > > > +private:
> > > > > > > +    std::mutex mutex;
> > > > > > > +    codecs_t codecs;
> > > > > > > +    int streamfd = -1;
> > > > > > > +    bool is_streaming = false;
> > > > > > > +};
> > > > > > > +
> > > > > > > +}} // namespace spice::streaming_agent
> > > > > > > +
> > > > > > > +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> 
>