[spice-streaming-agent,v4,5/5] Move all stream-related functions within SpiceStream class

Submitted by Christophe de Dinechin on Nov. 10, 2017, 3:15 p.m.

Details

Message ID 20171110151546.32221-6-christophe@dinechin.org
State New
Headers show
Series "Add a SpiceStream class for stream access" ( rev: 3 ) in Spice

Not browsing as part of any series.

Commit Message

Christophe de Dinechin Nov. 10, 2017, 3:15 p.m.
From: Christophe de Dinechin <dinechin@redhat.com>

This incidentally fixes a race condition processing X events,
where we could possibly start sending cursor events to the
stream before it was actually open.

Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
---
 src/spice-streaming-agent.cpp | 84 ++++++++++++++++++++++++-------------------
 1 file changed, 48 insertions(+), 36 deletions(-)

Patch hide | download patch | download mbox

diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
index d46e308..f82e874 100644
--- a/src/spice-streaming-agent.cpp
+++ b/src/spice-streaming-agent.cpp
@@ -51,31 +51,45 @@  struct SpiceStreamDataMessage
     StreamMsgData msg;
 };
 
-struct Stream
+class SpiceStream
 {
-    Stream(const char *name, int &fd): fd(fd)
+public:
+    SpiceStream(const char *name): streamfd(open(name, O_RDWR))
     {
-        fd = open(name, O_RDWR);
-        if (fd < 0)
+        if (streamfd < 0)
             throw std::runtime_error("failed to open streaming device");
     }
-    ~Stream()
+    ~SpiceStream()
     {
-        if (fd >= 0)
-            close(fd);
-        fd = -1;
+        close(streamfd);
     }
-    int &fd;
+    
+public:
+    bool have_something_to_read(int *pfd, int timeout);
+    int read_command_from_stdin(void);
+    int read_command_from_device(void);
+    int read_command(bool blocking);
+    int send_format(unsigned w, unsigned h, unsigned c);
+    int send_frame(const void *buf, const unsigned size);
+    void send_cursor(const XFixesCursorImage &image);  
+
+private:
+    size_t write_all(const void *buf, const size_t len);
+
+    SpiceStream(const SpiceStream &) = delete;
+    SpiceStream &operator=(const SpiceStream &) = delete;
+
+private:
+    int streamfd;
 };
 
 static int streaming_requested;
 static bool quit;
-static int streamfd = -1;
 static bool stdin_ok;
 static int log_binary = 0;
 static std::mutex stream_mtx;
 
-static int have_something_to_read(int *pfd, int timeout)
+bool SpiceStream::have_something_to_read(int *pfd, int timeout)
 {
     int nfds;
     struct pollfd pollfds[2] = {
@@ -97,7 +111,7 @@  static int have_something_to_read(int *pfd, int timeout)
     return *pfd != -1;
 }
 
-static int read_command_from_stdin(void)
+int SpiceStream::read_command_from_stdin(void)
 {
     char buffer[64], *p, *save = NULL;
 
@@ -121,7 +135,7 @@  static int read_command_from_stdin(void)
     return 1;
 }
 
-static int read_command_from_device(void)
+int SpiceStream::read_command_from_device(void)
 {
     StreamDevHeader hdr;
     uint8_t msg[64];
@@ -163,7 +177,7 @@  static int read_command_from_device(void)
     return 1;
 }
 
-static int read_command(bool blocking)
+int SpiceStream::read_command(bool blocking)
 {
     int fd, n=1;
     int timeout = blocking?-1:0;
@@ -185,12 +199,11 @@  static int read_command(bool blocking)
     return n;
 }
 
-static size_t
-write_all(int fd, const void *buf, const size_t len)
+size_t SpiceStream::write_all(const void *buf, const size_t len)
 {
     size_t written = 0;
     while (written < len) {
-        int l = write(fd, (const char *) buf + written, len - written);
+        int l = write(streamfd, (const char *) buf + written, len - written);
         if (l < 0 && errno == EINTR) {
             continue;
         }
@@ -204,7 +217,7 @@  write_all(int fd, const void *buf, const size_t len)
     return written;
 }
 
-static int spice_stream_send_format(unsigned w, unsigned h, unsigned c)
+int SpiceStream::send_format(unsigned w, unsigned h, unsigned c)
 {
 
     SpiceStreamFormatMessage msg;
@@ -219,13 +232,13 @@  static int spice_stream_send_format(unsigned w, unsigned h, unsigned c)
     msg.msg.codec = c;
     syslog(LOG_DEBUG, "writing format\n");
     std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    if (write_all(streamfd, &msg, msgsize) != msgsize) {
+    if (write_all(&msg, msgsize) != msgsize) {
         return EXIT_FAILURE;
     }
     return EXIT_SUCCESS;
 }
 
-static int spice_stream_send_frame(const void *buf, const unsigned size)
+int SpiceStream::send_frame(const void *buf, const unsigned size)
 {
     SpiceStreamDataMessage msg;
     const size_t msgsize = sizeof(msg);
@@ -236,7 +249,7 @@  static int spice_stream_send_frame(const void *buf, const unsigned size)
     msg.hdr.type = STREAM_TYPE_DATA;
     msg.hdr.size = size; /* includes only the body? */
     std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    n = write_all(streamfd, &msg, msgsize);
+    n = write_all(&msg, msgsize);
     syslog(LOG_DEBUG,
            "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
            n, msg.hdr.size);
@@ -245,7 +258,7 @@  static int spice_stream_send_frame(const void *buf, const unsigned size)
                n, msgsize);
         return EXIT_FAILURE;
     }
-    n = write_all(streamfd, buf, size);
+    n = write_all(buf, size);
     syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
     if (n != size) {
         syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
@@ -304,7 +317,7 @@  static void usage(const char *progname)
     exit(1);
 }
 
-static void send_cursor(const XFixesCursorImage &image)
+void SpiceStream::send_cursor(const XFixesCursorImage &image)
 {
     if (image.width >= 1024 || image.height >= 1024)
         return;
@@ -334,10 +347,10 @@  static void send_cursor(const XFixesCursorImage &image)
         pixels[i] = image.pixels[i];
 
     std::lock_guard<std::mutex> stream_guard(stream_mtx);
-    write_all(streamfd, msg.get(), cursor_size);
+    write_all(msg.get(), cursor_size);
 }
 
-static void cursor_changes(Display *display, int event_base)
+static void cursor_changes(Display *display, int event_base, SpiceStream *stream)
 {
     unsigned long last_serial = 0;
 
@@ -355,23 +368,25 @@  static void cursor_changes(Display *display, int event_base)
             continue;
 
         last_serial = cursor->cursor_serial;
-        send_cursor(*cursor);
+        stream->send_cursor(*cursor);
     }
 }
 
 static void
-do_capture(const char *streamport, FILE *f_log)
+do_capture(const char *streamport, FILE *f_log, Display *display, int event_base)
 {
     std::unique_ptr<FrameCapture> capture(agent.GetBestFrameCapture());
     if (!capture)
         throw std::runtime_error("cannot find a suitable capture system");
 
-    Stream stream(streamport, streamfd);
+    SpiceStream stream(streamport);
+    std::thread cursor_th(cursor_changes, display, event_base, &stream);
+    cursor_th.detach();
 
     unsigned int frame_count = 0;
     while (! quit) {
         while (!quit && !streaming_requested) {
-            if (read_command(1) < 0) {
+            if (stream.read_command(1) < 0) {
                 syslog(LOG_ERR, "FAILED to read command\n");
                 return;
             }
@@ -406,7 +421,7 @@  do_capture(const char *streamport, FILE *f_log)
 
                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
 
-                if (spice_stream_send_format(width, height, codec) == EXIT_FAILURE)
+                if (stream.send_format(width, height, codec) == EXIT_FAILURE)
                     throw std::runtime_error("FAILED to send format message");
             }
             if (f_log) {
@@ -417,12 +432,12 @@  do_capture(const char *streamport, FILE *f_log)
                     hexdump(frame.buffer, frame.buffer_size, f_log);
                 }
             }
-            if (spice_stream_send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
+            if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
                 syslog(LOG_ERR, "FAILED to send a frame\n");
                 break;
             }
             //usleep(1);
-            if (read_command(0) < 0) {
+            if (stream.read_command(0) < 0) {
                 syslog(LOG_ERR, "FAILED to read command\n");
                 return;
             }
@@ -516,12 +531,9 @@  int main(int argc, char* argv[])
     Window rootwindow = DefaultRootWindow(display);
     XFixesSelectCursorInput(display, rootwindow, XFixesDisplayCursorNotifyMask);
 
-    std::thread cursor_th(cursor_changes, display, event_base);
-    cursor_th.detach();
-
     int ret = EXIT_SUCCESS;
     try {
-        do_capture(streamport, f_log);
+        do_capture(streamport, f_log, display, event_base);
     }
     catch (std::runtime_error &err) {
         syslog(LOG_ERR, "%s\n", err.what());

Comments

> 
> From: Christophe de Dinechin <dinechin@redhat.com>
> 
> This incidentally fixes a race condition processing X events,
> where we could possibly start sending cursor events to the
> stream before it was actually open.
> 
> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
> ---
>  src/spice-streaming-agent.cpp | 84
>  ++++++++++++++++++++++++-------------------
>  1 file changed, 48 insertions(+), 36 deletions(-)
> 
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index d46e308..f82e874 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -51,31 +51,45 @@ struct SpiceStreamDataMessage
>      StreamMsgData msg;
>  };
>  
> -struct Stream
> +class SpiceStream
>  {
> -    Stream(const char *name, int &fd): fd(fd)
> +public:
> +    SpiceStream(const char *name): streamfd(open(name, O_RDWR))
>      {
> -        fd = open(name, O_RDWR);
> -        if (fd < 0)
> +        if (streamfd < 0)
>              throw std::runtime_error("failed to open streaming device");
>      }
> -    ~Stream()
> +    ~SpiceStream()
>      {
> -        if (fd >= 0)
> -            close(fd);
> -        fd = -1;
> +        close(streamfd);
>      }
> -    int &fd;
> +
> +public:
> +    bool have_something_to_read(int *pfd, int timeout);
> +    int read_command_from_stdin(void);

There are dependent on some program state, this is a bad encapsulation.

> +    int read_command_from_device(void);
> +    int read_command(bool blocking);
> +    int send_format(unsigned w, unsigned h, unsigned c);
> +    int send_frame(const void *buf, const unsigned size);
> +    void send_cursor(const XFixesCursorImage &image);

I would attempt to remove X11 dependency.

> +
> +private:
> +    size_t write_all(const void *buf, const size_t len);

Why? Old write_all could be reused for sockets or any other
file descriptor.

> +
> +    SpiceStream(const SpiceStream &) = delete;
> +    SpiceStream &operator=(const SpiceStream &) = delete;
> +
> +private:
> +    int streamfd;

The class is Stream/SpiceStream, why repeating the stream part
having a SpiceStream::streamfd ?

While we are building a class should we use C style naming
(this_is_a_function) or some camel case ?

Should we use Spice in the names or better to use namespaces?

Maybe would be better to move the class into separate files.

>  };
>  
>  static int streaming_requested;
>  static bool quit;
> -static int streamfd = -1;
>  static bool stdin_ok;
>  static int log_binary = 0;
>  static std::mutex stream_mtx;
>  
> -static int have_something_to_read(int *pfd, int timeout)
> +bool SpiceStream::have_something_to_read(int *pfd, int timeout)
>  {
>      int nfds;
>      struct pollfd pollfds[2] = {
> @@ -97,7 +111,7 @@ static int have_something_to_read(int *pfd, int timeout)
>      return *pfd != -1;
>  }
>  
> -static int read_command_from_stdin(void)
> +int SpiceStream::read_command_from_stdin(void)
>  {
>      char buffer[64], *p, *save = NULL;
>  
> @@ -121,7 +135,7 @@ static int read_command_from_stdin(void)
>      return 1;
>  }
>  
> -static int read_command_from_device(void)
> +int SpiceStream::read_command_from_device(void)
>  {
>      StreamDevHeader hdr;
>      uint8_t msg[64];
> @@ -163,7 +177,7 @@ static int read_command_from_device(void)
>      return 1;
>  }
>  
> -static int read_command(bool blocking)
> +int SpiceStream::read_command(bool blocking)
>  {
>      int fd, n=1;
>      int timeout = blocking?-1:0;
> @@ -185,12 +199,11 @@ static int read_command(bool blocking)
>      return n;
>  }
>  
> -static size_t
> -write_all(int fd, const void *buf, const size_t len)
> +size_t SpiceStream::write_all(const void *buf, const size_t len)
>  {
>      size_t written = 0;
>      while (written < len) {
> -        int l = write(fd, (const char *) buf + written, len - written);
> +        int l = write(streamfd, (const char *) buf + written, len -
> written);
>          if (l < 0 && errno == EINTR) {
>              continue;
>          }
> @@ -204,7 +217,7 @@ write_all(int fd, const void *buf, const size_t len)
>      return written;
>  }
>  
> -static int spice_stream_send_format(unsigned w, unsigned h, unsigned c)
> +int SpiceStream::send_format(unsigned w, unsigned h, unsigned c)
>  {
>  
>      SpiceStreamFormatMessage msg;
> @@ -219,13 +232,13 @@ static int spice_stream_send_format(unsigned w,
> unsigned h, unsigned c)
>      msg.msg.codec = c;
>      syslog(LOG_DEBUG, "writing format\n");
>      std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
> +    if (write_all(&msg, msgsize) != msgsize) {
>          return EXIT_FAILURE;
>      }
>      return EXIT_SUCCESS;
>  }
>  
> -static int spice_stream_send_frame(const void *buf, const unsigned size)
> +int SpiceStream::send_frame(const void *buf, const unsigned size)
>  {
>      SpiceStreamDataMessage msg;
>      const size_t msgsize = sizeof(msg);
> @@ -236,7 +249,7 @@ static int spice_stream_send_frame(const void *buf, const
> unsigned size)
>      msg.hdr.type = STREAM_TYPE_DATA;
>      msg.hdr.size = size; /* includes only the body? */
>      std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    n = write_all(streamfd, &msg, msgsize);
> +    n = write_all(&msg, msgsize);
>      syslog(LOG_DEBUG,
>             "wrote %ld bytes of header of data msg with frame of size %u
>             bytes\n",
>             n, msg.hdr.size);
> @@ -245,7 +258,7 @@ static int spice_stream_send_frame(const void *buf, const
> unsigned size)
>                 n, msgsize);
>          return EXIT_FAILURE;
>      }
> -    n = write_all(streamfd, buf, size);
> +    n = write_all(buf, size);
>      syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
>      if (n != size) {
>          syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
> @@ -304,7 +317,7 @@ static void usage(const char *progname)
>      exit(1);
>  }
>  
> -static void send_cursor(const XFixesCursorImage &image)
> +void SpiceStream::send_cursor(const XFixesCursorImage &image)
>  {
>      if (image.width >= 1024 || image.height >= 1024)
>          return;
> @@ -334,10 +347,10 @@ static void send_cursor(const XFixesCursorImage &image)
>          pixels[i] = image.pixels[i];
>  
>      std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    write_all(streamfd, msg.get(), cursor_size);
> +    write_all(msg.get(), cursor_size);
>  }
>  
> -static void cursor_changes(Display *display, int event_base)
> +static void cursor_changes(Display *display, int event_base, SpiceStream
> *stream)
>  {
>      unsigned long last_serial = 0;
>  
> @@ -355,23 +368,25 @@ static void cursor_changes(Display *display, int
> event_base)
>              continue;
>  
>          last_serial = cursor->cursor_serial;
> -        send_cursor(*cursor);
> +        stream->send_cursor(*cursor);
>      }
>  }
>  
>  static void
> -do_capture(const char *streamport, FILE *f_log)
> +do_capture(const char *streamport, FILE *f_log, Display *display, int
> event_base)
>  {
>      std::unique_ptr<FrameCapture> capture(agent.GetBestFrameCapture());
>      if (!capture)
>          throw std::runtime_error("cannot find a suitable capture system");
>  
> -    Stream stream(streamport, streamfd);
> +    SpiceStream stream(streamport);
> +    std::thread cursor_th(cursor_changes, display, event_base, &stream);
> +    cursor_th.detach();
>  
>      unsigned int frame_count = 0;
>      while (! quit) {
>          while (!quit && !streaming_requested) {
> -            if (read_command(1) < 0) {
> +            if (stream.read_command(1) < 0) {
>                  syslog(LOG_ERR, "FAILED to read command\n");
>                  return;
>              }
> @@ -406,7 +421,7 @@ do_capture(const char *streamport, FILE *f_log)
>  
>                  syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height,
>                  codec);
>  
> -                if (spice_stream_send_format(width, height, codec) ==
> EXIT_FAILURE)
> +                if (stream.send_format(width, height, codec) ==
> EXIT_FAILURE)
>                      throw std::runtime_error("FAILED to send format
>                      message");
>              }
>              if (f_log) {
> @@ -417,12 +432,12 @@ do_capture(const char *streamport, FILE *f_log)
>                      hexdump(frame.buffer, frame.buffer_size, f_log);
>                  }
>              }
> -            if (spice_stream_send_frame(frame.buffer, frame.buffer_size) ==
> EXIT_FAILURE) {
> +            if (stream.send_frame(frame.buffer, frame.buffer_size) ==
> EXIT_FAILURE) {
>                  syslog(LOG_ERR, "FAILED to send a frame\n");
>                  break;
>              }
>              //usleep(1);
> -            if (read_command(0) < 0) {
> +            if (stream.read_command(0) < 0) {
>                  syslog(LOG_ERR, "FAILED to read command\n");
>                  return;
>              }
> @@ -516,12 +531,9 @@ int main(int argc, char* argv[])
>      Window rootwindow = DefaultRootWindow(display);
>      XFixesSelectCursorInput(display, rootwindow,
>      XFixesDisplayCursorNotifyMask);
>  
> -    std::thread cursor_th(cursor_changes, display, event_base);
> -    cursor_th.detach();
> -
>      int ret = EXIT_SUCCESS;
>      try {
> -        do_capture(streamport, f_log);
> +        do_capture(streamport, f_log, display, event_base);
>      }
>      catch (std::runtime_error &err) {
>          syslog(LOG_ERR, "%s\n", err.what());

Frediano
Frediano Ziglio writes:

>>
>> From: Christophe de Dinechin <dinechin@redhat.com>
>>
>> This incidentally fixes a race condition processing X events,
>> where we could possibly start sending cursor events to the
>> stream before it was actually open.
>>
>> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
>> ---
>>  src/spice-streaming-agent.cpp | 84
>>  ++++++++++++++++++++++++-------------------
>>  1 file changed, 48 insertions(+), 36 deletions(-)
>>
>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
>> index d46e308..f82e874 100644
>> --- a/src/spice-streaming-agent.cpp
>> +++ b/src/spice-streaming-agent.cpp
>> @@ -51,31 +51,45 @@ struct SpiceStreamDataMessage
>>      StreamMsgData msg;
>>  };
>>
>> -struct Stream
>> +class SpiceStream
>>  {
>> -    Stream(const char *name, int &fd): fd(fd)
>> +public:
>> +    SpiceStream(const char *name): streamfd(open(name, O_RDWR))
>>      {
>> -        fd = open(name, O_RDWR);
>> -        if (fd < 0)
>> +        if (streamfd < 0)
>>              throw std::runtime_error("failed to open streaming device");
>>      }
>> -    ~Stream()
>> +    ~SpiceStream()
>>      {
>> -        if (fd >= 0)
>> -            close(fd);
>> -        fd = -1;
>> +        close(streamfd);
>>      }
>> -    int &fd;
>> +
>> +public:
>> +    bool have_something_to_read(int *pfd, int timeout);
>> +    int read_command_from_stdin(void);
>
> There are dependent on some program state, this is a bad
> encapsulation.

I noticed that too, but to me, that belongs to a follow-up patch, which
is in progress but not ready yet.

>
>> +    int read_command_from_device(void);
>> +    int read_command(bool blocking);
>> +    int send_format(unsigned w, unsigned h, unsigned c);
>> +    int send_frame(const void *buf, const unsigned size);
>> +    void send_cursor(const XFixesCursorImage &image);
>
> I would attempt to remove X11 dependency.

Same here. I wonder how much of the existing code would work for a
future Windows agent. Has anybody looked at why the NVIDIA headers look
like for Windows?
>
>> +
>> +private:
>> +    size_t write_all(const void *buf, const size_t len);
>
> Why? Old write_all could be reused for sockets or any other
> file descriptor.

Yes but the callers were all using it only for streamfd, and I don't see
a future use with another stream in the context of the streaming agent.0

>
>> +
>> +    SpiceStream(const SpiceStream &) = delete;
>> +    SpiceStream &operator=(const SpiceStream &) = delete;
>> +
>> +private:
>> +    int streamfd;
>
> The class is Stream/SpiceStream, why repeating the stream part
> having a SpiceStream::streamfd ?

Minimizing number of useless diffs ;-) I could do the renaming in a
subsequent patch if you think it adds value, but I don't see much value
myself, so I didn't do it here.

Also, I never write SpiceStream::streamfd in the code. What is there is
"streamfd", and knowing we have functions that also deal with other fds
(not for long, maybe, but that's still the case today), I think it's
easier on the reader.

>
> While we are building a class should we use C style naming
> (this_is_a_function) or some camel case ?

I would favor camelcase, but I would rather do it in a separate
name-change-only patch.

>
> Should we use Spice in the names or better to use namespaces?

Actually, I was wondering why you had used anonymous namespaces for
nvidia. I think that it makes error messages a bit annoying ;-)

>
> Maybe would be better to move the class into separate files.

Yes, but definitely a follow-up patch, so that it's easier to review.

>
>>  };
>>
>>  static int streaming_requested;
>>  static bool quit;
>> -static int streamfd = -1;
>>  static bool stdin_ok;
>>  static int log_binary = 0;
>>  static std::mutex stream_mtx;
>>
>> -static int have_something_to_read(int *pfd, int timeout)
>> +bool SpiceStream::have_something_to_read(int *pfd, int timeout)
>>  {
>>      int nfds;
>>      struct pollfd pollfds[2] = {
>> @@ -97,7 +111,7 @@ static int have_something_to_read(int *pfd, int timeout)
>>      return *pfd != -1;
>>  }
>>
>> -static int read_command_from_stdin(void)
>> +int SpiceStream::read_command_from_stdin(void)
>>  {
>>      char buffer[64], *p, *save = NULL;
>>
>> @@ -121,7 +135,7 @@ static int read_command_from_stdin(void)
>>      return 1;
>>  }
>>
>> -static int read_command_from_device(void)
>> +int SpiceStream::read_command_from_device(void)
>>  {
>>      StreamDevHeader hdr;
>>      uint8_t msg[64];
>> @@ -163,7 +177,7 @@ static int read_command_from_device(void)
>>      return 1;
>>  }
>>
>> -static int read_command(bool blocking)
>> +int SpiceStream::read_command(bool blocking)
>>  {
>>      int fd, n=1;
>>      int timeout = blocking?-1:0;
>> @@ -185,12 +199,11 @@ static int read_command(bool blocking)
>>      return n;
>>  }
>>
>> -static size_t
>> -write_all(int fd, const void *buf, const size_t len)
>> +size_t SpiceStream::write_all(const void *buf, const size_t len)
>>  {
>>      size_t written = 0;
>>      while (written < len) {
>> -        int l = write(fd, (const char *) buf + written, len - written);
>> +        int l = write(streamfd, (const char *) buf + written, len -
>> written);
>>          if (l < 0 && errno == EINTR) {
>>              continue;
>>          }
>> @@ -204,7 +217,7 @@ write_all(int fd, const void *buf, const size_t len)
>>      return written;
>>  }
>>
>> -static int spice_stream_send_format(unsigned w, unsigned h, unsigned c)
>> +int SpiceStream::send_format(unsigned w, unsigned h, unsigned c)
>>  {
>>
>>      SpiceStreamFormatMessage msg;
>> @@ -219,13 +232,13 @@ static int spice_stream_send_format(unsigned w,
>> unsigned h, unsigned c)
>>      msg.msg.codec = c;
>>      syslog(LOG_DEBUG, "writing format\n");
>>      std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
>> +    if (write_all(&msg, msgsize) != msgsize) {
>>          return EXIT_FAILURE;
>>      }
>>      return EXIT_SUCCESS;
>>  }
>>
>> -static int spice_stream_send_frame(const void *buf, const unsigned size)
>> +int SpiceStream::send_frame(const void *buf, const unsigned size)
>>  {
>>      SpiceStreamDataMessage msg;
>>      const size_t msgsize = sizeof(msg);
>> @@ -236,7 +249,7 @@ static int spice_stream_send_frame(const void *buf, const
>> unsigned size)
>>      msg.hdr.type = STREAM_TYPE_DATA;
>>      msg.hdr.size = size; /* includes only the body? */
>>      std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> -    n = write_all(streamfd, &msg, msgsize);
>> +    n = write_all(&msg, msgsize);
>>      syslog(LOG_DEBUG,
>>             "wrote %ld bytes of header of data msg with frame of size %u
>>             bytes\n",
>>             n, msg.hdr.size);
>> @@ -245,7 +258,7 @@ static int spice_stream_send_frame(const void *buf, const
>> unsigned size)
>>                 n, msgsize);
>>          return EXIT_FAILURE;
>>      }
>> -    n = write_all(streamfd, buf, size);
>> +    n = write_all(buf, size);
>>      syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
>>      if (n != size) {
>>          syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
>> @@ -304,7 +317,7 @@ static void usage(const char *progname)
>>      exit(1);
>>  }
>>
>> -static void send_cursor(const XFixesCursorImage &image)
>> +void SpiceStream::send_cursor(const XFixesCursorImage &image)
>>  {
>>      if (image.width >= 1024 || image.height >= 1024)
>>          return;
>> @@ -334,10 +347,10 @@ static void send_cursor(const XFixesCursorImage &image)
>>          pixels[i] = image.pixels[i];
>>
>>      std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> -    write_all(streamfd, msg.get(), cursor_size);
>> +    write_all(msg.get(), cursor_size);
>>  }
>>
>> -static void cursor_changes(Display *display, int event_base)
>> +static void cursor_changes(Display *display, int event_base, SpiceStream
>> *stream)
>>  {
>>      unsigned long last_serial = 0;
>>
>> @@ -355,23 +368,25 @@ static void cursor_changes(Display *display, int
>> event_base)
>>              continue;
>>
>>          last_serial = cursor->cursor_serial;
>> -        send_cursor(*cursor);
>> +        stream->send_cursor(*cursor);
>>      }
>>  }
>>
>>  static void
>> -do_capture(const char *streamport, FILE *f_log)
>> +do_capture(const char *streamport, FILE *f_log, Display *display, int
>> event_base)
>>  {
>>      std::unique_ptr<FrameCapture> capture(agent.GetBestFrameCapture());
>>      if (!capture)
>>          throw std::runtime_error("cannot find a suitable capture system");
>>
>> -    Stream stream(streamport, streamfd);
>> +    SpiceStream stream(streamport);
>> +    std::thread cursor_th(cursor_changes, display, event_base, &stream);
>> +    cursor_th.detach();
>>
>>      unsigned int frame_count = 0;
>>      while (! quit) {
>>          while (!quit && !streaming_requested) {
>> -            if (read_command(1) < 0) {
>> +            if (stream.read_command(1) < 0) {
>>                  syslog(LOG_ERR, "FAILED to read command\n");
>>                  return;
>>              }
>> @@ -406,7 +421,7 @@ do_capture(const char *streamport, FILE *f_log)
>>
>>                  syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height,
>>                  codec);
>>
>> -                if (spice_stream_send_format(width, height, codec) ==
>> EXIT_FAILURE)
>> +                if (stream.send_format(width, height, codec) ==
>> EXIT_FAILURE)
>>                      throw std::runtime_error("FAILED to send format
>>                      message");
>>              }
>>              if (f_log) {
>> @@ -417,12 +432,12 @@ do_capture(const char *streamport, FILE *f_log)
>>                      hexdump(frame.buffer, frame.buffer_size, f_log);
>>                  }
>>              }
>> -            if (spice_stream_send_frame(frame.buffer, frame.buffer_size) ==
>> EXIT_FAILURE) {
>> +            if (stream.send_frame(frame.buffer, frame.buffer_size) ==
>> EXIT_FAILURE) {
>>                  syslog(LOG_ERR, "FAILED to send a frame\n");
>>                  break;
>>              }
>>              //usleep(1);
>> -            if (read_command(0) < 0) {
>> +            if (stream.read_command(0) < 0) {
>>                  syslog(LOG_ERR, "FAILED to read command\n");
>>                  return;
>>              }
>> @@ -516,12 +531,9 @@ int main(int argc, char* argv[])
>>      Window rootwindow = DefaultRootWindow(display);
>>      XFixesSelectCursorInput(display, rootwindow,
>>      XFixesDisplayCursorNotifyMask);
>>
>> -    std::thread cursor_th(cursor_changes, display, event_base);
>> -    cursor_th.detach();
>> -
>>      int ret = EXIT_SUCCESS;
>>      try {
>> -        do_capture(streamport, f_log);
>> +        do_capture(streamport, f_log, display, event_base);
>>      }
>>      catch (std::runtime_error &err) {
>>          syslog(LOG_ERR, "%s\n", err.what());
>
> Frediano


--
Cheers,
Christophe de Dinechin (IRC c3d)
> Frediano Ziglio writes:
> 
> >>
> >> From: Christophe de Dinechin <dinechin@redhat.com>
> >>
> >> This incidentally fixes a race condition processing X events,
> >> where we could possibly start sending cursor events to the
> >> stream before it was actually open.
> >>
> >> Signed-off-by: Christophe de Dinechin <dinechin@redhat.com>
> >> ---
> >>  src/spice-streaming-agent.cpp | 84
> >>  ++++++++++++++++++++++++-------------------
> >>  1 file changed, 48 insertions(+), 36 deletions(-)
> >>
> >> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> >> index d46e308..f82e874 100644
> >> --- a/src/spice-streaming-agent.cpp
> >> +++ b/src/spice-streaming-agent.cpp
> >> @@ -51,31 +51,45 @@ struct SpiceStreamDataMessage
> >>      StreamMsgData msg;
> >>  };
> >>
> >> -struct Stream
> >> +class SpiceStream
> >>  {
> >> -    Stream(const char *name, int &fd): fd(fd)
> >> +public:
> >> +    SpiceStream(const char *name): streamfd(open(name, O_RDWR))
> >>      {
> >> -        fd = open(name, O_RDWR);
> >> -        if (fd < 0)
> >> +        if (streamfd < 0)
> >>              throw std::runtime_error("failed to open streaming device");
> >>      }
> >> -    ~Stream()
> >> +    ~SpiceStream()
> >>      {
> >> -        if (fd >= 0)
> >> -            close(fd);
> >> -        fd = -1;
> >> +        close(streamfd);
> >>      }
> >> -    int &fd;
> >> +
> >> +public:
> >> +    bool have_something_to_read(int *pfd, int timeout);
> >> +    int read_command_from_stdin(void);
> >
> > There are dependent on some program state, this is a bad
> > encapsulation.
> 
> I noticed that too, but to me, that belongs to a follow-up patch, which
> is in progress but not ready yet.

I have a file with:


#ifndef STREAMING_AGENT_STREAM_DEVICE_HPP
#define STREAMING_AGENT_STREAM_DEVICE_HPP

#include <cstdio>

/*!
 * Pure base class implementing the device
 */
class StreamDevice
{
public:
    virtual ~StreamDevice()=default;
    /*! Send streaming format
     * Must be send before first frame or when the size or encoding change
     */
    virtual void SendFormat(unsigned width, unsigned height, unsigned encoding)=0;
    /*! Send single frame data
     */
    virtual void SendFrame(const void *data, size_t data_size)=0;
    /*! Read a message from the server and handle it.
     * This function is not blocking.
     * @return true if some messages was readed and handled
     */
    virtual bool HandleMessage()=0;
protected:
    StreamDevice()=default;
    StreamDevice(const StreamDevice&)=delete;
    void operator=(const StreamDevice&)=delete;
};

#endif // STREAMING_AGENT_STREAM_DEVICE_HPP


does not seem impossible to get a proper interface for the device.
The only missing part I can see above is how to read the messages from the
device.

Frediano

> 
> >
> >> +    int read_command_from_device(void);
> >> +    int read_command(bool blocking);
> >> +    int send_format(unsigned w, unsigned h, unsigned c);
> >> +    int send_frame(const void *buf, const unsigned size);
> >> +    void send_cursor(const XFixesCursorImage &image);
> >
> > I would attempt to remove X11 dependency.
> 
> Same here. I wonder how much of the existing code would work for a
> future Windows agent. Has anybody looked at why the NVIDIA headers look
> like for Windows?
> >
> >> +
> >> +private:
> >> +    size_t write_all(const void *buf, const size_t len);
> >
> > Why? Old write_all could be reused for sockets or any other
> > file descriptor.
> 
> Yes but the callers were all using it only for streamfd, and I don't see
> a future use with another stream in the context of the streaming agent.0
> 
> >
> >> +
> >> +    SpiceStream(const SpiceStream &) = delete;
> >> +    SpiceStream &operator=(const SpiceStream &) = delete;
> >> +
> >> +private:
> >> +    int streamfd;
> >
> > The class is Stream/SpiceStream, why repeating the stream part
> > having a SpiceStream::streamfd ?
> 
> Minimizing number of useless diffs ;-) I could do the renaming in a
> subsequent patch if you think it adds value, but I don't see much value
> myself, so I didn't do it here.
> 
> Also, I never write SpiceStream::streamfd in the code. What is there is
> "streamfd", and knowing we have functions that also deal with other fds
> (not for long, maybe, but that's still the case today), I think it's
> easier on the reader.
> 
> >
> > While we are building a class should we use C style naming
> > (this_is_a_function) or some camel case ?
> 
> I would favor camelcase, but I would rather do it in a separate
> name-change-only patch.
> 
> >
> > Should we use Spice in the names or better to use namespaces?
> 
> Actually, I was wondering why you had used anonymous namespaces for
> nvidia. I think that it makes error messages a bit annoying ;-)
> 
> >
> > Maybe would be better to move the class into separate files.
> 
> Yes, but definitely a follow-up patch, so that it's easier to review.
> 
> >
> >>  };
> >>
> >>  static int streaming_requested;
> >>  static bool quit;
> >> -static int streamfd = -1;
> >>  static bool stdin_ok;
> >>  static int log_binary = 0;
> >>  static std::mutex stream_mtx;
> >>
> >> -static int have_something_to_read(int *pfd, int timeout)
> >> +bool SpiceStream::have_something_to_read(int *pfd, int timeout)
> >>  {
> >>      int nfds;
> >>      struct pollfd pollfds[2] = {
> >> @@ -97,7 +111,7 @@ static int have_something_to_read(int *pfd, int
> >> timeout)
> >>      return *pfd != -1;
> >>  }
> >>
> >> -static int read_command_from_stdin(void)
> >> +int SpiceStream::read_command_from_stdin(void)
> >>  {
> >>      char buffer[64], *p, *save = NULL;
> >>
> >> @@ -121,7 +135,7 @@ static int read_command_from_stdin(void)
> >>      return 1;
> >>  }
> >>
> >> -static int read_command_from_device(void)
> >> +int SpiceStream::read_command_from_device(void)
> >>  {
> >>      StreamDevHeader hdr;
> >>      uint8_t msg[64];
> >> @@ -163,7 +177,7 @@ static int read_command_from_device(void)
> >>      return 1;
> >>  }
> >>
> >> -static int read_command(bool blocking)
> >> +int SpiceStream::read_command(bool blocking)
> >>  {
> >>      int fd, n=1;
> >>      int timeout = blocking?-1:0;
> >> @@ -185,12 +199,11 @@ static int read_command(bool blocking)
> >>      return n;
> >>  }
> >>
> >> -static size_t
> >> -write_all(int fd, const void *buf, const size_t len)
> >> +size_t SpiceStream::write_all(const void *buf, const size_t len)
> >>  {
> >>      size_t written = 0;
> >>      while (written < len) {
> >> -        int l = write(fd, (const char *) buf + written, len - written);
> >> +        int l = write(streamfd, (const char *) buf + written, len -
> >> written);
> >>          if (l < 0 && errno == EINTR) {
> >>              continue;
> >>          }
> >> @@ -204,7 +217,7 @@ write_all(int fd, const void *buf, const size_t len)
> >>      return written;
> >>  }
> >>
> >> -static int spice_stream_send_format(unsigned w, unsigned h, unsigned c)
> >> +int SpiceStream::send_format(unsigned w, unsigned h, unsigned c)
> >>  {
> >>
> >>      SpiceStreamFormatMessage msg;
> >> @@ -219,13 +232,13 @@ static int spice_stream_send_format(unsigned w,
> >> unsigned h, unsigned c)
> >>      msg.msg.codec = c;
> >>      syslog(LOG_DEBUG, "writing format\n");
> >>      std::lock_guard<std::mutex> stream_guard(stream_mtx);
> >> -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
> >> +    if (write_all(&msg, msgsize) != msgsize) {
> >>          return EXIT_FAILURE;
> >>      }
> >>      return EXIT_SUCCESS;
> >>  }
> >>
> >> -static int spice_stream_send_frame(const void *buf, const unsigned size)
> >> +int SpiceStream::send_frame(const void *buf, const unsigned size)
> >>  {
> >>      SpiceStreamDataMessage msg;
> >>      const size_t msgsize = sizeof(msg);
> >> @@ -236,7 +249,7 @@ static int spice_stream_send_frame(const void *buf,
> >> const
> >> unsigned size)
> >>      msg.hdr.type = STREAM_TYPE_DATA;
> >>      msg.hdr.size = size; /* includes only the body? */
> >>      std::lock_guard<std::mutex> stream_guard(stream_mtx);
> >> -    n = write_all(streamfd, &msg, msgsize);
> >> +    n = write_all(&msg, msgsize);
> >>      syslog(LOG_DEBUG,
> >>             "wrote %ld bytes of header of data msg with frame of size %u
> >>             bytes\n",
> >>             n, msg.hdr.size);
> >> @@ -245,7 +258,7 @@ static int spice_stream_send_frame(const void *buf,
> >> const
> >> unsigned size)
> >>                 n, msgsize);
> >>          return EXIT_FAILURE;
> >>      }
> >> -    n = write_all(streamfd, buf, size);
> >> +    n = write_all(buf, size);
> >>      syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
> >>      if (n != size) {
> >>          syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
> >> @@ -304,7 +317,7 @@ static void usage(const char *progname)
> >>      exit(1);
> >>  }
> >>
> >> -static void send_cursor(const XFixesCursorImage &image)
> >> +void SpiceStream::send_cursor(const XFixesCursorImage &image)
> >>  {
> >>      if (image.width >= 1024 || image.height >= 1024)
> >>          return;
> >> @@ -334,10 +347,10 @@ static void send_cursor(const XFixesCursorImage
> >> &image)
> >>          pixels[i] = image.pixels[i];
> >>
> >>      std::lock_guard<std::mutex> stream_guard(stream_mtx);
> >> -    write_all(streamfd, msg.get(), cursor_size);
> >> +    write_all(msg.get(), cursor_size);
> >>  }
> >>
> >> -static void cursor_changes(Display *display, int event_base)
> >> +static void cursor_changes(Display *display, int event_base, SpiceStream
> >> *stream)
> >>  {
> >>      unsigned long last_serial = 0;
> >>
> >> @@ -355,23 +368,25 @@ static void cursor_changes(Display *display, int
> >> event_base)
> >>              continue;
> >>
> >>          last_serial = cursor->cursor_serial;
> >> -        send_cursor(*cursor);
> >> +        stream->send_cursor(*cursor);
> >>      }
> >>  }
> >>
> >>  static void
> >> -do_capture(const char *streamport, FILE *f_log)
> >> +do_capture(const char *streamport, FILE *f_log, Display *display, int
> >> event_base)
> >>  {
> >>      std::unique_ptr<FrameCapture> capture(agent.GetBestFrameCapture());
> >>      if (!capture)
> >>          throw std::runtime_error("cannot find a suitable capture
> >>          system");
> >>
> >> -    Stream stream(streamport, streamfd);
> >> +    SpiceStream stream(streamport);
> >> +    std::thread cursor_th(cursor_changes, display, event_base, &stream);
> >> +    cursor_th.detach();
> >>
> >>      unsigned int frame_count = 0;
> >>      while (! quit) {
> >>          while (!quit && !streaming_requested) {
> >> -            if (read_command(1) < 0) {
> >> +            if (stream.read_command(1) < 0) {
> >>                  syslog(LOG_ERR, "FAILED to read command\n");
> >>                  return;
> >>              }
> >> @@ -406,7 +421,7 @@ do_capture(const char *streamport, FILE *f_log)
> >>
> >>                  syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height,
> >>                  codec);
> >>
> >> -                if (spice_stream_send_format(width, height, codec) ==
> >> EXIT_FAILURE)
> >> +                if (stream.send_format(width, height, codec) ==
> >> EXIT_FAILURE)
> >>                      throw std::runtime_error("FAILED to send format
> >>                      message");
> >>              }
> >>              if (f_log) {
> >> @@ -417,12 +432,12 @@ do_capture(const char *streamport, FILE *f_log)
> >>                      hexdump(frame.buffer, frame.buffer_size, f_log);
> >>                  }
> >>              }
> >> -            if (spice_stream_send_frame(frame.buffer, frame.buffer_size)
> >> ==
> >> EXIT_FAILURE) {
> >> +            if (stream.send_frame(frame.buffer, frame.buffer_size) ==
> >> EXIT_FAILURE) {
> >>                  syslog(LOG_ERR, "FAILED to send a frame\n");
> >>                  break;
> >>              }
> >>              //usleep(1);
> >> -            if (read_command(0) < 0) {
> >> +            if (stream.read_command(0) < 0) {
> >>                  syslog(LOG_ERR, "FAILED to read command\n");
> >>                  return;
> >>              }
> >> @@ -516,12 +531,9 @@ int main(int argc, char* argv[])
> >>      Window rootwindow = DefaultRootWindow(display);
> >>      XFixesSelectCursorInput(display, rootwindow,
> >>      XFixesDisplayCursorNotifyMask);
> >>
> >> -    std::thread cursor_th(cursor_changes, display, event_base);
> >> -    cursor_th.detach();
> >> -
> >>      int ret = EXIT_SUCCESS;
> >>      try {
> >> -        do_capture(streamport, f_log);
> >> +        do_capture(streamport, f_log, display, event_base);
> >>      }
> >>      catch (std::runtime_error &err) {
> >>          syslog(LOG_ERR, "%s\n", err.what());
> >