[13/22] Convert message writing from C style to C++ style

Submitted by Christophe de Dinechin on Feb. 28, 2018, 3:43 p.m.

Details

Message ID 20180228154325.25791-14-christophe@dinechin.org
State New
Headers show
Series "streaming-agent: C++ refactoring" ( rev: 2 1 ) in Spice

Not browsing as part of any series.

Commit Message

Christophe de Dinechin Feb. 28, 2018, 3:43 p.m.
From: Christophe de Dinechin <dinechin@redhat.com>

- The Stream class now deals with locking and sending messages
- The Message<> template class deals with the general writing mechanisms
- Classes, FormatMessage, FrameMessage, CapabilitiesMessage and
  X11CursorMessage represent individual messages

The various classes should be moved to separate headers in a
subsequent operation

The design uses the "curiously recurring template pattern" (CRTP) to
defer some of the code to a derived class. This is done to avoid
runtime overhead: all the calls are static.

Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
---
 src/spice-streaming-agent.cpp | 279 +++++++++++++++++++-----------------------
 1 file changed, 128 insertions(+), 151 deletions(-)

Patch hide | download patch | download mbox

diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index 23ee824..8bbd457 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -48,24 +48,6 @@  namespace spice
 namespace streaming_agent
 {
 
-struct FormatMessage
-{
-    StreamDevHeader hdr;
-    StreamMsgFormat msg;
-};
-
-struct DataMessage
-{
-    StreamDevHeader hdr;
-    StreamMsgData msg;
-};
-
-struct CursorMessage
-{
-    StreamDevHeader hdr;
-    StreamMsgCursorSet msg;
-};
-
 class Stream
 {
     typedef std::set<SpiceVideoCodecType> codecs_t;
@@ -87,13 +69,17 @@  public:
     const codecs_t &client_codecs() { return codecs; }
     bool streaming_requested() { return is_streaming; }
 
+    template <typename Message, typename ...PayloadArgs>
+    void send(PayloadArgs... payload)
+    {
+        Message message(payload...);
+        std::lock_guard<std::mutex> stream_guard(mutex);
+        message.write_header(*this);
+        message.write_message_body(*this, payload...);
+    }
+
     int read_command(bool blocking);
-    size_t write_all(const void *buf, const size_t len);
-    int send_format(unsigned w, unsigned h, uint8_t c);
-    int send_frame(const void *buf, const unsigned size);
-    void send_cursor(uint16_t width, uint16_t height,
-                     uint16_t hotspot_x, uint16_t hotspot_y,
-                     std::function<void(uint32_t *)> fill_cursor);
+    void write_all(const char *operation, const void *buf, const size_t len);
 
 private:
     int have_something_to_read(int timeout);
@@ -109,6 +95,117 @@  private:
     bool is_streaming = false;
 };
 
+template <typename Payload, typename Info, unsigned Type>
+class Message
+{
+public:
+    template <typename ...PayloadArgs>
+    Message(PayloadArgs... payload)
+        : hdr(StreamDevHeader {
+              .protocol_version = STREAM_DEVICE_PROTOCOL,
+              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
+              .type = Type,
+              .size = (uint32_t) Info::size(payload...)
+          })
+    { }
+    void write_header(Stream &stream)
+    {
+        stream.write_all("header", &hdr, sizeof(hdr));
+    }
+
+protected:
+    StreamDevHeader hdr;
+    typedef Payload payload_t;
+};
+
+class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
+{
+public:
+    FormatMessage(unsigned w, unsigned h, uint8_t c) : Message(w, h, c) {}
+    static size_t size(unsigned width, unsigned height, uint8_t codec)
+    {
+        return sizeof(payload_t);
+    }
+    void write_message_body(Stream &stream, unsigned w, unsigned h, uint8_t c)
+    {
+        StreamMsgFormat msg = { .width = w, .height = h, .codec = c, .padding1 = {} };
+        stream.write_all("format", &msg, sizeof(msg));
+    }
+};
+
+class FrameMessage : public Message<StreamMsgData, FrameMessage, STREAM_TYPE_DATA>
+{
+public:
+    FrameMessage(const void *frame, size_t length) : Message(frame, length) {}
+    static size_t size(const void *frame, size_t length)
+    {
+        return sizeof(payload_t) + length;
+    }
+    void write_message_body(Stream &stream, const void *frame, size_t length)
+    {
+        stream.write_all("frame", frame, length);
+    }
+};
+
+class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
+{
+public:
+    CapabilitiesMessage() : Message() {}
+    static size_t size()
+    {
+        return sizeof(payload_t);
+    }
+    void write_message_body(Stream &stream)
+    {
+        /* No body for capabilities message */
+    }
+};
+
+class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
+{
+public:
+    X11CursorMessage(XFixesCursorImage *cursor): Message(cursor) {}
+    static size_t size(XFixesCursorImage *cursor)
+    {
+        return sizeof(payload_t) + sizeof(uint32_t) * pixel_count(cursor);
+    }
+
+    void write_message_body(Stream &stream, XFixesCursorImage *cursor)
+    {
+        StreamMsgCursorSet msg = {
+            .width = cursor->width,
+            .height = cursor->height,
+            .hot_spot_x = cursor->xhot,
+            .hot_spot_y = cursor->yhot,
+            .type = SPICE_CURSOR_TYPE_ALPHA,
+            .padding1 = { },
+            .data = { }
+        };
+
+        size_t pixcount = pixel_count(cursor);
+        size_t pixsize = pixcount * sizeof(uint32_t);
+        std::unique_ptr<uint32_t[]> pixels(new uint32_t[pixcount]);
+        uint32_t *pixbuf = pixels.get();
+        fill_pixels(cursor, pixcount, pixbuf);
+
+        stream.write_all("cursor message", &msg, sizeof(msg));
+        stream.write_all("cursor pixels", pixbuf, pixsize);
+    }
+
+private:
+    static size_t pixel_count(XFixesCursorImage *cursor)
+    {
+        return cursor->width * cursor->height;
+    }
+
+    void fill_pixels(XFixesCursorImage *cursor, unsigned count, uint32_t *pixbuf)
+    {
+        for (unsigned i = 0; i < count; ++i) {
+            pixbuf[i] = cursor->pixels[i];
+        }
+    }
+};
+
 }} // namespace spice::streaming_agent
 
 static bool quit_requested = false;
@@ -166,15 +263,7 @@  void Stream::handle_stream_capabilities(uint32_t len)
     }
 
     // we currently do not support extensions so just reply so
-    StreamDevHeader hdr = {
-        STREAM_DEVICE_PROTOCOL,
-        0,
-        STREAM_TYPE_CAPABILITIES,
-        0
-    };
-    if (write_all(&hdr, sizeof(hdr)) != sizeof(hdr)) {
-        throw std::runtime_error("error writing capabilities");
-    }
+    send<CapabilitiesMessage>();
 }
 
 void Stream::handle_stream_error(uint32_t len)
@@ -228,7 +317,7 @@  int Stream::read_command(bool blocking)
     return 1;
 }
 
-size_t Stream::write_all(const void *buf, const size_t len)
+void Stream::write_all(const char *operation, const void *buf, const size_t len)
 {
     size_t written = 0;
     while (written < len) {
@@ -237,73 +326,11 @@  size_t Stream::write_all(const void *buf, const size_t len)
             if (errno == EINTR) {
                 continue;
             }
-            syslog(LOG_ERR, "write failed - %m");
-            return l;
+            throw WriteError("write failed", operation, errno).syslog();
         }
         written += l;
     }
-    syslog(LOG_DEBUG, "write_all -- %u bytes written\n", (unsigned)written);
-    return written;
-}
-
-int Stream::send_format(unsigned w, unsigned h, uint8_t c)
-{
-    const size_t msgsize = sizeof(FormatMessage);
-    const size_t hdrsize  = sizeof(StreamDevHeader);
-    FormatMessage msg = {
-        .hdr = {
-            .protocol_version = STREAM_DEVICE_PROTOCOL,
-            .padding = 0,       // Workaround GCC "not implemented" bug
-            .type = STREAM_TYPE_FORMAT,
-            .size = msgsize - hdrsize
-        },
-        .msg = {
-            .width = w,
-            .height = h,
-            .codec = c,
-            .padding1 = { }
-        }
-    };
-    syslog(LOG_DEBUG, "writing format\n");
-    std::lock_guard<std::mutex> stream_guard(mutex);
-    if (write_all(&msg, msgsize) != msgsize) {
-        return EXIT_FAILURE;
-    }
-    return EXIT_SUCCESS;
-}
-
-int Stream::send_frame(const void *buf, const unsigned size)
-{
-    ssize_t n;
-    const size_t msgsize = sizeof(FormatMessage);
-    DataMessage msg = {
-        .hdr = {
-            .protocol_version = STREAM_DEVICE_PROTOCOL,
-            .padding = 0,       // Workaround GCC "not implemented" bug
-            .type = STREAM_TYPE_DATA,
-            .size = size  /* includes only the body? */
-        },
-        .msg = {}
-    };
-
-    std::lock_guard<std::mutex> stream_guard(mutex);
-    n = write_all(&msg, msgsize);
-    syslog(LOG_DEBUG,
-           "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
-           n, msg.hdr.size);
-    if (n != msgsize) {
-        syslog(LOG_WARNING, "write_all header: wrote %ld expected %lu\n",
-               n, msgsize);
-        return EXIT_FAILURE;
-    }
-    n = write_all(buf, size);
-    syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
-    if (n != size) {
-        syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
-               n, size);
-        return EXIT_FAILURE;
-    }
-    return EXIT_SUCCESS;
+    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
 }
 
 /* returns current time in micro-seconds */
@@ -350,46 +377,6 @@  static void usage(const char *progname)
     exit(1);
 }
 
-void
-Stream::send_cursor(uint16_t width, uint16_t height,
-                    uint16_t hotspot_x, uint16_t hotspot_y,
-                    std::function<void(uint32_t *)> fill_cursor)
-{
-    if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
-        return;
-    }
-
-    const uint32_t msgsize = sizeof(CursorMessage) + width * height * sizeof(uint32_t);
-    const uint32_t hdrsize  = sizeof(StreamDevHeader);
-
-    std::unique_ptr<uint8_t[]> storage(new uint8_t[msgsize]);
-
-    CursorMessage *cursor_msg =
-        new(storage.get()) CursorMessage {
-        .hdr = {
-            .protocol_version = STREAM_DEVICE_PROTOCOL,
-            .padding = 0,       // Workaround GCC internal / not implemented compiler error
-            .type = STREAM_TYPE_CURSOR_SET,
-            .size = msgsize - hdrsize
-        },
-        .msg = {
-            .width = width,
-            .height = height,
-            .hot_spot_x = hotspot_x,
-            .hot_spot_y = hotspot_y,
-            .type = SPICE_CURSOR_TYPE_ALPHA,
-            .padding1 = { },
-            .data = { }
-        }
-    };
-
-    uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
-    fill_cursor(pixels);
-
-    std::lock_guard<std::mutex> stream_guard(mutex);
-    write_all(storage.get(), msgsize);
-}
-
 static void cursor_changes(Stream *stream, Display *display, int event_base)
 {
     unsigned long last_serial = 0;
@@ -411,12 +398,7 @@  static void cursor_changes(Stream *stream, Display *display, int event_base)
         }
 
         last_serial = cursor->cursor_serial;
-        auto fill_cursor = [cursor](uint32_t *pixels) {
-            for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
-                pixels[i] = cursor->pixels[i];
-        };
-        stream->send_cursor(cursor->width, cursor->height,
-                            cursor->xhot, cursor->yhot, fill_cursor);
+        stream->send<X11CursorMessage>(cursor);
     }
 }
 
@@ -471,9 +453,7 @@  do_capture(Stream &stream, const char *streamport, FILE *f_log)
 
                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
 
-                if (stream.send_format(width, height, codec) == EXIT_FAILURE) {
-                    throw std::runtime_error("FAILED to send format message");
-                }
+                stream.send<FormatMessage>(width, height, codec);
             }
             if (f_log) {
                 if (log_binary) {
@@ -484,10 +464,7 @@  do_capture(Stream &stream, const char *streamport, FILE *f_log)
                     hexdump(frame.buffer, frame.buffer_size, f_log);
                 }
             }
-            if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
-                syslog(LOG_ERR, "FAILED to send a frame\n");
-                break;
-            }
+            stream.send<FrameMessage>(frame.buffer, frame.buffer_size);
             //usleep(1);
             if (stream.read_command(false) < 0) {
                 syslog(LOG_ERR, "FAILED to read command\n");

Comments

On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
> From: Christophe de Dinechin <dinechin@redhat.com>
> 
> - The Stream class now deals with locking and sending messages
> - The Message<> template class deals with the general writing mechanisms
> - Classes, FormatMessage, FrameMessage, CapabilitiesMessage and
>   X11CursorMessage represent individual messages
> 
> The various classes should be moved to separate headers in a
> subsequent operation
> 
> The design uses the "curiously recurring template pattern" (CRTP) to
> defer some of the code to a derived class. This is done to avoid
> runtime overhead: all the calls are static.
> 
> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
> ---
>  src/spice-streaming-agent.cpp | 279 +++++++++++++++++++-----------------------
>  1 file changed, 128 insertions(+), 151 deletions(-)
> 
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index 23ee824..8bbd457 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -48,24 +48,6 @@ namespace spice
>  namespace streaming_agent
>  {
>  
> -struct FormatMessage
> -{
> -    StreamDevHeader hdr;
> -    StreamMsgFormat msg;
> -};
> -
> -struct DataMessage
> -{
> -    StreamDevHeader hdr;
> -    StreamMsgData msg;
> -};
> -
> -struct CursorMessage
> -{
> -    StreamDevHeader hdr;
> -    StreamMsgCursorSet msg;
> -};
> -
>  class Stream
>  {
>      typedef std::set<SpiceVideoCodecType> codecs_t;
> @@ -87,13 +69,17 @@ public:
>      const codecs_t &client_codecs() { return codecs; }
>      bool streaming_requested() { return is_streaming; }
>  
> +    template <typename Message, typename ...PayloadArgs>
> +    void send(PayloadArgs... payload)

I'd use perfect forwarding here.

> +    {
> +        Message message(payload...);
> +        std::lock_guard<std::mutex> stream_guard(mutex);
> +        message.write_header(*this);
> +        message.write_message_body(*this, payload...);
> +    }
> +
>      int read_command(bool blocking);
> -    size_t write_all(const void *buf, const size_t len);
> -    int send_format(unsigned w, unsigned h, uint8_t c);
> -    int send_frame(const void *buf, const unsigned size);
> -    void send_cursor(uint16_t width, uint16_t height,
> -                     uint16_t hotspot_x, uint16_t hotspot_y,
> -                     std::function<void(uint32_t *)> fill_cursor);
> +    void write_all(const char *operation, const void *buf, const size_t len);
>  
>  private:
>      int have_something_to_read(int timeout);
> @@ -109,6 +95,117 @@ private:
>      bool is_streaming = false;
>  };
>  
> +template <typename Payload, typename Info, unsigned Type>
> +class Message
> +{
> +public:
> +    template <typename ...PayloadArgs>
> +    Message(PayloadArgs... payload)

Also perfect forwarding. And I'd still rather see "PayloadArgs...
payload_args" here :) (and to be consistent, in Stream::send(...)
above, too)

> +        : hdr(StreamDevHeader {
> +              .protocol_version = STREAM_DEVICE_PROTOCOL,
> +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
> +              .type = Type,
> +              .size = (uint32_t) Info::size(payload...)
> +          })
> +    { }
> +    void write_header(Stream &stream)
> +    {
> +        stream.write_all("header", &hdr, sizeof(hdr));
> +    }
> +
> +protected:
> +    StreamDevHeader hdr;
> +    typedef Payload payload_t;
> +};
> +
> +class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
> +{
> +public:
> +    FormatMessage(unsigned w, unsigned h, uint8_t c) : Message(w, h, c) {}
> +    static size_t size(unsigned width, unsigned height, uint8_t codec)
> +    {
> +        return sizeof(payload_t);
> +    }
> +    void write_message_body(Stream &stream, unsigned w, unsigned h, uint8_t c)
> +    {
> +        StreamMsgFormat msg = { .width = w, .height = h, .codec = c, .padding1 = {} };
> +        stream.write_all("format", &msg, sizeof(msg));
> +    }
> +};
> +
> +class FrameMessage : public Message<StreamMsgData, FrameMessage, STREAM_TYPE_DATA>
> +{
> +public:
> +    FrameMessage(const void *frame, size_t length) : Message(frame, length) {}
> +    static size_t size(const void *frame, size_t length)
> +    {
> +        return sizeof(payload_t) + length;
> +    }
> +    void write_message_body(Stream &stream, const void *frame, size_t length)
> +    {
> +        stream.write_all("frame", frame, length);
> +    }
> +};
> +
> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
> +{
> +public:
> +    CapabilitiesMessage() : Message() {}
> +    static size_t size()
> +    {
> +        return sizeof(payload_t);
> +    }
> +    void write_message_body(Stream &stream)
> +    {
> +        /* No body for capabilities message */
> +    }
> +};
> +
> +class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
> +{
> +public:
> +    X11CursorMessage(XFixesCursorImage *cursor): Message(cursor) {}
> +    static size_t size(XFixesCursorImage *cursor)
> +    {
> +        return sizeof(payload_t) + sizeof(uint32_t) * pixel_count(cursor);
> +    }
> +
> +    void write_message_body(Stream &stream, XFixesCursorImage *cursor)
> +    {
> +        StreamMsgCursorSet msg = {
> +            .width = cursor->width,
> +            .height = cursor->height,
> +            .hot_spot_x = cursor->xhot,
> +            .hot_spot_y = cursor->yhot,
> +            .type = SPICE_CURSOR_TYPE_ALPHA,
> +            .padding1 = { },
> +            .data = { }
> +        };
> +
> +        size_t pixcount = pixel_count(cursor);
> +        size_t pixsize = pixcount * sizeof(uint32_t);
> +        std::unique_ptr<uint32_t[]> pixels(new uint32_t[pixcount]);
> +        uint32_t *pixbuf = pixels.get();
> +        fill_pixels(cursor, pixcount, pixbuf);
> +
> +        stream.write_all("cursor message", &msg, sizeof(msg));
> +        stream.write_all("cursor pixels", pixbuf, pixsize);
> +    }
> +
> +private:
> +    static size_t pixel_count(XFixesCursorImage *cursor)
> +    {
> +        return cursor->width * cursor->height;
> +    }
> +
> +    void fill_pixels(XFixesCursorImage *cursor, unsigned count, uint32_t *pixbuf)
> +    {
> +        for (unsigned i = 0; i < count; ++i) {
> +            pixbuf[i] = cursor->pixels[i];
> +        }
> +    }

Both methods above const. fill_pixels() used only once and very short,
perhaps drop the method and use the code directly?

> +};

I like how the message classes came out nice and terse :)

> +
>  }} // namespace spice::streaming_agent
>  
>  static bool quit_requested = false;
> @@ -166,15 +263,7 @@ void Stream::handle_stream_capabilities(uint32_t len)
>      }
>  
>      // we currently do not support extensions so just reply so
> -    StreamDevHeader hdr = {
> -        STREAM_DEVICE_PROTOCOL,
> -        0,
> -        STREAM_TYPE_CAPABILITIES,
> -        0
> -    };
> -    if (write_all(&hdr, sizeof(hdr)) != sizeof(hdr)) {
> -        throw std::runtime_error("error writing capabilities");
> -    }
> +    send<CapabilitiesMessage>();
>  }
>  
>  void Stream::handle_stream_error(uint32_t len)
> @@ -228,7 +317,7 @@ int Stream::read_command(bool blocking)
>      return 1;
>  }
>  
> -size_t Stream::write_all(const void *buf, const size_t len)
> +void Stream::write_all(const char *operation, const void *buf, const size_t len)
>  {
>      size_t written = 0;
>      while (written < len) {
> @@ -237,73 +326,11 @@ size_t Stream::write_all(const void *buf, const size_t len)
>              if (errno == EINTR) {
>                  continue;
>              }
> -            syslog(LOG_ERR, "write failed - %m");
> -            return l;
> +            throw WriteError("write failed", operation, errno).syslog();
>          }
>          written += l;
>      }
> -    syslog(LOG_DEBUG, "write_all -- %u bytes written\n", (unsigned)written);
> -    return written;
> -}
> -
> -int Stream::send_format(unsigned w, unsigned h, uint8_t c)
> -{
> -    const size_t msgsize = sizeof(FormatMessage);
> -    const size_t hdrsize  = sizeof(StreamDevHeader);
> -    FormatMessage msg = {
> -        .hdr = {
> -            .protocol_version = STREAM_DEVICE_PROTOCOL,
> -            .padding = 0,       // Workaround GCC "not implemented" bug
> -            .type = STREAM_TYPE_FORMAT,
> -            .size = msgsize - hdrsize
> -        },
> -        .msg = {
> -            .width = w,
> -            .height = h,
> -            .codec = c,
> -            .padding1 = { }
> -        }
> -    };
> -    syslog(LOG_DEBUG, "writing format\n");
> -    std::lock_guard<std::mutex> stream_guard(mutex);
> -    if (write_all(&msg, msgsize) != msgsize) {
> -        return EXIT_FAILURE;
> -    }
> -    return EXIT_SUCCESS;
> -}
> -
> -int Stream::send_frame(const void *buf, const unsigned size)
> -{
> -    ssize_t n;
> -    const size_t msgsize = sizeof(FormatMessage);
> -    DataMessage msg = {
> -        .hdr = {
> -            .protocol_version = STREAM_DEVICE_PROTOCOL,
> -            .padding = 0,       // Workaround GCC "not implemented" bug
> -            .type = STREAM_TYPE_DATA,
> -            .size = size  /* includes only the body? */
> -        },
> -        .msg = {}
> -    };
> -
> -    std::lock_guard<std::mutex> stream_guard(mutex);
> -    n = write_all(&msg, msgsize);
> -    syslog(LOG_DEBUG,
> -           "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
> -           n, msg.hdr.size);
> -    if (n != msgsize) {
> -        syslog(LOG_WARNING, "write_all header: wrote %ld expected %lu\n",
> -               n, msgsize);
> -        return EXIT_FAILURE;
> -    }
> -    n = write_all(buf, size);
> -    syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
> -    if (n != size) {
> -        syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
> -               n, size);
> -        return EXIT_FAILURE;
> -    }
> -    return EXIT_SUCCESS;
> +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>  }
>  
>  /* returns current time in micro-seconds */
> @@ -350,46 +377,6 @@ static void usage(const char *progname)
>      exit(1);
>  }
>  
> -void
> -Stream::send_cursor(uint16_t width, uint16_t height,
> -                    uint16_t hotspot_x, uint16_t hotspot_y,
> -                    std::function<void(uint32_t *)> fill_cursor)
> -{
> -    if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
> -        return;
> -    }
> -
> -    const uint32_t msgsize = sizeof(CursorMessage) + width * height * sizeof(uint32_t);
> -    const uint32_t hdrsize  = sizeof(StreamDevHeader);
> -
> -    std::unique_ptr<uint8_t[]> storage(new uint8_t[msgsize]);
> -
> -    CursorMessage *cursor_msg =
> -        new(storage.get()) CursorMessage {
> -        .hdr = {
> -            .protocol_version = STREAM_DEVICE_PROTOCOL,
> -            .padding = 0,       // Workaround GCC internal / not implemented compiler error
> -            .type = STREAM_TYPE_CURSOR_SET,
> -            .size = msgsize - hdrsize
> -        },
> -        .msg = {
> -            .width = width,
> -            .height = height,
> -            .hot_spot_x = hotspot_x,
> -            .hot_spot_y = hotspot_y,
> -            .type = SPICE_CURSOR_TYPE_ALPHA,
> -            .padding1 = { },
> -            .data = { }
> -        }
> -    };
> -
> -    uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
> -    fill_cursor(pixels);
> -
> -    std::lock_guard<std::mutex> stream_guard(mutex);
> -    write_all(storage.get(), msgsize);
> -}
> -
>  static void cursor_changes(Stream *stream, Display *display, int event_base)
>  {
>      unsigned long last_serial = 0;
> @@ -411,12 +398,7 @@ static void cursor_changes(Stream *stream, Display *display, int event_base)
>          }
>  
>          last_serial = cursor->cursor_serial;
> -        auto fill_cursor = [cursor](uint32_t *pixels) {
> -            for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
> -                pixels[i] = cursor->pixels[i];
> -        };
> -        stream->send_cursor(cursor->width, cursor->height,
> -                            cursor->xhot, cursor->yhot, fill_cursor);
> +        stream->send<X11CursorMessage>(cursor);
>      }
>  }
>  
> @@ -471,9 +453,7 @@ do_capture(Stream &stream, const char *streamport, FILE *f_log)
>  
>                  syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
>  
> -                if (stream.send_format(width, height, codec) == EXIT_FAILURE) {
> -                    throw std::runtime_error("FAILED to send format message");
> -                }
> +                stream.send<FormatMessage>(width, height, codec);
>              }
>              if (f_log) {
>                  if (log_binary) {
> @@ -484,10 +464,7 @@ do_capture(Stream &stream, const char *streamport, FILE *f_log)
>                      hexdump(frame.buffer, frame.buffer_size, f_log);
>                  }
>              }
> -            if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
> -                syslog(LOG_ERR, "FAILED to send a frame\n");
> -                break;
> -            }
> +            stream.send<FrameMessage>(frame.buffer, frame.buffer_size);
>              //usleep(1);
>              if (stream.read_command(false) < 0) {
>                  syslog(LOG_ERR, "FAILED to read command\n");
> On 1 Mar 2018, at 14:45, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> 
> On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
>> From: Christophe de Dinechin <dinechin@redhat.com>
>> 
>> - The Stream class now deals with locking and sending messages
>> - The Message<> template class deals with the general writing mechanisms
>> - Classes, FormatMessage, FrameMessage, CapabilitiesMessage and
>>  X11CursorMessage represent individual messages
>> 
>> The various classes should be moved to separate headers in a
>> subsequent operation
>> 
>> The design uses the "curiously recurring template pattern" (CRTP) to
>> defer some of the code to a derived class. This is done to avoid
>> runtime overhead: all the calls are static.
>> 
>> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
>> ---
>> src/spice-streaming-agent.cpp | 279 +++++++++++++++++++-----------------------
>> 1 file changed, 128 insertions(+), 151 deletions(-)
>> 
>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
>> index 23ee824..8bbd457 100644
>> --- a/src/spice-streaming-agent.cpp
>> +++ b/src/spice-streaming-agent.cpp
>> @@ -48,24 +48,6 @@ namespace spice
>> namespace streaming_agent
>> {
>> 
>> -struct FormatMessage
>> -{
>> -    StreamDevHeader hdr;
>> -    StreamMsgFormat msg;
>> -};
>> -
>> -struct DataMessage
>> -{
>> -    StreamDevHeader hdr;
>> -    StreamMsgData msg;
>> -};
>> -
>> -struct CursorMessage
>> -{
>> -    StreamDevHeader hdr;
>> -    StreamMsgCursorSet msg;
>> -};
>> -
>> class Stream
>> {
>>     typedef std::set<SpiceVideoCodecType> codecs_t;
>> @@ -87,13 +69,17 @@ public:
>>     const codecs_t &client_codecs() { return codecs; }
>>     bool streaming_requested() { return is_streaming; }
>> 
>> +    template <typename Message, typename ...PayloadArgs>
>> +    void send(PayloadArgs... payload)
> 
> I'd use perfect forwarding here.

Useless with the datatypes we have (and will have, this is a C-based protocol).
Also introduces some subtle complexity (e.g. default arguments) with variadic templates.

> 
>> +    {
>> +        Message message(payload...);
>> +        std::lock_guard<std::mutex> stream_guard(mutex);
>> +        message.write_header(*this);
>> +        message.write_message_body(*this, payload...);
>> +    }
>> +
>>     int read_command(bool blocking);
>> -    size_t write_all(const void *buf, const size_t len);
>> -    int send_format(unsigned w, unsigned h, uint8_t c);
>> -    int send_frame(const void *buf, const unsigned size);
>> -    void send_cursor(uint16_t width, uint16_t height,
>> -                     uint16_t hotspot_x, uint16_t hotspot_y,
>> -                     std::function<void(uint32_t *)> fill_cursor);
>> +    void write_all(const char *operation, const void *buf, const size_t len);
>> 
>> private:
>>     int have_something_to_read(int timeout);
>> @@ -109,6 +95,117 @@ private:
>>     bool is_streaming = false;
>> };
>> 
>> +template <typename Payload, typename Info, unsigned Type>
>> +class Message
>> +{
>> +public:
>> +    template <typename ...PayloadArgs>
>> +    Message(PayloadArgs... payload)
> 
> Also perfect forwarding. And I'd still rather see "PayloadArgs… payload_args" here :) (and to be consistent, in Stream::send(…) above, too)

Perfect forwarding: no, see above.
payload_args: yes, missed that one.

> 
>> +        : hdr(StreamDevHeader {
>> +              .protocol_version = STREAM_DEVICE_PROTOCOL,
>> +              .padding = 0,     // Workaround GCC bug "sorry: not implemented"
>> +              .type = Type,
>> +              .size = (uint32_t) Info::size(payload...)
>> +          })
>> +    { }
>> +    void write_header(Stream &stream)
>> +    {
>> +        stream.write_all("header", &hdr, sizeof(hdr));
>> +    }
>> +
>> +protected:
>> +    StreamDevHeader hdr;
>> +    typedef Payload payload_t;
>> +};
>> +
>> +class FormatMessage : public Message<StreamMsgFormat, FormatMessage, STREAM_TYPE_FORMAT>
>> +{
>> +public:
>> +    FormatMessage(unsigned w, unsigned h, uint8_t c) : Message(w, h, c) {}
>> +    static size_t size(unsigned width, unsigned height, uint8_t codec)
>> +    {
>> +        return sizeof(payload_t);
>> +    }
>> +    void write_message_body(Stream &stream, unsigned w, unsigned h, uint8_t c)
>> +    {
>> +        StreamMsgFormat msg = { .width = w, .height = h, .codec = c, .padding1 = {} };
>> +        stream.write_all("format", &msg, sizeof(msg));
>> +    }
>> +};
>> +
>> +class FrameMessage : public Message<StreamMsgData, FrameMessage, STREAM_TYPE_DATA>
>> +{
>> +public:
>> +    FrameMessage(const void *frame, size_t length) : Message(frame, length) {}
>> +    static size_t size(const void *frame, size_t length)
>> +    {
>> +        return sizeof(payload_t) + length;
>> +    }
>> +    void write_message_body(Stream &stream, const void *frame, size_t length)
>> +    {
>> +        stream.write_all("frame", frame, length);
>> +    }
>> +};
>> +
>> +class CapabilitiesMessage : public Message<StreamMsgData, CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
>> +{
>> +public:
>> +    CapabilitiesMessage() : Message() {}
>> +    static size_t size()
>> +    {
>> +        return sizeof(payload_t);
>> +    }
>> +    void write_message_body(Stream &stream)
>> +    {
>> +        /* No body for capabilities message */
>> +    }
>> +};
>> +
>> +class X11CursorMessage : public Message<StreamMsgCursorSet, X11CursorMessage, STREAM_TYPE_CURSOR_SET>
>> +{
>> +public:
>> +    X11CursorMessage(XFixesCursorImage *cursor): Message(cursor) {}
>> +    static size_t size(XFixesCursorImage *cursor)
>> +    {
>> +        return sizeof(payload_t) + sizeof(uint32_t) * pixel_count(cursor);
>> +    }
>> +
>> +    void write_message_body(Stream &stream, XFixesCursorImage *cursor)
>> +    {
>> +        StreamMsgCursorSet msg = {
>> +            .width = cursor->width,
>> +            .height = cursor->height,
>> +            .hot_spot_x = cursor->xhot,
>> +            .hot_spot_y = cursor->yhot,
>> +            .type = SPICE_CURSOR_TYPE_ALPHA,
>> +            .padding1 = { },
>> +            .data = { }
>> +        };
>> +
>> +        size_t pixcount = pixel_count(cursor);
>> +        size_t pixsize = pixcount * sizeof(uint32_t);
>> +        std::unique_ptr<uint32_t[]> pixels(new uint32_t[pixcount]);
>> +        uint32_t *pixbuf = pixels.get();
>> +        fill_pixels(cursor, pixcount, pixbuf);
>> +
>> +        stream.write_all("cursor message", &msg, sizeof(msg));
>> +        stream.write_all("cursor pixels", pixbuf, pixsize);
>> +    }
>> +
>> +private:
>> +    static size_t pixel_count(XFixesCursorImage *cursor)
>> +    {
>> +        return cursor->width * cursor->height;
>> +    }
>> +
>> +    void fill_pixels(XFixesCursorImage *cursor, unsigned count, uint32_t *pixbuf)
>> +    {
>> +        for (unsigned i = 0; i < count; ++i) {
>> +            pixbuf[i] = cursor->pixels[i];
>> +        }
>> +    }
> 
> Both methods above const. fill_pixels() used only once and very short,
> perhaps drop the method and use the code directly?

const: no, but both methods should be static.

drop method: no, but based on IRC discussion a couple of days ago, need to add a comment to explain why the loop is necessary.

> 
>> +};
> 
> I like how the message classes came out nice and terse :)

Yes, it’s better now.

> 
>> +
>> }} // namespace spice::streaming_agent
>> 
>> static bool quit_requested = false;
>> @@ -166,15 +263,7 @@ void Stream::handle_stream_capabilities(uint32_t len)
>>     }
>> 
>>     // we currently do not support extensions so just reply so
>> -    StreamDevHeader hdr = {
>> -        STREAM_DEVICE_PROTOCOL,
>> -        0,
>> -        STREAM_TYPE_CAPABILITIES,
>> -        0
>> -    };
>> -    if (write_all(&hdr, sizeof(hdr)) != sizeof(hdr)) {
>> -        throw std::runtime_error("error writing capabilities");
>> -    }
>> +    send<CapabilitiesMessage>();
>> }
>> 
>> void Stream::handle_stream_error(uint32_t len)
>> @@ -228,7 +317,7 @@ int Stream::read_command(bool blocking)
>>     return 1;
>> }
>> 
>> -size_t Stream::write_all(const void *buf, const size_t len)
>> +void Stream::write_all(const char *operation, const void *buf, const size_t len)
>> {
>>     size_t written = 0;
>>     while (written < len) {
>> @@ -237,73 +326,11 @@ size_t Stream::write_all(const void *buf, const size_t len)
>>             if (errno == EINTR) {
>>                 continue;
>>             }
>> -            syslog(LOG_ERR, "write failed - %m");
>> -            return l;
>> +            throw WriteError("write failed", operation, errno).syslog();
>>         }
>>         written += l;
>>     }
>> -    syslog(LOG_DEBUG, "write_all -- %u bytes written\n", (unsigned)written);
>> -    return written;
>> -}
>> -
>> -int Stream::send_format(unsigned w, unsigned h, uint8_t c)
>> -{
>> -    const size_t msgsize = sizeof(FormatMessage);
>> -    const size_t hdrsize  = sizeof(StreamDevHeader);
>> -    FormatMessage msg = {
>> -        .hdr = {
>> -            .protocol_version = STREAM_DEVICE_PROTOCOL,
>> -            .padding = 0,       // Workaround GCC "not implemented" bug
>> -            .type = STREAM_TYPE_FORMAT,
>> -            .size = msgsize - hdrsize
>> -        },
>> -        .msg = {
>> -            .width = w,
>> -            .height = h,
>> -            .codec = c,
>> -            .padding1 = { }
>> -        }
>> -    };
>> -    syslog(LOG_DEBUG, "writing format\n");
>> -    std::lock_guard<std::mutex> stream_guard(mutex);
>> -    if (write_all(&msg, msgsize) != msgsize) {
>> -        return EXIT_FAILURE;
>> -    }
>> -    return EXIT_SUCCESS;
>> -}
>> -
>> -int Stream::send_frame(const void *buf, const unsigned size)
>> -{
>> -    ssize_t n;
>> -    const size_t msgsize = sizeof(FormatMessage);
>> -    DataMessage msg = {
>> -        .hdr = {
>> -            .protocol_version = STREAM_DEVICE_PROTOCOL,
>> -            .padding = 0,       // Workaround GCC "not implemented" bug
>> -            .type = STREAM_TYPE_DATA,
>> -            .size = size  /* includes only the body? */
>> -        },
>> -        .msg = {}
>> -    };
>> -
>> -    std::lock_guard<std::mutex> stream_guard(mutex);
>> -    n = write_all(&msg, msgsize);
>> -    syslog(LOG_DEBUG,
>> -           "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
>> -           n, msg.hdr.size);
>> -    if (n != msgsize) {
>> -        syslog(LOG_WARNING, "write_all header: wrote %ld expected %lu\n",
>> -               n, msgsize);
>> -        return EXIT_FAILURE;
>> -    }
>> -    n = write_all(buf, size);
>> -    syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
>> -    if (n != size) {
>> -        syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
>> -               n, size);
>> -        return EXIT_FAILURE;
>> -    }
>> -    return EXIT_SUCCESS;
>> +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", written);
>> }
>> 
>> /* returns current time in micro-seconds */
>> @@ -350,46 +377,6 @@ static void usage(const char *progname)
>>     exit(1);
>> }
>> 
>> -void
>> -Stream::send_cursor(uint16_t width, uint16_t height,
>> -                    uint16_t hotspot_x, uint16_t hotspot_y,
>> -                    std::function<void(uint32_t *)> fill_cursor)
>> -{
>> -    if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
>> -        return;
>> -    }
>> -
>> -    const uint32_t msgsize = sizeof(CursorMessage) + width * height * sizeof(uint32_t);
>> -    const uint32_t hdrsize  = sizeof(StreamDevHeader);
>> -
>> -    std::unique_ptr<uint8_t[]> storage(new uint8_t[msgsize]);
>> -
>> -    CursorMessage *cursor_msg =
>> -        new(storage.get()) CursorMessage {
>> -        .hdr = {
>> -            .protocol_version = STREAM_DEVICE_PROTOCOL,
>> -            .padding = 0,       // Workaround GCC internal / not implemented compiler error
>> -            .type = STREAM_TYPE_CURSOR_SET,
>> -            .size = msgsize - hdrsize
>> -        },
>> -        .msg = {
>> -            .width = width,
>> -            .height = height,
>> -            .hot_spot_x = hotspot_x,
>> -            .hot_spot_y = hotspot_y,
>> -            .type = SPICE_CURSOR_TYPE_ALPHA,
>> -            .padding1 = { },
>> -            .data = { }
>> -        }
>> -    };
>> -
>> -    uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
>> -    fill_cursor(pixels);
>> -
>> -    std::lock_guard<std::mutex> stream_guard(mutex);
>> -    write_all(storage.get(), msgsize);
>> -}
>> -
>> static void cursor_changes(Stream *stream, Display *display, int event_base)
>> {
>>     unsigned long last_serial = 0;
>> @@ -411,12 +398,7 @@ static void cursor_changes(Stream *stream, Display *display, int event_base)
>>         }
>> 
>>         last_serial = cursor->cursor_serial;
>> -        auto fill_cursor = [cursor](uint32_t *pixels) {
>> -            for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
>> -                pixels[i] = cursor->pixels[i];
>> -        };
>> -        stream->send_cursor(cursor->width, cursor->height,
>> -                            cursor->xhot, cursor->yhot, fill_cursor);
>> +        stream->send<X11CursorMessage>(cursor);
>>     }
>> }
>> 
>> @@ -471,9 +453,7 @@ do_capture(Stream &stream, const char *streamport, FILE *f_log)
>> 
>>                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
>> 
>> -                if (stream.send_format(width, height, codec) == EXIT_FAILURE) {
>> -                    throw std::runtime_error("FAILED to send format message");
>> -                }
>> +                stream.send<FormatMessage>(width, height, codec);
>>             }
>>             if (f_log) {
>>                 if (log_binary) {
>> @@ -484,10 +464,7 @@ do_capture(Stream &stream, const char *streamport, FILE *f_log)
>>                     hexdump(frame.buffer, frame.buffer_size, f_log);
>>                 }
>>             }
>> -            if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
>> -                syslog(LOG_ERR, "FAILED to send a frame\n");
>> -                break;
>> -            }
>> +            stream.send<FrameMessage>(frame.buffer, frame.buffer_size);
>>             //usleep(1);
>>             if (stream.read_command(false) < 0) {
>>                 syslog(LOG_ERR, "FAILED to read command\n");