[Spice-devel,RFC] server: add main_dispatcher

Submitted by Alon Levy on Sept. 8, 2011, 6:17 a.m.

Details

Message ID 1315437469-32681-1-git-send-email-alevy@redhat.com
State New, archived


Commit Message

Alon Levy Sept. 8, 2011, 6:17 a.m.
---
 server/Makefile.am       |    2 +
 server/main_dispatcher.c |  132 ++++++++++++++++++++++++++++++++++++++++++++++
 server/main_dispatcher.h |    9 +++
 server/reds.c            |    5 ++-
 4 files changed, 147 insertions(+), 1 deletions(-)
 create mode 100644 server/main_dispatcher.c
 create mode 100644 server/main_dispatcher.h


diff --git a/server/Makefile.am b/server/Makefile.am
index a7cdd84..f1d4052 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -80,6 +80,8 @@  libspice_server_la_SOURCES =			\
 	red_common.h				\
 	red_dispatcher.c			\
 	red_dispatcher.h			\
+	main_dispatcher.c           \
+	main_dispatcher.h           \
 	red_memslots.c				\
 	red_memslots.h				\
 	red_parse_qxl.c				\
diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
new file mode 100644
index 0000000..8cdb2e0
--- /dev/null
+++ b/server/main_dispatcher.c
@@ -0,0 +1,132 @@ 
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <pthread.h>
+
+#include "common/spice_common.h"
+#include "main_dispatcher.h"
+
+/*
+ * Main Dispatcher
+ * ===============
+ *
+ * Communication channel between any non main thread and the main thread.
+ *
+ * The main thread is that from which spice_server_init is called.
+ *
+ * Messages are fixed size, sent from a non-main thread to the main thread.
+ * No acknowledgement is sent back. This prevents a possible deadlock with the main
+ * thread already waiting on a response for the existing red_dispatcher used
+ * by the worker thread.
+ *
+ * All events have three functions:
+ * main_dispatcher_<event_name> - non-static, public function
+ * main_dispatcher_self_<event_name> - handler for main thread
+ * main_dispatcher_handle_<event_name> - handler for callback from main thread;
+ *   separate from self because it may send an ack or do other work in the future.
+ */
+
+typedef struct {
+    SpiceCoreInterface *core;
+    int main_fd;
+    int other_fd;
+    pthread_t self;
+} MainDispatcher;
+
+MainDispatcher main_dispatcher;
+
+enum {
+    MAIN_DISPATCHER_CHANNEL_EVENT = 0,
+
+    MAIN_DISPATCHER_NUM_MESSAGES
+};
+
+typedef struct MainDispatcherMessage {
+    uint32_t type;
+    union {
+        struct {
+            int event;
+            SpiceChannelEventInfo info;
+        } channel_event;
+    } data;
+} MainDispatcherMessage;
+
+
+/* channel_event - calls core->channel_event, must be done in main thread */
+static void main_dispatcher_self_handle_channel_event(int event,
+    SpiceChannelEventInfo *info)
+{
+    main_dispatcher.core->channel_event(event, info);
+}
+
+static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
+{
+    main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
+        &msg->data.channel_event.info);
+}
+
+void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
+{
+    MainDispatcherMessage msg;
+
+    if (pthread_self() == main_dispatcher.self) {
+        main_dispatcher_self_handle_channel_event(event, info);
+        return;
+    }
+    msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
+    msg.data.channel_event.event = event;
+    msg.data.channel_event.info = *info;
+    write(main_dispatcher.other_fd, &msg, sizeof(msg));
+}
+
+
+static void main_dispatcher_handle_read(int fd, int event, void *opaque)
+{
+    int ret;
+    MainDispatcher *md = opaque;
+    MainDispatcherMessage msg;
+
+    ret = read(md->main_fd, &msg, sizeof(msg));
+    if (ret == -1) {
+        if (errno == EAGAIN) {
+            return;
+        } else {
+            red_printf("error reading from main dispatcher: %d\n", errno);
+            /* TODO: close channel? */
+            return;
+        }
+    }
+    if (ret != sizeof(msg)) {
+        red_printf("short read in main dispatcher: %d < %zd\n", ret, sizeof(msg));
+        /* TODO: close channel?  */
+        return;
+    }
+
+    switch (msg.type) {
+        case MAIN_DISPATCHER_CHANNEL_EVENT:
+            main_dispatcher_handle_channel_event(&msg);
+            break;
+        default:
+            red_printf("error: unhandled main dispatcher message type %d\n",
+                       msg.type);
+    }
+}
+
+void main_dispatcher_init(SpiceCoreInterface *core)
+{
+    int channels[2];
+
+    memset(&main_dispatcher, 0, sizeof(main_dispatcher));
+    main_dispatcher.core = core;
+
+    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
+        red_error("socketpair failed %s", strerror(errno));
+        return;
+    }
+    main_dispatcher.main_fd = channels[0];
+    main_dispatcher.other_fd = channels[1];
+    main_dispatcher.self = pthread_self();
+
+    core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
+                    main_dispatcher_handle_read, &main_dispatcher);
+}
diff --git a/server/main_dispatcher.h b/server/main_dispatcher.h
new file mode 100644
index 0000000..2c201c7
--- /dev/null
+++ b/server/main_dispatcher.h
@@ -0,0 +1,9 @@ 
+#ifndef MAIN_DISPATCHER_H
+#define MAIN_DISPATCHER_H
+
+#include <spice.h>
+
+void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info);
+void main_dispatcher_init(SpiceCoreInterface *core);
+
+#endif //MAIN_DISPATCHER_H
diff --git a/server/reds.c b/server/reds.c
index 90779ff..ef9da4f 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -56,6 +56,7 @@ 
 #include "main_channel.h"
 #include "red_common.h"
 #include "red_dispatcher.h"
+#include "main_dispatcher.h"
 #include "snd_worker.h"
 #include <spice/stats.h>
 #include "stat.h"
@@ -322,7 +323,8 @@  static void reds_stream_channel_event(RedsStream *s, int event)
 {
     if (core->base.minor_version < 3 || core->channel_event == NULL)
         return;
-    core->channel_event(event, &s->info);
+
+    main_dispatcher_channel_event(event, &s->info);
 }
 
 static ssize_t stream_write_cb(RedsStream *s, const void *buf, size_t size)
@@ -3519,6 +3521,7 @@  static int do_spice_init(SpiceCoreInterface *core_interface)
     init_vd_agent_resources();
     ring_init(&reds->clients);
     reds->num_clients = 0;
+    main_dispatcher_init(core);
 
     if (!(reds->mig_timer = core->timer_add(migrate_timout, NULL))) {
         red_error("migration timer create failed");
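
For illustration only, a minimal sketch (not part of the patch) of how a worker-thread caller would use the API added above. The wrapper function is hypothetical; main_dispatcher_channel_event() comes from this patch and SPICE_CHANNEL_EVENT_DISCONNECTED from spice.h.

#include "main_dispatcher.h"

/* Hypothetical worker-side caller; assumes main_dispatcher_init(core)
 * already ran on the main thread during spice_server_init(). */
static void worker_report_disconnect(SpiceChannelEventInfo *info)
{
    /* Safe from any thread: on the main thread the core callback is
     * invoked directly, otherwise the event is copied into a fixed-size
     * message and written to the dispatcher socketpair. */
    main_dispatcher_channel_event(SPICE_CHANNEL_EVENT_DISCONNECTED, info);
}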

Comments

Hi Alon,

On Thu, Sep 8, 2011 at 1:17 AM, Alon Levy <alevy@redhat.com> wrote:
> ---
>  server/Makefile.am       |    2 +
>  server/main_dispatcher.c |  132 ++++++++++++++++++++++++++++++++++++++++++++++
>  server/main_dispatcher.h |    9 +++
>  server/reds.c            |    5 ++-
>  4 files changed, 147 insertions(+), 1 deletions(-)
>  create mode 100644 server/main_dispatcher.c
>  create mode 100644 server/main_dispatcher.h
>
> diff --git a/server/Makefile.am b/server/Makefile.am
> index a7cdd84..f1d4052 100644
> --- a/server/Makefile.am
> +++ b/server/Makefile.am
> @@ -80,6 +80,8 @@ libspice_server_la_SOURCES =                  \
>        red_common.h                            \
>        red_dispatcher.c                        \
>        red_dispatcher.h                        \
> +       main_dispatcher.c           \
> +       main_dispatcher.h           \
>        red_memslots.c                          \
>        red_memslots.h                          \
>        red_parse_qxl.c                         \
> diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
> new file mode 100644
> index 0000000..8cdb2e0
> --- /dev/null
> +++ b/server/main_dispatcher.c
> @@ -0,0 +1,132 @@
> +#include <unistd.h>
> +#include <string.h>
> +#include <errno.h>
> +#include <pthread.h>
> +
> +#include "common/spice_common.h"
> +#include "main_dispatcher.h"
> +
> +/*
> + * Main Dispatcher
> + * ===============
> + *
> + * Communication channel between any non main thread and the main thread.
> + *
> + * The main thread is that from which spice_server_init is called.
> + *
> + * Messages are single sized, sent from the non-main thread to the main-thread.
> + * No acknowledge is sent back. This prevents a possible deadlock with the main
> + * thread already waiting on a response for the existing red_dispatcher used
> + * by the worker thread.
> + *

How often do you expect it to be used? What will be the overhead of
those messages, in size and rate? From the description, it looks a lot like
the async message queues in PulseAudio (see src/pulsecore/asyncq.h).
PA uses eventfd for notification/semaphore and lock-free queues (and a
simple object system, not unions). Just an idea, it could be changed
later perhaps. socketpair is probably fine.
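
For reference, a rough sketch of the eventfd-style wakeup mechanism referred to above; this is illustrative only, not part of the patch, and the function names are made up.

#include <stdint.h>
#include <unistd.h>
#include <sys/eventfd.h>

/* The fd would be created once, e.g.: int efd = eventfd(0, EFD_NONBLOCK); */

/* Producer: bump the counter to wake the consumer after pushing an
 * entry onto a (separately maintained) lock-free queue. */
static void queue_notify(int efd)
{
    uint64_t one = 1;
    write(efd, &one, sizeof(one));      /* eventfd writes are always 8 bytes */
}

/* Consumer: drain the counter, then pop everything from the queue. */
static void queue_drain_notifications(int efd)
{
    uint64_t count;
    read(efd, &count, sizeof(count));   /* returns the accumulated count */
}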

Lennart, I wonder what you think about it today - are there any drawbacks
or improvements, i.e. would you use it again?

cheers

<snip>
On Thu, Sep 08, 2011 at 02:24:32AM +0200, Marc-André Lureau wrote:
> Hi Alon,
> 
> On Thu, Sep 8, 2011 at 1:17 AM, Alon Levy <alevy@redhat.com> wrote:
<snip>
> 
> How often do you expect it to be used? What will be the overhead of
> those messages, size & rate? From the description, it looks a lot like
> the async message queues in Pulseaudio (see src/pulsecore/asyncq.h).
> PA uses eventfd for notification/semaphore & lock-free queues (and a
> simple object system, not unions). Just an idea, could be changed
> later perhaps. socketpair is probably fine.
> 

First of all, I see I forgot to add a mutex. Since I said "any non-main thread": right
now we only have one, so there is no need to lock, but the idea is that it can also be
called from other threads, so we will need the lock later (unless the write can be
considered atomic, but I don't think it can).

To answer your question - infrequently, so I don't think we need to invest in
optimizing this. Right now the only usage is the channel event (core->channel_event)
calls, which have to happen in the main thread and are currently also done from the
worker thread.

Thanks for the reference, I'll definitely check it. We already use a single socketpair
in red_dispatcher, by the way; this is a simple copy from there. I didn't create a
base "dispatcher" class because there are some differences and there isn't that much
shared code. (Differences: red_dispatcher sends an ack; here I don't, because otherwise
we could deadlock with the dispatcher. red_dispatcher also has non-union messages; here
I wanted the least code, even if it means some messages are larger than necessary. I
could always just send a pointer, but then I would need acks to avoid freeing it too early.)
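
A sketch of the locking described above, so several non-main threads could share the channel; the helper name and the mutex are illustrative additions, not part of the posted patch.

#include <pthread.h>
#include <unistd.h>

static pthread_mutex_t main_dispatcher_lock = PTHREAD_MUTEX_INITIALIZER;

/* Hypothetical helper inside main_dispatcher.c: serialize writers so two
 * threads can never interleave the bytes of their fixed-size messages
 * on the socketpair. */
static void main_dispatcher_send(MainDispatcherMessage *msg)
{
    pthread_mutex_lock(&main_dispatcher_lock);
    write(main_dispatcher.other_fd, msg, sizeof(*msg));
    pthread_mutex_unlock(&main_dispatcher_lock);
}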

<snip>
Hi,

>> simple object system, not unions). Just an idea, could be changed
>> later perhaps. socketpair is probably fine.
>
> First of all I see I forgot to add a mutex, since I said "any non main thread", right
> now we only have one, so no need to lock, but the idea is that it can also be called from
> others, and so we will need it later. (unless that write can be considered atomic, but
> I don't think it can)

A single write system call is atomic.

> To answer your question - infrequently. So I don't want / think we need to invest in
> optimizing this. Right now the only usage is the channel event (core->channel_event)
> calls, which have to happen in the main thread, and are currently done also from the
> worker thread.
>
> Thanks for the reference, I'll definitely check it. We already use a single socketpair
> btw in red_dispatcher, this is a simple copy from there, I didn't actually create a
> base "dispatcher" class because there are some differences and there isn't that much
> shared code

It makes sense to use the same mechanism everywhere in spice.

We might want to look for something else eventually; I guess a pipe /
socketpair isn't exactly the most efficient way to do inter-thread
communication.  But I don't think that is a bottleneck in spice; there
isn't that much communication going on between red_dispatcher and
red_worker.

cheers,
   Gerd
Hi,

On 09/08/2011 09:31 AM, Gerd Hoffmann wrote:
> Hi,
>
>>> simple object system, not unions). Just an idea, could be changed
>>> later perhaps. socketpair is probably fine.
>>
>> First of all I see I forgot to add a mutex, since I said "any non main thread", right
>> now we only have one, so no need to lock, but the idea is that it can also be called from
>> others, and so we will need it later. (unless that write can be considered atomic, but
>> I don't think it can)
>
> A single write system call is atomic.

Yes and no. This is true except when the write blocks after writing part of the
message; once that happens, all bets are off. I think we may have several cases
where we don't check for this. At the minimum we should assert (and not ASSERT)
when a write returns the wrong number of bytes written (i.e. it managed
to write some but not all of the message).
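
A sketch of that check; the helper name is illustrative, only the return-value test is the point.

#include <assert.h>
#include <unistd.h>

/* Illustrative: send one fixed-size message and treat a partial write
 * as a fatal, should-never-happen condition rather than continuing
 * with a corrupted stream. */
static void send_message_checked(int fd, const MainDispatcherMessage *msg)
{
    ssize_t written = write(fd, msg, sizeof(*msg));

    assert(written == (ssize_t)sizeof(*msg));
}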

Regards,

Hans
On Thu, Sep 08, 2011 at 09:31:33AM +0200, Gerd Hoffmann wrote:
>   Hi,
> 
> >>simple object system, not unions). Just an idea, could be changed
> >>later perhaps. socketpair is probably fine.
> >
> >First of all I see I forgot to add a mutex, since I said "any non main thread", right
> >now we only have one, so no need to lock, but the idea is that it can also be called from
> >others, and so we will need it later. (unless that write can be considered atomic, but
> >I don't think it can)
> 
> A single write system call is atomic.

ok, I'll add a comment to this effect.

> 
> >To answer your question - infrequently. So I don't want / think we need to invest in
> >optimizing this. Right now the only usage is the channel event (core->channel_event)
> >calls, which have to happen in the main thread, and are currently done also from the
> >worker thread.
> >
> >Thanks for the reference, I'll definitely check it. We already use a single socketpair
> >btw in red_dispatcher, this is a simple copy from there, I didn't actually create a
> >base "dispatcher" class because there are some differences and there isn't that much
> >shared code
> 
> It makes sense to use the same mechanism everythere in spice.

Is this a vote for creating a base/shared struct? But then we have the existing send+ack
behaviour, which can't be copied here because of the mentioned deadlock. So either I make it
usable in both ways, i.e. the messaging policy is left to the user, or what - check for deadlocks?

> 
> We might want to look out for something else, I guess the pipe /
> socketpair isn't exactly the most efficient way to do inter-thread
> communication.  But I don't think that is a bottle-neck in spice,
> there isn't that much communication going on between red_dispatcher
> and red_worker.
> 

So - I was going to send this patch as a PATCH, and to 0.8 as well (I have it ready and tested).
Is further work required? Do you want to try to create a base class?

> cheers,
>   Gerd
> 
On Thu, Sep 08, 2011 at 10:58:41AM +0200, Hans de Goede wrote:
> Hi,
> 
> On 09/08/2011 09:31 AM, Gerd Hoffmann wrote:
> >Hi,
> >
> >>>simple object system, not unions). Just an idea, could be changed
> >>>later perhaps. socketpair is probably fine.
> >>
> >>First of all I see I forgot to add a mutex, since I said "any non main thread", right
> >>now we only have one, so no need to lock, but the idea is that it can also be called from
> >>others, and so we will need it later. (unless that write can be considered atomic, but
> >>I don't think it can)
> >
> >A single write system call is atomic.
> 
> Yes and no, this is true, except when the write blocks after writing part of the
> message, once that happens all bets are off. I think we may have several cases
> where we don't check for this. At the minimum we should assert (and not ASSERT)
> when a write returns a wrong number of bytes written (iow it managed
> to write some but not all of the message).

Can that happen in a normal order of events, or is it something that really merits
crashing the process?

> 
> Regards,
> 
> Hans
Hi,

On 09/08/2011 11:18 AM, Alon Levy wrote:
> On Thu, Sep 08, 2011 at 10:58:41AM +0200, Hans de Goede wrote:
>> Hi,
>>
>> On 09/08/2011 09:31 AM, Gerd Hoffmann wrote:
>>> Hi,
>>>
>>>>> simple object system, not unions). Just an idea, could be changed
>>>>> later perhaps. socketpair is probably fine.
>>>>
>>>> First of all I see I forgot to add a mutex, since I said "any non main thread", right
>>>> now we only have one, so no need to lock, but the idea is that it can also be called from
>>>> others, and so we will need it later. (unless that write can be considered atomic, but
>>>> I don't think it can)
>>>
>>> A single write system call is atomic.
>>
>> Yes and no, this is true, except when the write blocks after writing part of the
>> message, once that happens all bets are off. I think we may have several cases
>> where we don't check for this. At the minimum we should assert (and not ASSERT)
>> when a write returns a wrong number of bytes written (iow it managed
>> to write some but not all of the message).
>
> Can that happen in a normal order of events? or is that a thing that really merits
> crashing the process?

Define normal: I think the kernel will buffer up to 16k (or maybe only 4k?) before blocking,
so if the producing side manages to write 16k before the consumer gets around to consuming
any, you will hit this. That seems like an exceptional circumstance, so I think crashing is an
OK answer to it.

We should either handle this gracefully (which means extra code + locking for something
that should never happen), or at least catch the should-never-happen case.

Regards,

Hans
On Thu, Sep 08, 2011 at 11:26:00AM +0200, Hans de Goede wrote:
> Hi,
> 
> On 09/08/2011 11:18 AM, Alon Levy wrote:
> >On Thu, Sep 08, 2011 at 10:58:41AM +0200, Hans de Goede wrote:
> >>Hi,
> >>
> >>On 09/08/2011 09:31 AM, Gerd Hoffmann wrote:
> >>>Hi,
> >>>
> >>>>>simple object system, not unions). Just an idea, could be changed
> >>>>>later perhaps. socketpair is probably fine.
> >>>>
> >>>>First of all I see I forgot to add a mutex, since I said "any non main thread", right
> >>>>now we only have one, so no need to lock, but the idea is that it can also be called from
> >>>>others, and so we will need it later. (unless that write can be considered atomic, but
> >>>>I don't think it can)
> >>>
> >>>A single write system call is atomic.
> >>
> >>Yes and no, this is true, except when the write blocks after writing part of the
> >>message, once that happens all bets are off. I think we may have several cases
> >>where we don't check for this. At the minimum we should assert (and not ASSERT)
> >>when a write returns a wrong number of bytes written (iow it managed
> >>to write some but not all of the message).
> >
> >Can that happen in a normal order of events? or is that a thing that really merits
> >crashing the process?
> 
> Define normal, I think the kernel will buffer up to 16k (or maybe only 4k ?) before blocking,
> so if the producing side manages to write 16k before the consumer gets around to consuming
> any you will hit this. This seems an exceptional circumstance, so I think crashing is an
> ok answer to this.
> 
> We should either handle this gracefully (which means extra code + locking for something
> which should never happen), or at least catch the should never happen case.
> 

OK. I guess I'll add an assert (not ASSERT). I'm still looking at the PulseAudio IPC - it looks
like asyncmsgq is good for what we need. It's meant for multiple writers and a single reader, where
the reader is a realtime thread, but it should be fine if the reader is not. It would drag in quite
a lot of the PulseAudio support files - fdsem, asyncq, asyncmsgq, some macro.h stuff. Or maybe it
has already been split off by someone?

> Regards,
> 
> Hans
On Thu, Sep 08, 2011 at 02:24:32AM +0200, Marc-André Lureau wrote:
> Hi Alon,
> 
> On Thu, Sep 8, 2011 at 1:17 AM, Alon Levy <alevy@redhat.com> wrote:
<snip>
> 
> How often do you expect it to be used? What will be the overhead of
> those messages, size & rate? From the description, it looks a lot like
> the async message queues in Pulseaudio (see src/pulsecore/asyncq.h).

Ok, having looked at that stuff, I think we don't need that complexity. It is
trying to solve a more complicated problem: sending from one or more writers
to a realtime thread. We don't have that requirement, since this is a strictly
secondary information channel. I think just adding a mutex to ensure multiple-writer
support, despite there currently being only a single writer, is enough. What do you think?

> PA uses eventfd for notification/semaphore & lock-free queues (and a
> simple object system, not unions). Just an idea, could be changed
> later perhaps. socketpair is probably fine.
> 
> Lennart, I wonder what today you think about it, if there is any
> drawback or improvements. ie would you use it again?
> 
> cheers
> 
<snip>
Hi

On Thu, Sep 8, 2011 at 11:38 AM, Alon Levy <alevy@redhat.com> wrote:
> On Thu, Sep 08, 2011 at 11:26:00AM +0200, Hans de Goede wrote:
>> Hi,
>
> ok. I guess I'll add an assert (not ASSERT). I'm still looking at the pulseaudio IPC - it looks
> like asyncmsgq is good for what we need - it's meant for multiple writer single reader, where the
> reader is a realtime thread, but should be fine if the reader is not. It would drag quite a lot
> of the pulseaudio support files - fdsem, asyncq, asynqmsg, some macro.h stuff. Or maybe it's already
> been split off by someone?

Some of the macro/lock/obj stuff is in my wip/oo branch. Teuf took a
look recently; he seemed to like it.

https://gitorious.org/~elmarco/spice/elmarco-spice/commits/wip/oo
On Thu, 08.09.11 02:24, Marc-André Lureau (marcandre.lureau@gmail.com) wrote:

> Lennart, I wonder what today you think about it, if there is any
> drawback or improvements. ie would you use it again?

I'd always go for socketpair() unless you have a really really good
reason to go for atomic op stuff. Atomic ops are messy, difficult, not
portable to weird archs, and so on. socketpair() is a nice API, atomic
ops are messy. So unless you do audio stuff avoid them.

(Sorry for the late response, been travelling)

Lennart
On 09/08/2011 02:17 AM, Alon Levy wrote:
Hi,
I think this patch is sufficient for the 0.8 branch (plus the comments
below, plus locks for several display channel threads).
However, for master, due to the red_channel refactoring I think we should
have a more general dispatcher. It would be nice if (1) we could
register events and their handlers, so that main_dispatcher won't need
to know about the specific events, and (2) the dispatcher were
bi-directional, so that red_dispatcher could reuse it.

Now that we have a main dispatcher, the calls to reds_update_mouse_mode
should also be done via the red_dispatcher (both those that are triggered by
red_dispatcher_async_complete and those that are executed from the vcpu
thread).
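
A rough sketch of the registration interface suggested in (1), so the dispatcher needs no per-event switch; all names, types and limits here are illustrative, not an actual proposal from this thread.

#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef void (*dispatcher_handler_t)(void *payload);

typedef struct {
    dispatcher_handler_t handler;
    size_t payload_size;
} DispatcherHandlerEntry;

#define DISPATCHER_MAX_MESSAGE_TYPES 32

static DispatcherHandlerEntry dispatcher_handlers[DISPATCHER_MAX_MESSAGE_TYPES];

/* Each subsystem registers its handler once; the read callback then just
 * looks up msg.type and forwards the payload. */
void dispatcher_register_handler(uint32_t type, dispatcher_handler_t handler,
                                 size_t payload_size)
{
    assert(type < DISPATCHER_MAX_MESSAGE_TYPES);
    dispatcher_handlers[type].handler = handler;
    dispatcher_handlers[type].payload_size = payload_size;
}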

<snip>

> +
> +static void main_dispatcher_handle_read(int fd, int event, void *opaque)
> +{
> +    int ret;
> +    MainDispatcher *md = opaque;
> +    MainDispatcherMessage msg;
> +
> +    ret = read(md->main_fd,&msg, sizeof(msg));
> +    if (ret == -1) {
> +        if (errno == EAGAIN) {
> +            return;
> +        } else {
> +            red_printf("error reading from main dispatcher: %d\n", errno);
> +            /* TODO: close channel? */
> +            return;
> +        }
> +    }
> +    if (ret != sizeof(msg)) {
Even if it is very rare, I think we should support it and read till we 
get the whole msg.
> +        red_printf("short read in main dispatcher: %d<  %zd\n", ret, sizeof(msg));
> +        /* TODO: close channel?  */
> +        return;
> +    }
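
A sketch of the read-until-complete handling suggested above: keep reading until the whole fixed-size message has arrived instead of dropping it on a short read. The helper name is illustrative, and a mostly-blocking fd is assumed (with a non-blocking fd the EAGAIN case would need to return to the event loop and resume later).

#include <errno.h>
#include <unistd.h>

/* Illustrative: returns 0 once a whole message has been read, -1 on
 * error or EOF. */
static int read_whole_message(int fd, MainDispatcherMessage *msg)
{
    char *buf = (char *)msg;
    size_t got = 0;

    while (got < sizeof(*msg)) {
        ssize_t n = read(fd, buf + got, sizeof(*msg) - got);
        if (n == -1) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (n == 0) {           /* peer closed the socketpair */
            return -1;
        }
        got += (size_t)n;
    }
    return 0;
}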
<snip>
On Tue, Oct 18, 2011 at 01:34:09PM +0200, Yonit Halperin wrote:
> On 09/08/2011 02:17 AM, Alon Levy wrote:
> Hi,
> I think this patch is sufficient for the 0.8 branch (+ the comments
> bellow + locks (for several display channels threads)).
> However for master, due to the red_channel refactoring I think we
> should have a more general dispatcher. It would be nice if (1) we
> could register events and their handlers, so that main_dispatcher
> won't need to know about the specific events. (2) the dispatcher
> will be bi-directional and red_dispatcher will reuse it.
> 

Ok for (1).

(2) is also doable - you want the dispatcher to support both the
synchronous calls, like all the old red_dispatcher calls (old as in age, not as
in obsolete), and asynchronous ones, like the one this patch adds and like all the
async red_dispatcher calls.
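
One possible shape for that shared dispatcher, sketched purely for illustration (the struct and field names are made up): each message type records whether the sender blocks for an acknowledgement, so red_dispatcher-style synchronous calls and main_dispatcher-style fire-and-forget calls can share one send path.

#include <stdint.h>
#include <unistd.h>

typedef struct {
    uint32_t type;
    uint32_t wait_for_ack;      /* non-zero: sender blocks until acked */
    /* ... payload union, as in the posted patch ... */
} GenericDispatcherMessage;

static void dispatcher_send(int send_fd, int recv_fd,
                            GenericDispatcherMessage *msg)
{
    uint32_t ack;

    write(send_fd, msg, sizeof(*msg));
    if (msg->wait_for_ack) {
        /* Synchronous caller: block until the handler writes an ack back.
         * Asynchronous callers skip this, which is what avoids the
         * deadlock described in the patch comment. */
        read(recv_fd, &ack, sizeof(ack));
    }
}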

> Now that we have a main dispatcher, the calls to
> reds_update_mouse_mode should also be done via the red_dispatcher
> (those that are triggered by red_dispatcher_async_complete, and
> those that are executed from the vcpu thread).

I think you're right, without looking too much into it.

Ok, guess I'll do a respin for 0.8

<snip>