[11/17] Move read, write and locking into the 'Stream' class

Submitted by Christophe de Dinechin on Feb. 16, 2018, 4:15 p.m.

Details

Message ID 20180216161547.28110-12-christophe@dinechin.org
State New
Headers show
Series "WIP: Refactor the streaming agent towards a more standard C++ style" ( rev: 1 ) in Spice

Not browsing as part of any series.

Commit Message

Christophe de Dinechin Feb. 16, 2018, 4:15 p.m.
From: Christophe de Dinechin <dinechin@redhat.com>

Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
---
 src/spice-streaming-agent.cpp | 86 +++++++++++++++++++++++--------------------
 1 file changed, 47 insertions(+), 39 deletions(-)

Patch hide | download patch | download mbox

diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index f0d79ae..a989ee7 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -71,18 +71,30 @@  class Stream
 public:
     Stream(const char *name)
     {
-        fd = open(name, O_RDWR);
-        if (fd < 0)
+        streamfd = open(name, O_RDWR);
+        if (streamfd < 0)
             throw std::runtime_error("failed to open streaming device");
     }
     ~Stream()
     {
-        close(fd);
+        close(streamfd);
     }
-    operator int() { return fd; }
+    operator int() { return streamfd; }
+
+    int have_something_to_read(int timeout);
+    int read_command_from_device(void);
+    int read_command(bool blocking);
+
+    size_t write_all(const void *buf, const size_t len);
+    int send_format(unsigned w, unsigned h, uint8_t c);
+    int send_frame(const void *buf, const unsigned size);
+    void send_cursor(uint16_t width, uint16_t height,
+                     uint16_t hotspot_x, uint16_t hotspot_y,
+                     std::function<void(uint32_t *)> fill_cursor);
 
 private:
-    int fd = -1;
+    int streamfd = -1;
+    std::mutex mutex;
 };
 
 }} // namespace spice::streaming_agent
@@ -92,9 +104,8 @@  static bool streaming_requested = false;
 static bool quit_requested = false;
 static bool log_binary = false;
 static std::set<SpiceVideoCodecType> client_codecs;
-static std::mutex stream_mtx;
 
-static int have_something_to_read(int streamfd, int timeout)
+int Stream::have_something_to_read(int timeout)
 {
     struct pollfd pollfd = {streamfd, POLLIN, 0};
 
@@ -110,13 +121,13 @@  static int have_something_to_read(int streamfd, int timeout)
     return 0;
 }
 
-static int read_command_from_device(int streamfd)
+int Stream::read_command_from_device()
 {
     StreamDevHeader hdr;
     uint8_t msg[64];
     int n;
 
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
+    std::lock_guard<std::mutex> stream_guard(mutex);
     n = read(streamfd, &hdr, sizeof(hdr));
     if (n != sizeof(hdr)) {
         syslog(LOG_WARNING,
@@ -155,29 +166,28 @@  static int read_command_from_device(int streamfd)
     return 1;
 }
 
-static int read_command(int streamfd, bool blocking)
+int Stream::read_command(bool blocking)
 {
     int timeout = blocking?-1:0;
     while (!quit_requested) {
-        if (!have_something_to_read(streamfd, timeout)) {
+        if (!have_something_to_read(timeout)) {
             if (!blocking) {
                 return 0;
             }
             sleep(1);
             continue;
         }
-        return read_command_from_device(streamfd);
+        return read_command_from_device();
     }
 
     return 1;
 }
 
-static size_t
-write_all(int fd, const void *buf, const size_t len)
+size_t Stream::write_all(const void *buf, const size_t len)
 {
     size_t written = 0;
     while (written < len) {
-        int l = write(fd, (const char *) buf + written, len - written);
+        int l = write(streamfd, (const char *) buf + written, len - written);
         if (l < 0) {
             if (errno == EINTR) {
                 continue;
@@ -191,7 +201,7 @@  write_all(int fd, const void *buf, const size_t len)
     return written;
 }
 
-static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_t c)
+int Stream::send_format(unsigned w, unsigned h, uint8_t c)
 {
     const size_t msgsize = sizeof(FormatMessage);
     const size_t hdrsize  = sizeof(StreamDevHeader);
@@ -210,14 +220,14 @@  static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_
         }
     };
     syslog(LOG_DEBUG, "writing format\n");
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    if (write_all(streamfd, &msg, msgsize) != msgsize) {
+    std::lock_guard<std::mutex> stream_guard(mutex);
+    if (write_all(&msg, msgsize) != msgsize) {
         return EXIT_FAILURE;
     }
     return EXIT_SUCCESS;
 }
 
-static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned size)
+int Stream::send_frame(const void *buf, const unsigned size)
 {
     ssize_t n;
     const size_t msgsize = sizeof(FormatMessage);
@@ -231,8 +241,8 @@  static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
         .msg = {}
     };
 
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    n = write_all(streamfd, &msg, msgsize);
+    std::lock_guard<std::mutex> stream_guard(mutex);
+    n = write_all(&msg, msgsize);
     syslog(LOG_DEBUG,
            "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
            n, msg.hdr.size);
@@ -241,7 +251,7 @@  static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
                n, msgsize);
         return EXIT_FAILURE;
     }
-    n = write_all(streamfd, buf, size);
+    n = write_all(buf, size);
     syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
     if (n != size) {
         syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
@@ -294,11 +304,10 @@  static void usage(const char *progname)
     exit(1);
 }
 
-static void
-send_cursor(int streamfd,
-            uint16_t width, uint16_t height,
-            uint16_t hotspot_x, uint16_t hotspot_y,
-            std::function<void(uint32_t *)> fill_cursor)
+void
+Stream::send_cursor(uint16_t width, uint16_t height,
+                    uint16_t hotspot_x, uint16_t hotspot_y,
+                    std::function<void(uint32_t *)> fill_cursor)
 {
     if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
         height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
@@ -332,11 +341,11 @@  send_cursor(int streamfd,
     uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
     fill_cursor(pixels);
 
-    std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    write_all(streamfd, storage.get(), cursor_msgsize);
+    std::lock_guard<std::mutex> stream_guard(mutex);
+    write_all(storage.get(), cursor_msgsize);
 }
 
-static void cursor_changes(int streamfd, Display *display, int event_base)
+static void cursor_changes(Stream *stream, Display *display, int event_base)
 {
     unsigned long last_serial = 0;
 
@@ -358,18 +367,18 @@  static void cursor_changes(int streamfd, Display *display, int event_base)
             for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
                 pixels[i] = cursor->pixels[i];
         };
-        send_cursor(streamfd,
-                    cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
+        stream->send_cursor(cursor->width, cursor->height,
+                            cursor->xhot, cursor->yhot, fill_cursor);
     }
 }
 
 static void
-do_capture(int streamfd, const char *streamport, FILE *f_log)
+do_capture(Stream &stream, const char *streamport, FILE *f_log)
 {
     unsigned int frame_count = 0;
     while (!quit_requested) {
         while (!quit_requested && !streaming_requested) {
-            if (read_command(streamfd, true) < 0) {
+            if (stream.read_command(true) < 0) {
                 syslog(LOG_ERR, "FAILED to read command\n");
                 return;
             }
@@ -413,7 +422,7 @@  do_capture(int streamfd, const char *streamport, FILE *f_log)
 
                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
 
-                if (spice_stream_send_format(streamfd, width, height, codec) == EXIT_FAILURE)
+                if (stream.send_format(width, height, codec) == EXIT_FAILURE)
                     throw std::runtime_error("FAILED to send format message");
             }
             if (f_log) {
@@ -425,13 +434,12 @@  do_capture(int streamfd, const char *streamport, FILE *f_log)
                     hexdump(frame.buffer, frame.buffer_size, f_log);
                 }
             }
-            if (spice_stream_send_frame(streamfd,
-                                        frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
+            if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
                 syslog(LOG_ERR, "FAILED to send a frame\n");
                 break;
             }
             //usleep(1);
-            if (read_command(streamfd, false) < 0) {
+            if (stream.read_command(false) < 0) {
                 syslog(LOG_ERR, "FAILED to read command\n");
                 return;
             }
@@ -520,7 +528,7 @@  int main(int argc, char* argv[])
     XFixesSelectCursorInput(display, rootwindow, XFixesDisplayCursorNotifyMask);
 
     Stream streamfd(streamport);
-    std::thread cursor_th(cursor_changes, (int) streamfd, display, event_base);
+    std::thread cursor_th(cursor_changes, &streamfd, display, event_base);
     cursor_th.detach();
 
     int ret = EXIT_SUCCESS;

Comments

On Fri, 2018-02-16 at 17:15 +0100, Christophe de Dinechin wrote:
> From: Christophe de Dinechin <dinechin@redhat.com>
> 
> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
> ---
>  src/spice-streaming-agent.cpp | 86 +++++++++++++++++++++++--------------------
>  1 file changed, 47 insertions(+), 39 deletions(-)
> 
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index f0d79ae..a989ee7 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -71,18 +71,30 @@ class Stream
>  public:
>      Stream(const char *name)

I would like to name the class something more descriptive for what it
is becoming. Class named Stream in a file named "stream.{cpp,hpp}"
could be almost anything. My best name is StreamDispatcher so far, not
entirely happy about it :)

>      {
> -        fd = open(name, O_RDWR);
> -        if (fd < 0)
> +        streamfd = open(name, O_RDWR);
> +        if (streamfd < 0)
>              throw std::runtime_error("failed to open streaming device");
>      }
>      ~Stream()
>      {
> -        close(fd);
> +        close(streamfd);
>      }
> -    operator int() { return fd; }
> +    operator int() { return streamfd; }
> +
> +    int have_something_to_read(int timeout);
> +    int read_command_from_device(void);
> +    int read_command(bool blocking);
> +
> +    size_t write_all(const void *buf, const size_t len);

This method could also use a better name. write_bytes()?
write_buffer()?

> +    int send_format(unsigned w, unsigned h, uint8_t c);
> +    int send_frame(const void *buf, const unsigned size);
> +    void send_cursor(uint16_t width, uint16_t height,
> +                     uint16_t hotspot_x, uint16_t hotspot_y,
> +                     std::function<void(uint32_t *)> fill_cursor);
>  
>  private:
> -    int fd = -1;
> +    int streamfd = -1;
> +    std::mutex mutex;
>  };
>  
>  }} // namespace spice::streaming_agent
> @@ -92,9 +104,8 @@ static bool streaming_requested = false;
>  static bool quit_requested = false;
>  static bool log_binary = false;
>  static std::set<SpiceVideoCodecType> client_codecs;
> -static std::mutex stream_mtx;
>  
> -static int have_something_to_read(int streamfd, int timeout)
> +int Stream::have_something_to_read(int timeout)
>  {
>      struct pollfd pollfd = {streamfd, POLLIN, 0};
>  
> @@ -110,13 +121,13 @@ static int have_something_to_read(int streamfd, int timeout)
>      return 0;
>  }
>  
> -static int read_command_from_device(int streamfd)
> +int Stream::read_command_from_device()
>  {
>      StreamDevHeader hdr;
>      uint8_t msg[64];
>      int n;
>  
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> +    std::lock_guard<std::mutex> stream_guard(mutex);
>      n = read(streamfd, &hdr, sizeof(hdr));
>      if (n != sizeof(hdr)) {
>          syslog(LOG_WARNING,
> @@ -155,29 +166,28 @@ static int read_command_from_device(int streamfd)
>      return 1;
>  }
>  
> -static int read_command(int streamfd, bool blocking)
> +int Stream::read_command(bool blocking)
>  {
>      int timeout = blocking?-1:0;
>      while (!quit_requested) {
> -        if (!have_something_to_read(streamfd, timeout)) {
> +        if (!have_something_to_read(timeout)) {
>              if (!blocking) {
>                  return 0;
>              }
>              sleep(1);
>              continue;
>          }
> -        return read_command_from_device(streamfd);
> +        return read_command_from_device();
>      }
>  
>      return 1;
>  }
>  
> -static size_t
> -write_all(int fd, const void *buf, const size_t len)
> +size_t Stream::write_all(const void *buf, const size_t len)
>  {
>      size_t written = 0;
>      while (written < len) {
> -        int l = write(fd, (const char *) buf + written, len - written);
> +        int l = write(streamfd, (const char *) buf + written, len - written);
>          if (l < 0) {
>              if (errno == EINTR) {
>                  continue;
> @@ -191,7 +201,7 @@ write_all(int fd, const void *buf, const size_t len)
>      return written;
>  }
>  
> -static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_t c)
> +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
>  {
>      const size_t msgsize = sizeof(FormatMessage);
>      const size_t hdrsize  = sizeof(StreamDevHeader);
> @@ -210,14 +220,14 @@ static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_
>          }
>      };
>      syslog(LOG_DEBUG, "writing format\n");
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +    if (write_all(&msg, msgsize) != msgsize) {
>          return EXIT_FAILURE;
>      }
>      return EXIT_SUCCESS;
>  }
>  
> -static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned size)
> +int Stream::send_frame(const void *buf, const unsigned size)
>  {
>      ssize_t n;
>      const size_t msgsize = sizeof(FormatMessage);
> @@ -231,8 +241,8 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
>          .msg = {}
>      };
>  
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    n = write_all(streamfd, &msg, msgsize);
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +    n = write_all(&msg, msgsize);
>      syslog(LOG_DEBUG,
>             "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
>             n, msg.hdr.size);
> @@ -241,7 +251,7 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
>                 n, msgsize);
>          return EXIT_FAILURE;
>      }
> -    n = write_all(streamfd, buf, size);
> +    n = write_all(buf, size);
>      syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
>      if (n != size) {
>          syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
> @@ -294,11 +304,10 @@ static void usage(const char *progname)
>      exit(1);
>  }
>  
> -static void
> -send_cursor(int streamfd,
> -            uint16_t width, uint16_t height,
> -            uint16_t hotspot_x, uint16_t hotspot_y,
> -            std::function<void(uint32_t *)> fill_cursor)
> +void
> +Stream::send_cursor(uint16_t width, uint16_t height,
> +                    uint16_t hotspot_x, uint16_t hotspot_y,
> +                    std::function<void(uint32_t *)> fill_cursor)
>  {
>      if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
>          height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
> @@ -332,11 +341,11 @@ send_cursor(int streamfd,
>      uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
>      fill_cursor(pixels);
>  
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    write_all(streamfd, storage.get(), cursor_msgsize);
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +    write_all(storage.get(), cursor_msgsize);
>  }
>  
> -static void cursor_changes(int streamfd, Display *display, int event_base)
> +static void cursor_changes(Stream *stream, Display *display, int event_base)
>  {
>      unsigned long last_serial = 0;
>  
> @@ -358,18 +367,18 @@ static void cursor_changes(int streamfd, Display *display, int event_base)
>              for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
>                  pixels[i] = cursor->pixels[i];
>          };
> -        send_cursor(streamfd,
> -                    cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
> +        stream->send_cursor(cursor->width, cursor->height,
> +                            cursor->xhot, cursor->yhot, fill_cursor);
>      }
>  }
>  
>  static void
> -do_capture(int streamfd, const char *streamport, FILE *f_log)
> +do_capture(Stream &stream, const char *streamport, FILE *f_log)
>  {
>      unsigned int frame_count = 0;
>      while (!quit_requested) {
>          while (!quit_requested && !streaming_requested) {
> -            if (read_command(streamfd, true) < 0) {
> +            if (stream.read_command(true) < 0) {
>                  syslog(LOG_ERR, "FAILED to read command\n");
>                  return;
>              }
> @@ -413,7 +422,7 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
>  
>                  syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
>  
> -                if (spice_stream_send_format(streamfd, width, height, codec) == EXIT_FAILURE)
> +                if (stream.send_format(width, height, codec) == EXIT_FAILURE)
>                      throw std::runtime_error("FAILED to send format message");
>              }
>              if (f_log) {
> @@ -425,13 +434,12 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
>                      hexdump(frame.buffer, frame.buffer_size, f_log);
>                  }
>              }
> -            if (spice_stream_send_frame(streamfd,
> -                                        frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
> +            if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
>                  syslog(LOG_ERR, "FAILED to send a frame\n");
>                  break;
>              }
>              //usleep(1);
> -            if (read_command(streamfd, false) < 0) {
> +            if (stream.read_command(false) < 0) {
>                  syslog(LOG_ERR, "FAILED to read command\n");
>                  return;
>              }
> @@ -520,7 +528,7 @@ int main(int argc, char* argv[])
>      XFixesSelectCursorInput(display, rootwindow, XFixesDisplayCursorNotifyMask);
>  
>      Stream streamfd(streamport);
> -    std::thread cursor_th(cursor_changes, (int) streamfd, display, event_base);
> +    std::thread cursor_th(cursor_changes, &streamfd, display, event_base);
>      cursor_th.detach();
>  
>      int ret = EXIT_SUCCESS;
> On 20 Feb 2018, at 10:43, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> 
> On Fri, 2018-02-16 at 17:15 +0100, Christophe de Dinechin wrote:
>> From: Christophe de Dinechin <dinechin@redhat.com>
>> 
>> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
>> ---
>> src/spice-streaming-agent.cpp | 86 +++++++++++++++++++++++--------------------
>> 1 file changed, 47 insertions(+), 39 deletions(-)
>> 
>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
>> index f0d79ae..a989ee7 100644
>> --- a/src/spice-streaming-agent.cpp
>> +++ b/src/spice-streaming-agent.cpp
>> @@ -71,18 +71,30 @@ class Stream
>> public:
>>     Stream(const char *name)
> 
> I would like to name the class something more descriptive for what it
> is becoming. Class named Stream in a file named "stream.{cpp,hpp}"
> could be almost anything.

But it’s not named “Stream”, it’s called “spice::streaming_agent::Stream” ;-)

I chose short names because I was in that namespace. Otherwise, I agree with you.

Do you think that the name is still too vague even within the namespace?

> My best name is StreamDispatcher so far, not
> entirely happy about it :)


> 
>>     {
>> -        fd = open(name, O_RDWR);
>> -        if (fd < 0)
>> +        streamfd = open(name, O_RDWR);
>> +        if (streamfd < 0)
>>             throw std::runtime_error("failed to open streaming device");
>>     }
>>     ~Stream()
>>     {
>> -        close(fd);
>> +        close(streamfd);
>>     }
>> -    operator int() { return fd; }
>> +    operator int() { return streamfd; }
>> +
>> +    int have_something_to_read(int timeout);
>> +    int read_command_from_device(void);
>> +    int read_command(bool blocking);
>> +
>> +    size_t write_all(const void *buf, const size_t len);
> 
> This method could also use a better name. write_bytes()?
> write_buffer()?

I intended to do a rename in a follow up. My current choice was “write_packet”, because precisely, it’s not writing bytes or a buffer, it’s making sure the whole packet gets sent.

> 
>> +    int send_format(unsigned w, unsigned h, uint8_t c);
>> +    int send_frame(const void *buf, const unsigned size);
>> +    void send_cursor(uint16_t width, uint16_t height,
>> +                     uint16_t hotspot_x, uint16_t hotspot_y,
>> +                     std::function<void(uint32_t *)> fill_cursor);
>> 
>> private:
>> -    int fd = -1;
>> +    int streamfd = -1;
>> +    std::mutex mutex;
>> };
>> 
>> }} // namespace spice::streaming_agent
>> @@ -92,9 +104,8 @@ static bool streaming_requested = false;
>> static bool quit_requested = false;
>> static bool log_binary = false;
>> static std::set<SpiceVideoCodecType> client_codecs;
>> -static std::mutex stream_mtx;
>> 
>> -static int have_something_to_read(int streamfd, int timeout)
>> +int Stream::have_something_to_read(int timeout)
>> {
>>     struct pollfd pollfd = {streamfd, POLLIN, 0};
>> 
>> @@ -110,13 +121,13 @@ static int have_something_to_read(int streamfd, int timeout)
>>     return 0;
>> }
>> 
>> -static int read_command_from_device(int streamfd)
>> +int Stream::read_command_from_device()
>> {
>>     StreamDevHeader hdr;
>>     uint8_t msg[64];
>>     int n;
>> 
>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>     n = read(streamfd, &hdr, sizeof(hdr));
>>     if (n != sizeof(hdr)) {
>>         syslog(LOG_WARNING,
>> @@ -155,29 +166,28 @@ static int read_command_from_device(int streamfd)
>>     return 1;
>> }
>> 
>> -static int read_command(int streamfd, bool blocking)
>> +int Stream::read_command(bool blocking)
>> {
>>     int timeout = blocking?-1:0;
>>     while (!quit_requested) {
>> -        if (!have_something_to_read(streamfd, timeout)) {
>> +        if (!have_something_to_read(timeout)) {
>>             if (!blocking) {
>>                 return 0;
>>             }
>>             sleep(1);
>>             continue;
>>         }
>> -        return read_command_from_device(streamfd);
>> +        return read_command_from_device();
>>     }
>> 
>>     return 1;
>> }
>> 
>> -static size_t
>> -write_all(int fd, const void *buf, const size_t len)
>> +size_t Stream::write_all(const void *buf, const size_t len)
>> {
>>     size_t written = 0;
>>     while (written < len) {
>> -        int l = write(fd, (const char *) buf + written, len - written);
>> +        int l = write(streamfd, (const char *) buf + written, len - written);
>>         if (l < 0) {
>>             if (errno == EINTR) {
>>                 continue;
>> @@ -191,7 +201,7 @@ write_all(int fd, const void *buf, const size_t len)
>>     return written;
>> }
>> 
>> -static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_t c)
>> +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
>> {
>>     const size_t msgsize = sizeof(FormatMessage);
>>     const size_t hdrsize  = sizeof(StreamDevHeader);
>> @@ -210,14 +220,14 @@ static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_
>>         }
>>     };
>>     syslog(LOG_DEBUG, "writing format\n");
>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>> +    if (write_all(&msg, msgsize) != msgsize) {
>>         return EXIT_FAILURE;
>>     }
>>     return EXIT_SUCCESS;
>> }
>> 
>> -static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned size)
>> +int Stream::send_frame(const void *buf, const unsigned size)
>> {
>>     ssize_t n;
>>     const size_t msgsize = sizeof(FormatMessage);
>> @@ -231,8 +241,8 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
>>         .msg = {}
>>     };
>> 
>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> -    n = write_all(streamfd, &msg, msgsize);
>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>> +    n = write_all(&msg, msgsize);
>>     syslog(LOG_DEBUG,
>>            "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
>>            n, msg.hdr.size);
>> @@ -241,7 +251,7 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
>>                n, msgsize);
>>         return EXIT_FAILURE;
>>     }
>> -    n = write_all(streamfd, buf, size);
>> +    n = write_all(buf, size);
>>     syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
>>     if (n != size) {
>>         syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
>> @@ -294,11 +304,10 @@ static void usage(const char *progname)
>>     exit(1);
>> }
>> 
>> -static void
>> -send_cursor(int streamfd,
>> -            uint16_t width, uint16_t height,
>> -            uint16_t hotspot_x, uint16_t hotspot_y,
>> -            std::function<void(uint32_t *)> fill_cursor)
>> +void
>> +Stream::send_cursor(uint16_t width, uint16_t height,
>> +                    uint16_t hotspot_x, uint16_t hotspot_y,
>> +                    std::function<void(uint32_t *)> fill_cursor)
>> {
>>     if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
>>         height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
>> @@ -332,11 +341,11 @@ send_cursor(int streamfd,
>>     uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
>>     fill_cursor(pixels);
>> 
>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> -    write_all(streamfd, storage.get(), cursor_msgsize);
>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>> +    write_all(storage.get(), cursor_msgsize);
>> }
>> 
>> -static void cursor_changes(int streamfd, Display *display, int event_base)
>> +static void cursor_changes(Stream *stream, Display *display, int event_base)
>> {
>>     unsigned long last_serial = 0;
>> 
>> @@ -358,18 +367,18 @@ static void cursor_changes(int streamfd, Display *display, int event_base)
>>             for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
>>                 pixels[i] = cursor->pixels[i];
>>         };
>> -        send_cursor(streamfd,
>> -                    cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
>> +        stream->send_cursor(cursor->width, cursor->height,
>> +                            cursor->xhot, cursor->yhot, fill_cursor);
>>     }
>> }
>> 
>> static void
>> -do_capture(int streamfd, const char *streamport, FILE *f_log)
>> +do_capture(Stream &stream, const char *streamport, FILE *f_log)
>> {
>>     unsigned int frame_count = 0;
>>     while (!quit_requested) {
>>         while (!quit_requested && !streaming_requested) {
>> -            if (read_command(streamfd, true) < 0) {
>> +            if (stream.read_command(true) < 0) {
>>                 syslog(LOG_ERR, "FAILED to read command\n");
>>                 return;
>>             }
>> @@ -413,7 +422,7 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
>> 
>>                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
>> 
>> -                if (spice_stream_send_format(streamfd, width, height, codec) == EXIT_FAILURE)
>> +                if (stream.send_format(width, height, codec) == EXIT_FAILURE)
>>                     throw std::runtime_error("FAILED to send format message");
>>             }
>>             if (f_log) {
>> @@ -425,13 +434,12 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
>>                     hexdump(frame.buffer, frame.buffer_size, f_log);
>>                 }
>>             }
>> -            if (spice_stream_send_frame(streamfd,
>> -                                        frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
>> +            if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
>>                 syslog(LOG_ERR, "FAILED to send a frame\n");
>>                 break;
>>             }
>>             //usleep(1);
>> -            if (read_command(streamfd, false) < 0) {
>> +            if (stream.read_command(false) < 0) {
>>                 syslog(LOG_ERR, "FAILED to read command\n");
>>                 return;
>>             }
>> @@ -520,7 +528,7 @@ int main(int argc, char* argv[])
>>     XFixesSelectCursorInput(display, rootwindow, XFixesDisplayCursorNotifyMask);
>> 
>>     Stream streamfd(streamport);
>> -    std::thread cursor_th(cursor_changes, (int) streamfd, display, event_base);
>> +    std::thread cursor_th(cursor_changes, &streamfd, display, event_base);
>>     cursor_th.detach();
>> 
>>     int ret = EXIT_SUCCESS;
On Tue, 2018-02-20 at 10:47 +0100, Christophe de Dinechin wrote:
> > On 20 Feb 2018, at 10:43, Lukáš Hrázký <lhrazky@redhat.com> wrote:
> > 
> > On Fri, 2018-02-16 at 17:15 +0100, Christophe de Dinechin wrote:
> > > From: Christophe de Dinechin <dinechin@redhat.com>
> > > 
> > > Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
> > > ---
> > > src/spice-streaming-agent.cpp | 86 +++++++++++++++++++++++--------------------
> > > 1 file changed, 47 insertions(+), 39 deletions(-)
> > > 
> > > diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> > > index f0d79ae..a989ee7 100644
> > > --- a/src/spice-streaming-agent.cpp
> > > +++ b/src/spice-streaming-agent.cpp
> > > @@ -71,18 +71,30 @@ class Stream
> > > public:
> > >     Stream(const char *name)
> > 
> > I would like to name the class something more descriptive for what it
> > is becoming. Class named Stream in a file named "stream.{cpp,hpp}"
> > could be almost anything.
> 
> But it’s not named “Stream”, it’s called “spice::streaming_agent::Stream” ;-)
> 
> I chose short names because I was in that namespace. Otherwise, I agree with you.
> 
> Do you think that the name is still too vague even within the namespace?

I think so. Everything in streaming agent is in that namespace, even if
it wasn't, you know you're looking at the streaming agent code and
think of the types in that context. Stream is still a pretty generic
name, I suppose you could imagine a number of things under it.

So what is this class in our case. It handles the socket communication
over the streaming channel to the server. Is it accurate to call it a
channel here? If so, maybe StreamingChannel?

> > My best name is StreamDispatcher so far, not
> > entirely happy about it :)
> 
> 
> > 
> > >     {
> > > -        fd = open(name, O_RDWR);
> > > -        if (fd < 0)
> > > +        streamfd = open(name, O_RDWR);
> > > +        if (streamfd < 0)
> > >             throw std::runtime_error("failed to open streaming device");
> > >     }
> > >     ~Stream()
> > >     {
> > > -        close(fd);
> > > +        close(streamfd);
> > >     }
> > > -    operator int() { return fd; }
> > > +    operator int() { return streamfd; }
> > > +
> > > +    int have_something_to_read(int timeout);
> > > +    int read_command_from_device(void);
> > > +    int read_command(bool blocking);
> > > +
> > > +    size_t write_all(const void *buf, const size_t len);
> > 
> > This method could also use a better name. write_bytes()?
> > write_buffer()?
> 
> I intended to do a rename in a follow up. My current choice was “write_packet”, because precisely, it’s not writing bytes or a buffer, it’s making sure the whole packet gets sent.

What do you mean by packet here? Does it have a specific meaning in
this context? It just sends an array of binary data, doesn't it? Like
later on you use it to write the header and message body separately.

> > 
> > > +    int send_format(unsigned w, unsigned h, uint8_t c);
> > > +    int send_frame(const void *buf, const unsigned size);
> > > +    void send_cursor(uint16_t width, uint16_t height,
> > > +                     uint16_t hotspot_x, uint16_t hotspot_y,
> > > +                     std::function<void(uint32_t *)> fill_cursor);
> > > 
> > > private:
> > > -    int fd = -1;
> > > +    int streamfd = -1;
> > > +    std::mutex mutex;
> > > };
> > > 
> > > }} // namespace spice::streaming_agent
> > > @@ -92,9 +104,8 @@ static bool streaming_requested = false;
> > > static bool quit_requested = false;
> > > static bool log_binary = false;
> > > static std::set<SpiceVideoCodecType> client_codecs;
> > > -static std::mutex stream_mtx;
> > > 
> > > -static int have_something_to_read(int streamfd, int timeout)
> > > +int Stream::have_something_to_read(int timeout)
> > > {
> > >     struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > 
> > > @@ -110,13 +121,13 @@ static int have_something_to_read(int streamfd, int timeout)
> > >     return 0;
> > > }
> > > 
> > > -static int read_command_from_device(int streamfd)
> > > +int Stream::read_command_from_device()
> > > {
> > >     StreamDevHeader hdr;
> > >     uint8_t msg[64];
> > >     int n;
> > > 
> > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > >     n = read(streamfd, &hdr, sizeof(hdr));
> > >     if (n != sizeof(hdr)) {
> > >         syslog(LOG_WARNING,
> > > @@ -155,29 +166,28 @@ static int read_command_from_device(int streamfd)
> > >     return 1;
> > > }
> > > 
> > > -static int read_command(int streamfd, bool blocking)
> > > +int Stream::read_command(bool blocking)
> > > {
> > >     int timeout = blocking?-1:0;
> > >     while (!quit_requested) {
> > > -        if (!have_something_to_read(streamfd, timeout)) {
> > > +        if (!have_something_to_read(timeout)) {
> > >             if (!blocking) {
> > >                 return 0;
> > >             }
> > >             sleep(1);
> > >             continue;
> > >         }
> > > -        return read_command_from_device(streamfd);
> > > +        return read_command_from_device();
> > >     }
> > > 
> > >     return 1;
> > > }
> > > 
> > > -static size_t
> > > -write_all(int fd, const void *buf, const size_t len)
> > > +size_t Stream::write_all(const void *buf, const size_t len)
> > > {
> > >     size_t written = 0;
> > >     while (written < len) {
> > > -        int l = write(fd, (const char *) buf + written, len - written);
> > > +        int l = write(streamfd, (const char *) buf + written, len - written);
> > >         if (l < 0) {
> > >             if (errno == EINTR) {
> > >                 continue;
> > > @@ -191,7 +201,7 @@ write_all(int fd, const void *buf, const size_t len)
> > >     return written;
> > > }
> > > 
> > > -static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_t c)
> > > +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
> > > {
> > >     const size_t msgsize = sizeof(FormatMessage);
> > >     const size_t hdrsize  = sizeof(StreamDevHeader);
> > > @@ -210,14 +220,14 @@ static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_
> > >         }
> > >     };
> > >     syslog(LOG_DEBUG, "writing format\n");
> > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
> > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > +    if (write_all(&msg, msgsize) != msgsize) {
> > >         return EXIT_FAILURE;
> > >     }
> > >     return EXIT_SUCCESS;
> > > }
> > > 
> > > -static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned size)
> > > +int Stream::send_frame(const void *buf, const unsigned size)
> > > {
> > >     ssize_t n;
> > >     const size_t msgsize = sizeof(FormatMessage);
> > > @@ -231,8 +241,8 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
> > >         .msg = {}
> > >     };
> > > 
> > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > -    n = write_all(streamfd, &msg, msgsize);
> > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > +    n = write_all(&msg, msgsize);
> > >     syslog(LOG_DEBUG,
> > >            "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
> > >            n, msg.hdr.size);
> > > @@ -241,7 +251,7 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
> > >                n, msgsize);
> > >         return EXIT_FAILURE;
> > >     }
> > > -    n = write_all(streamfd, buf, size);
> > > +    n = write_all(buf, size);
> > >     syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
> > >     if (n != size) {
> > >         syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
> > > @@ -294,11 +304,10 @@ static void usage(const char *progname)
> > >     exit(1);
> > > }
> > > 
> > > -static void
> > > -send_cursor(int streamfd,
> > > -            uint16_t width, uint16_t height,
> > > -            uint16_t hotspot_x, uint16_t hotspot_y,
> > > -            std::function<void(uint32_t *)> fill_cursor)
> > > +void
> > > +Stream::send_cursor(uint16_t width, uint16_t height,
> > > +                    uint16_t hotspot_x, uint16_t hotspot_y,
> > > +                    std::function<void(uint32_t *)> fill_cursor)
> > > {
> > >     if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
> > >         height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
> > > @@ -332,11 +341,11 @@ send_cursor(int streamfd,
> > >     uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
> > >     fill_cursor(pixels);
> > > 
> > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > -    write_all(streamfd, storage.get(), cursor_msgsize);
> > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > +    write_all(storage.get(), cursor_msgsize);
> > > }
> > > 
> > > -static void cursor_changes(int streamfd, Display *display, int event_base)
> > > +static void cursor_changes(Stream *stream, Display *display, int event_base)
> > > {
> > >     unsigned long last_serial = 0;
> > > 
> > > @@ -358,18 +367,18 @@ static void cursor_changes(int streamfd, Display *display, int event_base)
> > >             for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
> > >                 pixels[i] = cursor->pixels[i];
> > >         };
> > > -        send_cursor(streamfd,
> > > -                    cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
> > > +        stream->send_cursor(cursor->width, cursor->height,
> > > +                            cursor->xhot, cursor->yhot, fill_cursor);
> > >     }
> > > }
> > > 
> > > static void
> > > -do_capture(int streamfd, const char *streamport, FILE *f_log)
> > > +do_capture(Stream &stream, const char *streamport, FILE *f_log)
> > > {
> > >     unsigned int frame_count = 0;
> > >     while (!quit_requested) {
> > >         while (!quit_requested && !streaming_requested) {
> > > -            if (read_command(streamfd, true) < 0) {
> > > +            if (stream.read_command(true) < 0) {
> > >                 syslog(LOG_ERR, "FAILED to read command\n");
> > >                 return;
> > >             }
> > > @@ -413,7 +422,7 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
> > > 
> > >                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
> > > 
> > > -                if (spice_stream_send_format(streamfd, width, height, codec) == EXIT_FAILURE)
> > > +                if (stream.send_format(width, height, codec) == EXIT_FAILURE)
> > >                     throw std::runtime_error("FAILED to send format message");
> > >             }
> > >             if (f_log) {
> > > @@ -425,13 +434,12 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
> > >                     hexdump(frame.buffer, frame.buffer_size, f_log);
> > >                 }
> > >             }
> > > -            if (spice_stream_send_frame(streamfd,
> > > -                                        frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
> > > +            if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
> > >                 syslog(LOG_ERR, "FAILED to send a frame\n");
> > >                 break;
> > >             }
> > >             //usleep(1);
> > > -            if (read_command(streamfd, false) < 0) {
> > > +            if (stream.read_command(false) < 0) {
> > >                 syslog(LOG_ERR, "FAILED to read command\n");
> > >                 return;
> > >             }
> > > @@ -520,7 +528,7 @@ int main(int argc, char* argv[])
> > >     XFixesSelectCursorInput(display, rootwindow, XFixesDisplayCursorNotifyMask);
> > > 
> > >     Stream streamfd(streamport);
> > > -    std::thread cursor_th(cursor_changes, (int) streamfd, display, event_base);
> > > +    std::thread cursor_th(cursor_changes, &streamfd, display, event_base);
> > >     cursor_th.detach();
> > > 
> > >     int ret = EXIT_SUCCESS;
> 
>
On Tue, 2018-02-20 at 14:29 +0100, Lukáš Hrázký wrote:
> On Tue, 2018-02-20 at 10:47 +0100, Christophe de Dinechin wrote:
> > > On 20 Feb 2018, at 10:43, Lukáš Hrázký <lhrazky@redhat.com>
> > > wrote:
> > > 
> > > On Fri, 2018-02-16 at 17:15 +0100, Christophe de Dinechin wrote:
> > > > From: Christophe de Dinechin <dinechin@redhat.com>
> > > > 
> > > > Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
> > > > ---
> > > > src/spice-streaming-agent.cpp | 86 +++++++++++++++++++++++-----
> > > > ---------------
> > > > 1 file changed, 47 insertions(+), 39 deletions(-)
> > > > 
> > > > diff --git a/src/spice-streaming-agent.cpp b/src/spice-
> > > > streaming-agent.cpp
> > > > index f0d79ae..a989ee7 100644
> > > > --- a/src/spice-streaming-agent.cpp
> > > > +++ b/src/spice-streaming-agent.cpp
> > > > @@ -71,18 +71,30 @@ class Stream
> > > > public:
> > > >     Stream(const char *name)
> > > 
> > > I would like to name the class something more descriptive for
> > > what it
> > > is becoming. Class named Stream in a file named
> > > "stream.{cpp,hpp}"
> > > could be almost anything.
> > 
> > But it’s not named “Stream”, it’s called
> > “spice::streaming_agent::Stream” ;-)
> > 
> > I chose short names because I was in that namespace. Otherwise, I
> > agree with you.
> > 
> > Do you think that the name is still too vague even within the
> > namespace?
> 
> I think so. Everything in streaming agent is in that namespace, even
> if
> it wasn't, you know you're looking at the streaming agent code and
> think of the types in that context. Stream is still a pretty generic
> name, I suppose you could imagine a number of things under it.
> 
> So what is this class in our case. It handles the socket
> communication
> over the streaming channel to the server. Is it accurate to call it a
> channel here? If so, maybe StreamingChannel?

It's true that "Stream" in the context of the streaming agent could
make you think that it was actually representing the encoded video
stream, rather than an encapsulation of a communication channel. But I
don't like the name StreamDispatcher. Glib uses the name GIOChannel for
something similar. Maybe IOChannel?

> 
> > > My best name is StreamDispatcher so far, not
> > > entirely happy about it :)
> > 
> > 
> > > 
> > > >     {
> > > > -        fd = open(name, O_RDWR);
> > > > -        if (fd < 0)
> > > > +        streamfd = open(name, O_RDWR);
> > > > +        if (streamfd < 0)
> > > >             throw std::runtime_error("failed to open streaming
> > > > device");
> > > >     }
> > > >     ~Stream()
> > > >     {
> > > > -        close(fd);
> > > > +        close(streamfd);
> > > >     }
> > > > -    operator int() { return fd; }
> > > > +    operator int() { return streamfd; }
> > > > +
> > > > +    int have_something_to_read(int timeout);
> > > > +    int read_command_from_device(void);
> > > > +    int read_command(bool blocking);
> > > > +
> > > > +    size_t write_all(const void *buf, const size_t len);
> > > 
> > > This method could also use a better name. write_bytes()?
> > > write_buffer()?
> > 
> > I intended to do a rename in a follow up. My current choice was
> > “write_packet”, because precisely, it’s not writing bytes or a
> > buffer, it’s making sure the whole packet gets sent.
> 
> What do you mean by packet here? Does it have a specific meaning in
> this context? It just sends an array of binary data, doesn't it? Like
> later on you use it to write the header and message body separately.
> 
> > > 
> > > > +    int send_format(unsigned w, unsigned h, uint8_t c);
> > > > +    int send_frame(const void *buf, const unsigned size);
> > > > +    void send_cursor(uint16_t width, uint16_t height,
> > > > +                     uint16_t hotspot_x, uint16_t hotspot_y,
> > > > +                     std::function<void(uint32_t *)>
> > > > fill_cursor);
> > > > 
> > > > private:
> > > > -    int fd = -1;
> > > > +    int streamfd = -1;
> > > > +    std::mutex mutex;
> > > > };
> > > > 
> > > > }} // namespace spice::streaming_agent
> > > > @@ -92,9 +104,8 @@ static bool streaming_requested = false;
> > > > static bool quit_requested = false;
> > > > static bool log_binary = false;
> > > > static std::set<SpiceVideoCodecType> client_codecs;
> > > > -static std::mutex stream_mtx;
> > > > 
> > > > -static int have_something_to_read(int streamfd, int timeout)
> > > > +int Stream::have_something_to_read(int timeout)
> > > > {
> > > >     struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > > 
> > > > @@ -110,13 +121,13 @@ static int have_something_to_read(int
> > > > streamfd, int timeout)
> > > >     return 0;
> > > > }
> > > > 
> > > > -static int read_command_from_device(int streamfd)
> > > > +int Stream::read_command_from_device()
> > > > {
> > > >     StreamDevHeader hdr;
> > > >     uint8_t msg[64];
> > > >     int n;
> > > > 
> > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > >     n = read(streamfd, &hdr, sizeof(hdr));
> > > >     if (n != sizeof(hdr)) {
> > > >         syslog(LOG_WARNING,
> > > > @@ -155,29 +166,28 @@ static int read_command_from_device(int
> > > > streamfd)
> > > >     return 1;
> > > > }
> > > > 
> > > > -static int read_command(int streamfd, bool blocking)
> > > > +int Stream::read_command(bool blocking)
> > > > {
> > > >     int timeout = blocking?-1:0;
> > > >     while (!quit_requested) {
> > > > -        if (!have_something_to_read(streamfd, timeout)) {
> > > > +        if (!have_something_to_read(timeout)) {
> > > >             if (!blocking) {
> > > >                 return 0;
> > > >             }
> > > >             sleep(1);
> > > >             continue;
> > > >         }
> > > > -        return read_command_from_device(streamfd);
> > > > +        return read_command_from_device();
> > > >     }
> > > > 
> > > >     return 1;
> > > > }
> > > > 
> > > > -static size_t
> > > > -write_all(int fd, const void *buf, const size_t len)
> > > > +size_t Stream::write_all(const void *buf, const size_t len)
> > > > {
> > > >     size_t written = 0;
> > > >     while (written < len) {
> > > > -        int l = write(fd, (const char *) buf + written, len -
> > > > written);
> > > > +        int l = write(streamfd, (const char *) buf + written,
> > > > len - written);
> > > >         if (l < 0) {
> > > >             if (errno == EINTR) {
> > > >                 continue;
> > > > @@ -191,7 +201,7 @@ write_all(int fd, const void *buf, const
> > > > size_t len)
> > > >     return written;
> > > > }
> > > > 
> > > > -static int spice_stream_send_format(int streamfd, unsigned w,
> > > > unsigned h, uint8_t c)
> > > > +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
> > > > {
> > > >     const size_t msgsize = sizeof(FormatMessage);
> > > >     const size_t hdrsize  = sizeof(StreamDevHeader);
> > > > @@ -210,14 +220,14 @@ static int spice_stream_send_format(int
> > > > streamfd, unsigned w, unsigned h, uint8_
> > > >         }
> > > >     };
> > > >     syslog(LOG_DEBUG, "writing format\n");
> > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
> > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > +    if (write_all(&msg, msgsize) != msgsize) {
> > > >         return EXIT_FAILURE;
> > > >     }
> > > >     return EXIT_SUCCESS;
> > > > }
> > > > 
> > > > -static int spice_stream_send_frame(int streamfd, const void
> > > > *buf, const unsigned size)
> > > > +int Stream::send_frame(const void *buf, const unsigned size)
> > > > {
> > > >     ssize_t n;
> > > >     const size_t msgsize = sizeof(FormatMessage);
> > > > @@ -231,8 +241,8 @@ static int spice_stream_send_frame(int
> > > > streamfd, const void *buf, const unsigned
> > > >         .msg = {}
> > > >     };
> > > > 
> > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > -    n = write_all(streamfd, &msg, msgsize);
> > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > +    n = write_all(&msg, msgsize);
> > > >     syslog(LOG_DEBUG,
> > > >            "wrote %ld bytes of header of data msg with frame of
> > > > size %u bytes\n",
> > > >            n, msg.hdr.size);
> > > > @@ -241,7 +251,7 @@ static int spice_stream_send_frame(int
> > > > streamfd, const void *buf, const unsigned
> > > >                n, msgsize);
> > > >         return EXIT_FAILURE;
> > > >     }
> > > > -    n = write_all(streamfd, buf, size);
> > > > +    n = write_all(buf, size);
> > > >     syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
> > > >     if (n != size) {
> > > >         syslog(LOG_WARNING, "write_all header: wrote %ld
> > > > expected %u\n",
> > > > @@ -294,11 +304,10 @@ static void usage(const char *progname)
> > > >     exit(1);
> > > > }
> > > > 
> > > > -static void
> > > > -send_cursor(int streamfd,
> > > > -            uint16_t width, uint16_t height,
> > > > -            uint16_t hotspot_x, uint16_t hotspot_y,
> > > > -            std::function<void(uint32_t *)> fill_cursor)
> > > > +void
> > > > +Stream::send_cursor(uint16_t width, uint16_t height,
> > > > +                    uint16_t hotspot_x, uint16_t hotspot_y,
> > > > +                    std::function<void(uint32_t *)>
> > > > fill_cursor)
> > > > {
> > > >     if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
> > > >         height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
> > > > @@ -332,11 +341,11 @@ send_cursor(int streamfd,
> > > >     uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg-
> > > > >msg.data);
> > > >     fill_cursor(pixels);
> > > > 
> > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > -    write_all(streamfd, storage.get(), cursor_msgsize);
> > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > +    write_all(storage.get(), cursor_msgsize);
> > > > }
> > > > 
> > > > -static void cursor_changes(int streamfd, Display *display, int
> > > > event_base)
> > > > +static void cursor_changes(Stream *stream, Display *display,
> > > > int event_base)
> > > > {
> > > >     unsigned long last_serial = 0;
> > > > 
> > > > @@ -358,18 +367,18 @@ static void cursor_changes(int streamfd,
> > > > Display *display, int event_base)
> > > >             for (unsigned i = 0; i < cursor->width * cursor-
> > > > >height; ++i)
> > > >                 pixels[i] = cursor->pixels[i];
> > > >         };
> > > > -        send_cursor(streamfd,
> > > > -                    cursor->width, cursor->height, cursor-
> > > > >xhot, cursor->yhot, fill_cursor);
> > > > +        stream->send_cursor(cursor->width, cursor->height,
> > > > +                            cursor->xhot, cursor->yhot,
> > > > fill_cursor);
> > > >     }
> > > > }
> > > > 
> > > > static void
> > > > -do_capture(int streamfd, const char *streamport, FILE *f_log)
> > > > +do_capture(Stream &stream, const char *streamport, FILE
> > > > *f_log)
> > > > {
> > > >     unsigned int frame_count = 0;
> > > >     while (!quit_requested) {
> > > >         while (!quit_requested && !streaming_requested) {
> > > > -            if (read_command(streamfd, true) < 0) {
> > > > +            if (stream.read_command(true) < 0) {
> > > >                 syslog(LOG_ERR, "FAILED to read command\n");
> > > >                 return;
> > > >             }
> > > > @@ -413,7 +422,7 @@ do_capture(int streamfd, const char
> > > > *streamport, FILE *f_log)
> > > > 
> > > >                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n",
> > > > width, height, codec);
> > > > 
> > > > -                if (spice_stream_send_format(streamfd, width,
> > > > height, codec) == EXIT_FAILURE)
> > > > +                if (stream.send_format(width, height, codec)
> > > > == EXIT_FAILURE)
> > > >                     throw std::runtime_error("FAILED to send
> > > > format message");
> > > >             }
> > > >             if (f_log) {
> > > > @@ -425,13 +434,12 @@ do_capture(int streamfd, const char
> > > > *streamport, FILE *f_log)
> > > >                     hexdump(frame.buffer, frame.buffer_size,
> > > > f_log);
> > > >                 }
> > > >             }
> > > > -            if (spice_stream_send_frame(streamfd,
> > > > -                                        frame.buffer,
> > > > frame.buffer_size) == EXIT_FAILURE) {
> > > > +            if (stream.send_frame(frame.buffer,
> > > > frame.buffer_size) == EXIT_FAILURE) {
> > > >                 syslog(LOG_ERR, "FAILED to send a frame\n");
> > > >                 break;
> > > >             }
> > > >             //usleep(1);
> > > > -            if (read_command(streamfd, false) < 0) {
> > > > +            if (stream.read_command(false) < 0) {
> > > >                 syslog(LOG_ERR, "FAILED to read command\n");
> > > >                 return;
> > > >             }
> > > > @@ -520,7 +528,7 @@ int main(int argc, char* argv[])
> > > >     XFixesSelectCursorInput(display, rootwindow,
> > > > XFixesDisplayCursorNotifyMask);
> > > > 
> > > >     Stream streamfd(streamport);
> > > > -    std::thread cursor_th(cursor_changes, (int) streamfd,
> > > > display, event_base);
> > > > +    std::thread cursor_th(cursor_changes, &streamfd, display,
> > > > event_base);
> > > >     cursor_th.detach();
> > > > 
> > > >     int ret = EXIT_SUCCESS;
> > 
> > 
> 
> _______________________________________________
> Spice-devel mailing list
> Spice-devel@lists.freedesktop.org
> https://lists.freedesktop.org/mailman/listinfo/spice-devel
> On 20 Feb 2018, at 22:24, Jonathon Jongsma <jjongsma@redhat.com> wrote:
> 
> On Tue, 2018-02-20 at 14:29 +0100, Lukáš Hrázký wrote:
>> On Tue, 2018-02-20 at 10:47 +0100, Christophe de Dinechin wrote:
>>>> On 20 Feb 2018, at 10:43, Lukáš Hrázký <lhrazky@redhat.com>
>>>> wrote:
>>>> 
>>>> On Fri, 2018-02-16 at 17:15 +0100, Christophe de Dinechin wrote:
>>>>> From: Christophe de Dinechin <dinechin@redhat.com>
>>>>> 
>>>>> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
>>>>> ---
>>>>> src/spice-streaming-agent.cpp | 86 +++++++++++++++++++++++-----
>>>>> ---------------
>>>>> 1 file changed, 47 insertions(+), 39 deletions(-)
>>>>> 
>>>>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-
>>>>> streaming-agent.cpp
>>>>> index f0d79ae..a989ee7 100644
>>>>> --- a/src/spice-streaming-agent.cpp
>>>>> +++ b/src/spice-streaming-agent.cpp
>>>>> @@ -71,18 +71,30 @@ class Stream
>>>>> public:
>>>>>    Stream(const char *name)
>>>> 
>>>> I would like to name the class something more descriptive for
>>>> what it
>>>> is becoming. Class named Stream in a file named
>>>> "stream.{cpp,hpp}"
>>>> could be almost anything.
>>> 
>>> But it’s not named “Stream”, it’s called
>>> “spice::streaming_agent::Stream” ;-)
>>> 
>>> I chose short names because I was in that namespace. Otherwise, I
>>> agree with you.
>>> 
>>> Do you think that the name is still too vague even within the
>>> namespace?
>> 
>> I think so. Everything in streaming agent is in that namespace, even
>> if
>> it wasn't, you know you're looking at the streaming agent code and
>> think of the types in that context. Stream is still a pretty generic
>> name, I suppose you could imagine a number of things under it.
>> 
>> So what is this class in our case. It handles the socket
>> communication
>> over the streaming channel to the server. Is it accurate to call it a
>> channel here? If so, maybe StreamingChannel?
> 
> It's true that "Stream" in the context of the streaming agent could
> make you think that it was actually representing the encoded video
> stream, rather than an encapsulation of a communication channel. But I
> don't like the name StreamDispatcher. Glib uses the name GIOChannel for
> something similar. Maybe IOChannel?

I like the fact that IOChannel points out that it’s both input and output. Sounds good to me.

> 
>> 
>>>> My best name is StreamDispatcher so far, not
>>>> entirely happy about it :)
>>> 
>>> 
>>>> 
>>>>>    {
>>>>> -        fd = open(name, O_RDWR);
>>>>> -        if (fd < 0)
>>>>> +        streamfd = open(name, O_RDWR);
>>>>> +        if (streamfd < 0)
>>>>>            throw std::runtime_error("failed to open streaming
>>>>> device");
>>>>>    }
>>>>>    ~Stream()
>>>>>    {
>>>>> -        close(fd);
>>>>> +        close(streamfd);
>>>>>    }
>>>>> -    operator int() { return fd; }
>>>>> +    operator int() { return streamfd; }
>>>>> +
>>>>> +    int have_something_to_read(int timeout);
>>>>> +    int read_command_from_device(void);
>>>>> +    int read_command(bool blocking);
>>>>> +
>>>>> +    size_t write_all(const void *buf, const size_t len);
>>>> 
>>>> This method could also use a better name. write_bytes()?
>>>> write_buffer()?
>>> 
>>> I intended to do a rename in a follow up. My current choice was
>>> “write_packet”, because precisely, it’s not writing bytes or a
>>> buffer, it’s making sure the whole packet gets sent.
>> 
>> What do you mean by packet here? Does it have a specific meaning in
>> this context? It just sends an array of binary data, doesn't it? Like
>> later on you use it to write the header and message body separately.
>> 
>>>> 
>>>>> +    int send_format(unsigned w, unsigned h, uint8_t c);
>>>>> +    int send_frame(const void *buf, const unsigned size);
>>>>> +    void send_cursor(uint16_t width, uint16_t height,
>>>>> +                     uint16_t hotspot_x, uint16_t hotspot_y,
>>>>> +                     std::function<void(uint32_t *)>
>>>>> fill_cursor);
>>>>> 
>>>>> private:
>>>>> -    int fd = -1;
>>>>> +    int streamfd = -1;
>>>>> +    std::mutex mutex;
>>>>> };
>>>>> 
>>>>> }} // namespace spice::streaming_agent
>>>>> @@ -92,9 +104,8 @@ static bool streaming_requested = false;
>>>>> static bool quit_requested = false;
>>>>> static bool log_binary = false;
>>>>> static std::set<SpiceVideoCodecType> client_codecs;
>>>>> -static std::mutex stream_mtx;
>>>>> 
>>>>> -static int have_something_to_read(int streamfd, int timeout)
>>>>> +int Stream::have_something_to_read(int timeout)
>>>>> {
>>>>>    struct pollfd pollfd = {streamfd, POLLIN, 0};
>>>>> 
>>>>> @@ -110,13 +121,13 @@ static int have_something_to_read(int
>>>>> streamfd, int timeout)
>>>>>    return 0;
>>>>> }
>>>>> 
>>>>> -static int read_command_from_device(int streamfd)
>>>>> +int Stream::read_command_from_device()
>>>>> {
>>>>>    StreamDevHeader hdr;
>>>>>    uint8_t msg[64];
>>>>>    int n;
>>>>> 
>>>>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>>>>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>>    n = read(streamfd, &hdr, sizeof(hdr));
>>>>>    if (n != sizeof(hdr)) {
>>>>>        syslog(LOG_WARNING,
>>>>> @@ -155,29 +166,28 @@ static int read_command_from_device(int
>>>>> streamfd)
>>>>>    return 1;
>>>>> }
>>>>> 
>>>>> -static int read_command(int streamfd, bool blocking)
>>>>> +int Stream::read_command(bool blocking)
>>>>> {
>>>>>    int timeout = blocking?-1:0;
>>>>>    while (!quit_requested) {
>>>>> -        if (!have_something_to_read(streamfd, timeout)) {
>>>>> +        if (!have_something_to_read(timeout)) {
>>>>>            if (!blocking) {
>>>>>                return 0;
>>>>>            }
>>>>>            sleep(1);
>>>>>            continue;
>>>>>        }
>>>>> -        return read_command_from_device(streamfd);
>>>>> +        return read_command_from_device();
>>>>>    }
>>>>> 
>>>>>    return 1;
>>>>> }
>>>>> 
>>>>> -static size_t
>>>>> -write_all(int fd, const void *buf, const size_t len)
>>>>> +size_t Stream::write_all(const void *buf, const size_t len)
>>>>> {
>>>>>    size_t written = 0;
>>>>>    while (written < len) {
>>>>> -        int l = write(fd, (const char *) buf + written, len -
>>>>> written);
>>>>> +        int l = write(streamfd, (const char *) buf + written,
>>>>> len - written);
>>>>>        if (l < 0) {
>>>>>            if (errno == EINTR) {
>>>>>                continue;
>>>>> @@ -191,7 +201,7 @@ write_all(int fd, const void *buf, const
>>>>> size_t len)
>>>>>    return written;
>>>>> }
>>>>> 
>>>>> -static int spice_stream_send_format(int streamfd, unsigned w,
>>>>> unsigned h, uint8_t c)
>>>>> +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
>>>>> {
>>>>>    const size_t msgsize = sizeof(FormatMessage);
>>>>>    const size_t hdrsize  = sizeof(StreamDevHeader);
>>>>> @@ -210,14 +220,14 @@ static int spice_stream_send_format(int
>>>>> streamfd, unsigned w, unsigned h, uint8_
>>>>>        }
>>>>>    };
>>>>>    syslog(LOG_DEBUG, "writing format\n");
>>>>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>>>>> -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
>>>>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>> +    if (write_all(&msg, msgsize) != msgsize) {
>>>>>        return EXIT_FAILURE;
>>>>>    }
>>>>>    return EXIT_SUCCESS;
>>>>> }
>>>>> 
>>>>> -static int spice_stream_send_frame(int streamfd, const void
>>>>> *buf, const unsigned size)
>>>>> +int Stream::send_frame(const void *buf, const unsigned size)
>>>>> {
>>>>>    ssize_t n;
>>>>>    const size_t msgsize = sizeof(FormatMessage);
>>>>> @@ -231,8 +241,8 @@ static int spice_stream_send_frame(int
>>>>> streamfd, const void *buf, const unsigned
>>>>>        .msg = {}
>>>>>    };
>>>>> 
>>>>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>>>>> -    n = write_all(streamfd, &msg, msgsize);
>>>>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>> +    n = write_all(&msg, msgsize);
>>>>>    syslog(LOG_DEBUG,
>>>>>           "wrote %ld bytes of header of data msg with frame of
>>>>> size %u bytes\n",
>>>>>           n, msg.hdr.size);
>>>>> @@ -241,7 +251,7 @@ static int spice_stream_send_frame(int
>>>>> streamfd, const void *buf, const unsigned
>>>>>               n, msgsize);
>>>>>        return EXIT_FAILURE;
>>>>>    }
>>>>> -    n = write_all(streamfd, buf, size);
>>>>> +    n = write_all(buf, size);
>>>>>    syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
>>>>>    if (n != size) {
>>>>>        syslog(LOG_WARNING, "write_all header: wrote %ld
>>>>> expected %u\n",
>>>>> @@ -294,11 +304,10 @@ static void usage(const char *progname)
>>>>>    exit(1);
>>>>> }
>>>>> 
>>>>> -static void
>>>>> -send_cursor(int streamfd,
>>>>> -            uint16_t width, uint16_t height,
>>>>> -            uint16_t hotspot_x, uint16_t hotspot_y,
>>>>> -            std::function<void(uint32_t *)> fill_cursor)
>>>>> +void
>>>>> +Stream::send_cursor(uint16_t width, uint16_t height,
>>>>> +                    uint16_t hotspot_x, uint16_t hotspot_y,
>>>>> +                    std::function<void(uint32_t *)>
>>>>> fill_cursor)
>>>>> {
>>>>>    if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
>>>>>        height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
>>>>> @@ -332,11 +341,11 @@ send_cursor(int streamfd,
>>>>>    uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg-
>>>>>> msg.data);
>>>>>    fill_cursor(pixels);
>>>>> 
>>>>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>>>>> -    write_all(streamfd, storage.get(), cursor_msgsize);
>>>>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>> +    write_all(storage.get(), cursor_msgsize);
>>>>> }
>>>>> 
>>>>> -static void cursor_changes(int streamfd, Display *display, int
>>>>> event_base)
>>>>> +static void cursor_changes(Stream *stream, Display *display,
>>>>> int event_base)
>>>>> {
>>>>>    unsigned long last_serial = 0;
>>>>> 
>>>>> @@ -358,18 +367,18 @@ static void cursor_changes(int streamfd,
>>>>> Display *display, int event_base)
>>>>>            for (unsigned i = 0; i < cursor->width * cursor-
>>>>>> height; ++i)
>>>>>                pixels[i] = cursor->pixels[i];
>>>>>        };
>>>>> -        send_cursor(streamfd,
>>>>> -                    cursor->width, cursor->height, cursor-
>>>>>> xhot, cursor->yhot, fill_cursor);
>>>>> +        stream->send_cursor(cursor->width, cursor->height,
>>>>> +                            cursor->xhot, cursor->yhot,
>>>>> fill_cursor);
>>>>>    }
>>>>> }
>>>>> 
>>>>> static void
>>>>> -do_capture(int streamfd, const char *streamport, FILE *f_log)
>>>>> +do_capture(Stream &stream, const char *streamport, FILE
>>>>> *f_log)
>>>>> {
>>>>>    unsigned int frame_count = 0;
>>>>>    while (!quit_requested) {
>>>>>        while (!quit_requested && !streaming_requested) {
>>>>> -            if (read_command(streamfd, true) < 0) {
>>>>> +            if (stream.read_command(true) < 0) {
>>>>>                syslog(LOG_ERR, "FAILED to read command\n");
>>>>>                return;
>>>>>            }
>>>>> @@ -413,7 +422,7 @@ do_capture(int streamfd, const char
>>>>> *streamport, FILE *f_log)
>>>>> 
>>>>>                syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n",
>>>>> width, height, codec);
>>>>> 
>>>>> -                if (spice_stream_send_format(streamfd, width,
>>>>> height, codec) == EXIT_FAILURE)
>>>>> +                if (stream.send_format(width, height, codec)
>>>>> == EXIT_FAILURE)
>>>>>                    throw std::runtime_error("FAILED to send
>>>>> format message");
>>>>>            }
>>>>>            if (f_log) {
>>>>> @@ -425,13 +434,12 @@ do_capture(int streamfd, const char
>>>>> *streamport, FILE *f_log)
>>>>>                    hexdump(frame.buffer, frame.buffer_size,
>>>>> f_log);
>>>>>                }
>>>>>            }
>>>>> -            if (spice_stream_send_frame(streamfd,
>>>>> -                                        frame.buffer,
>>>>> frame.buffer_size) == EXIT_FAILURE) {
>>>>> +            if (stream.send_frame(frame.buffer,
>>>>> frame.buffer_size) == EXIT_FAILURE) {
>>>>>                syslog(LOG_ERR, "FAILED to send a frame\n");
>>>>>                break;
>>>>>            }
>>>>>            //usleep(1);
>>>>> -            if (read_command(streamfd, false) < 0) {
>>>>> +            if (stream.read_command(false) < 0) {
>>>>>                syslog(LOG_ERR, "FAILED to read command\n");
>>>>>                return;
>>>>>            }
>>>>> @@ -520,7 +528,7 @@ int main(int argc, char* argv[])
>>>>>    XFixesSelectCursorInput(display, rootwindow,
>>>>> XFixesDisplayCursorNotifyMask);
>>>>> 
>>>>>    Stream streamfd(streamport);
>>>>> -    std::thread cursor_th(cursor_changes, (int) streamfd,
>>>>> display, event_base);
>>>>> +    std::thread cursor_th(cursor_changes, &streamfd, display,
>>>>> event_base);
>>>>>    cursor_th.detach();
>>>>> 
>>>>>    int ret = EXIT_SUCCESS;
>>> 
>>> 
>> 
>> _______________________________________________
>> Spice-devel mailing list
>> Spice-devel@lists.freedesktop.org
>> https://lists.freedesktop.org/mailman/listinfo/spice-devel
> _______________________________________________
> Spice-devel mailing list
> Spice-devel@lists.freedesktop.org
> https://lists.freedesktop.org/mailman/listinfo/spice-devel
On Tue, 2018-02-20 at 15:24 -0600, Jonathon Jongsma wrote:
> On Tue, 2018-02-20 at 14:29 +0100, Lukáš Hrázký wrote:
> > On Tue, 2018-02-20 at 10:47 +0100, Christophe de Dinechin wrote:
> > > > On 20 Feb 2018, at 10:43, Lukáš Hrázký <lhrazky@redhat.com>
> > > > wrote:
> > > > 
> > > > On Fri, 2018-02-16 at 17:15 +0100, Christophe de Dinechin wrote:
> > > > > From: Christophe de Dinechin <dinechin@redhat.com>
> > > > > 
> > > > > Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
> > > > > ---
> > > > > src/spice-streaming-agent.cpp | 86 +++++++++++++++++++++++-----
> > > > > ---------------
> > > > > 1 file changed, 47 insertions(+), 39 deletions(-)
> > > > > 
> > > > > diff --git a/src/spice-streaming-agent.cpp b/src/spice-
> > > > > streaming-agent.cpp
> > > > > index f0d79ae..a989ee7 100644
> > > > > --- a/src/spice-streaming-agent.cpp
> > > > > +++ b/src/spice-streaming-agent.cpp
> > > > > @@ -71,18 +71,30 @@ class Stream
> > > > > public:
> > > > >     Stream(const char *name)
> > > > 
> > > > I would like to name the class something more descriptive for
> > > > what it
> > > > is becoming. Class named Stream in a file named
> > > > "stream.{cpp,hpp}"
> > > > could be almost anything.
> > > 
> > > But it’s not named “Stream”, it’s called
> > > “spice::streaming_agent::Stream” ;-)
> > > 
> > > I chose short names because I was in that namespace. Otherwise, I
> > > agree with you.
> > > 
> > > Do you think that the name is still too vague even within the
> > > namespace?
> > 
> > I think so. Everything in streaming agent is in that namespace, even
> > if
> > it wasn't, you know you're looking at the streaming agent code and
> > think of the types in that context. Stream is still a pretty generic
> > name, I suppose you could imagine a number of things under it.
> > 
> > So what is this class in our case. It handles the socket
> > communication
> > over the streaming channel to the server. Is it accurate to call it a
> > channel here? If so, maybe StreamingChannel?
> 
> It's true that "Stream" in the context of the streaming agent could
> make you think that it was actually representing the encoded video
> stream, rather than an encapsulation of a communication channel. But I
> don't like the name StreamDispatcher. Glib uses the name GIOChannel for
> something similar. Maybe IOChannel?

Could be, but at this moment, the class is specific to the streaming
'channel', and I'm not sure if we will make it generic in forseeable
future (it could be done, but since we don't have other channels atm.
and may never have, it is not necessary). So StreamIOChannel? Then I
would just leave out the IO as it seems a bit unnecessary and we get to
the StreamChannel/StreamingChannel?

> > 
> > > > My best name is StreamDispatcher so far, not
> > > > entirely happy about it :)
> > > 
> > > 
> > > > 
> > > > >     {
> > > > > -        fd = open(name, O_RDWR);
> > > > > -        if (fd < 0)
> > > > > +        streamfd = open(name, O_RDWR);
> > > > > +        if (streamfd < 0)
> > > > >             throw std::runtime_error("failed to open streaming
> > > > > device");
> > > > >     }
> > > > >     ~Stream()
> > > > >     {
> > > > > -        close(fd);
> > > > > +        close(streamfd);
> > > > >     }
> > > > > -    operator int() { return fd; }
> > > > > +    operator int() { return streamfd; }
> > > > > +
> > > > > +    int have_something_to_read(int timeout);
> > > > > +    int read_command_from_device(void);
> > > > > +    int read_command(bool blocking);
> > > > > +
> > > > > +    size_t write_all(const void *buf, const size_t len);
> > > > 
> > > > This method could also use a better name. write_bytes()?
> > > > write_buffer()?
> > > 
> > > I intended to do a rename in a follow up. My current choice was
> > > “write_packet”, because precisely, it’s not writing bytes or a
> > > buffer, it’s making sure the whole packet gets sent.
> > 
> > What do you mean by packet here? Does it have a specific meaning in
> > this context? It just sends an array of binary data, doesn't it? Like
> > later on you use it to write the header and message body separately.
> > 
> > > > 
> > > > > +    int send_format(unsigned w, unsigned h, uint8_t c);
> > > > > +    int send_frame(const void *buf, const unsigned size);
> > > > > +    void send_cursor(uint16_t width, uint16_t height,
> > > > > +                     uint16_t hotspot_x, uint16_t hotspot_y,
> > > > > +                     std::function<void(uint32_t *)>
> > > > > fill_cursor);
> > > > > 
> > > > > private:
> > > > > -    int fd = -1;
> > > > > +    int streamfd = -1;
> > > > > +    std::mutex mutex;
> > > > > };
> > > > > 
> > > > > }} // namespace spice::streaming_agent
> > > > > @@ -92,9 +104,8 @@ static bool streaming_requested = false;
> > > > > static bool quit_requested = false;
> > > > > static bool log_binary = false;
> > > > > static std::set<SpiceVideoCodecType> client_codecs;
> > > > > -static std::mutex stream_mtx;
> > > > > 
> > > > > -static int have_something_to_read(int streamfd, int timeout)
> > > > > +int Stream::have_something_to_read(int timeout)
> > > > > {
> > > > >     struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > > > 
> > > > > @@ -110,13 +121,13 @@ static int have_something_to_read(int
> > > > > streamfd, int timeout)
> > > > >     return 0;
> > > > > }
> > > > > 
> > > > > -static int read_command_from_device(int streamfd)
> > > > > +int Stream::read_command_from_device()
> > > > > {
> > > > >     StreamDevHeader hdr;
> > > > >     uint8_t msg[64];
> > > > >     int n;
> > > > > 
> > > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > >     n = read(streamfd, &hdr, sizeof(hdr));
> > > > >     if (n != sizeof(hdr)) {
> > > > >         syslog(LOG_WARNING,
> > > > > @@ -155,29 +166,28 @@ static int read_command_from_device(int
> > > > > streamfd)
> > > > >     return 1;
> > > > > }
> > > > > 
> > > > > -static int read_command(int streamfd, bool blocking)
> > > > > +int Stream::read_command(bool blocking)
> > > > > {
> > > > >     int timeout = blocking?-1:0;
> > > > >     while (!quit_requested) {
> > > > > -        if (!have_something_to_read(streamfd, timeout)) {
> > > > > +        if (!have_something_to_read(timeout)) {
> > > > >             if (!blocking) {
> > > > >                 return 0;
> > > > >             }
> > > > >             sleep(1);
> > > > >             continue;
> > > > >         }
> > > > > -        return read_command_from_device(streamfd);
> > > > > +        return read_command_from_device();
> > > > >     }
> > > > > 
> > > > >     return 1;
> > > > > }
> > > > > 
> > > > > -static size_t
> > > > > -write_all(int fd, const void *buf, const size_t len)
> > > > > +size_t Stream::write_all(const void *buf, const size_t len)
> > > > > {
> > > > >     size_t written = 0;
> > > > >     while (written < len) {
> > > > > -        int l = write(fd, (const char *) buf + written, len -
> > > > > written);
> > > > > +        int l = write(streamfd, (const char *) buf + written,
> > > > > len - written);
> > > > >         if (l < 0) {
> > > > >             if (errno == EINTR) {
> > > > >                 continue;
> > > > > @@ -191,7 +201,7 @@ write_all(int fd, const void *buf, const
> > > > > size_t len)
> > > > >     return written;
> > > > > }
> > > > > 
> > > > > -static int spice_stream_send_format(int streamfd, unsigned w,
> > > > > unsigned h, uint8_t c)
> > > > > +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
> > > > > {
> > > > >     const size_t msgsize = sizeof(FormatMessage);
> > > > >     const size_t hdrsize  = sizeof(StreamDevHeader);
> > > > > @@ -210,14 +220,14 @@ static int spice_stream_send_format(int
> > > > > streamfd, unsigned w, unsigned h, uint8_
> > > > >         }
> > > > >     };
> > > > >     syslog(LOG_DEBUG, "writing format\n");
> > > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > > -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
> > > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > +    if (write_all(&msg, msgsize) != msgsize) {
> > > > >         return EXIT_FAILURE;
> > > > >     }
> > > > >     return EXIT_SUCCESS;
> > > > > }
> > > > > 
> > > > > -static int spice_stream_send_frame(int streamfd, const void
> > > > > *buf, const unsigned size)
> > > > > +int Stream::send_frame(const void *buf, const unsigned size)
> > > > > {
> > > > >     ssize_t n;
> > > > >     const size_t msgsize = sizeof(FormatMessage);
> > > > > @@ -231,8 +241,8 @@ static int spice_stream_send_frame(int
> > > > > streamfd, const void *buf, const unsigned
> > > > >         .msg = {}
> > > > >     };
> > > > > 
> > > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > > -    n = write_all(streamfd, &msg, msgsize);
> > > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > +    n = write_all(&msg, msgsize);
> > > > >     syslog(LOG_DEBUG,
> > > > >            "wrote %ld bytes of header of data msg with frame of
> > > > > size %u bytes\n",
> > > > >            n, msg.hdr.size);
> > > > > @@ -241,7 +251,7 @@ static int spice_stream_send_frame(int
> > > > > streamfd, const void *buf, const unsigned
> > > > >                n, msgsize);
> > > > >         return EXIT_FAILURE;
> > > > >     }
> > > > > -    n = write_all(streamfd, buf, size);
> > > > > +    n = write_all(buf, size);
> > > > >     syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
> > > > >     if (n != size) {
> > > > >         syslog(LOG_WARNING, "write_all header: wrote %ld
> > > > > expected %u\n",
> > > > > @@ -294,11 +304,10 @@ static void usage(const char *progname)
> > > > >     exit(1);
> > > > > }
> > > > > 
> > > > > -static void
> > > > > -send_cursor(int streamfd,
> > > > > -            uint16_t width, uint16_t height,
> > > > > -            uint16_t hotspot_x, uint16_t hotspot_y,
> > > > > -            std::function<void(uint32_t *)> fill_cursor)
> > > > > +void
> > > > > +Stream::send_cursor(uint16_t width, uint16_t height,
> > > > > +                    uint16_t hotspot_x, uint16_t hotspot_y,
> > > > > +                    std::function<void(uint32_t *)>
> > > > > fill_cursor)
> > > > > {
> > > > >     if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
> > > > >         height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
> > > > > @@ -332,11 +341,11 @@ send_cursor(int streamfd,
> > > > >     uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg-
> > > > > > msg.data);
> > > > > 
> > > > >     fill_cursor(pixels);
> > > > > 
> > > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > > -    write_all(streamfd, storage.get(), cursor_msgsize);
> > > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > +    write_all(storage.get(), cursor_msgsize);
> > > > > }
> > > > > 
> > > > > -static void cursor_changes(int streamfd, Display *display, int
> > > > > event_base)
> > > > > +static void cursor_changes(Stream *stream, Display *display,
> > > > > int event_base)
> > > > > {
> > > > >     unsigned long last_serial = 0;
> > > > > 
> > > > > @@ -358,18 +367,18 @@ static void cursor_changes(int streamfd,
> > > > > Display *display, int event_base)
> > > > >             for (unsigned i = 0; i < cursor->width * cursor-
> > > > > > height; ++i)
> > > > > 
> > > > >                 pixels[i] = cursor->pixels[i];
> > > > >         };
> > > > > -        send_cursor(streamfd,
> > > > > -                    cursor->width, cursor->height, cursor-
> > > > > > xhot, cursor->yhot, fill_cursor);
> > > > > 
> > > > > +        stream->send_cursor(cursor->width, cursor->height,
> > > > > +                            cursor->xhot, cursor->yhot,
> > > > > fill_cursor);
> > > > >     }
> > > > > }
> > > > > 
> > > > > static void
> > > > > -do_capture(int streamfd, const char *streamport, FILE *f_log)
> > > > > +do_capture(Stream &stream, const char *streamport, FILE
> > > > > *f_log)
> > > > > {
> > > > >     unsigned int frame_count = 0;
> > > > >     while (!quit_requested) {
> > > > >         while (!quit_requested && !streaming_requested) {
> > > > > -            if (read_command(streamfd, true) < 0) {
> > > > > +            if (stream.read_command(true) < 0) {
> > > > >                 syslog(LOG_ERR, "FAILED to read command\n");
> > > > >                 return;
> > > > >             }
> > > > > @@ -413,7 +422,7 @@ do_capture(int streamfd, const char
> > > > > *streamport, FILE *f_log)
> > > > > 
> > > > >                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n",
> > > > > width, height, codec);
> > > > > 
> > > > > -                if (spice_stream_send_format(streamfd, width,
> > > > > height, codec) == EXIT_FAILURE)
> > > > > +                if (stream.send_format(width, height, codec)
> > > > > == EXIT_FAILURE)
> > > > >                     throw std::runtime_error("FAILED to send
> > > > > format message");
> > > > >             }
> > > > >             if (f_log) {
> > > > > @@ -425,13 +434,12 @@ do_capture(int streamfd, const char
> > > > > *streamport, FILE *f_log)
> > > > >                     hexdump(frame.buffer, frame.buffer_size,
> > > > > f_log);
> > > > >                 }
> > > > >             }
> > > > > -            if (spice_stream_send_frame(streamfd,
> > > > > -                                        frame.buffer,
> > > > > frame.buffer_size) == EXIT_FAILURE) {
> > > > > +            if (stream.send_frame(frame.buffer,
> > > > > frame.buffer_size) == EXIT_FAILURE) {
> > > > >                 syslog(LOG_ERR, "FAILED to send a frame\n");
> > > > >                 break;
> > > > >             }
> > > > >             //usleep(1);
> > > > > -            if (read_command(streamfd, false) < 0) {
> > > > > +            if (stream.read_command(false) < 0) {
> > > > >                 syslog(LOG_ERR, "FAILED to read command\n");
> > > > >                 return;
> > > > >             }
> > > > > @@ -520,7 +528,7 @@ int main(int argc, char* argv[])
> > > > >     XFixesSelectCursorInput(display, rootwindow,
> > > > > XFixesDisplayCursorNotifyMask);
> > > > > 
> > > > >     Stream streamfd(streamport);
> > > > > -    std::thread cursor_th(cursor_changes, (int) streamfd,
> > > > > display, event_base);
> > > > > +    std::thread cursor_th(cursor_changes, &streamfd, display,
> > > > > event_base);
> > > > >     cursor_th.detach();
> > > > > 
> > > > >     int ret = EXIT_SUCCESS;
> > > 
> > > 
> > 
> > _______________________________________________
> > Spice-devel mailing list
> > Spice-devel@lists.freedesktop.org
> > https://lists.freedesktop.org/mailman/listinfo/spice-devel
On Wed, 2018-02-21 at 10:19 +0100, Lukáš Hrázký wrote:
> > > > I chose short names because I was in that namespace. Otherwise,
> > > > I
> > > > agree with you.
> > > > 
> > > > Do you think that the name is still too vague even within the
> > > > namespace?
> > > 
> > > I think so. Everything in streaming agent is in that namespace,
> > > even
> > > if
> > > it wasn't, you know you're looking at the streaming agent code
> > > and
> > > think of the types in that context. Stream is still a pretty
> > > generic
> > > name, I suppose you could imagine a number of things under it.
> > > 
> > > So what is this class in our case. It handles the socket
> > > communication
> > > over the streaming channel to the server. Is it accurate to call
> > > it a
> > > channel here? If so, maybe StreamingChannel?
> > 
> > It's true that "Stream" in the context of the streaming agent could
> > make you think that it was actually representing the encoded video
> > stream, rather than an encapsulation of a communication channel.
> > But I
> > don't like the name StreamDispatcher. Glib uses the name GIOChannel
> > for
> > something similar. Maybe IOChannel?
> 
> Could be, but at this moment, the class is specific to the streaming
> 'channel', and I'm not sure if we will make it generic in forseeable
> future (it could be done, but since we don't have other channels atm.
> and may never have, it is not necessary). So StreamIOChannel? Then I
> would just leave out the IO as it seems a bit unnecessary and we get
> to
> the StreamChannel/StreamingChannel?


In my option, the problem with the name StreamChannel/StreamingChannel
is that we already use the name StreamChannel in spice-server to refer
to something else: the spice protocol channel. In this case, we are
talking about a communication channel that is used for control and data
transfer between the agent and server. The server then takes the data
that it receives and sends it to the client via spice using a
StreamChannel object. I think it would be good to avoid using the same
name in two different (but closely-related) projects to refer to
different things.

Jonathon