[phodav,11/13] spice: move OutputQueue to file

Submitted by Jakub Janku on May 23, 2019, 8:37 a.m.

Details

Message ID 20190523083725.1554-12-jjanku@redhat.com
State New
Headers show
Series "Miscellaneous series" ( rev: 1 ) in Spice

Not browsing as part of any series.

Commit Message

Jakub Janku May 23, 2019, 8:37 a.m.
OutputQueue is a self-contained unit and as such can be put in
a separate file to make the spice-webdavd.c less cluttered.

Also, as the current implementation defines output_queue_{ref, unref},
turn OutputQueue into a GObject which can handle these for us.

Signed-off-by: Jakub Janků <jjanku@redhat.com>
---
 spice/meson.build     |   8 ++-
 spice/output-queue.c  | 164 ++++++++++++++++++++++++++++++++++++++++++
 spice/output-queue.h  |  38 ++++++++++
 spice/spice-webdavd.c | 162 ++---------------------------------------
 4 files changed, 214 insertions(+), 158 deletions(-)
 create mode 100644 spice/output-queue.c
 create mode 100644 spice/output-queue.h

Patch hide | download patch | download mbox

diff --git a/spice/meson.build b/spice/meson.build
index 6db22cc..06d20e6 100644
--- a/spice/meson.build
+++ b/spice/meson.build
@@ -4,9 +4,15 @@  if host_machine.system() == 'windows'
   win32_deps += compiler.find_library('mpr')
 endif
 
+sources = [
+  'spice-webdavd.c',
+  'output-queue.c',
+  'output-queue.h'
+]
+
 executable(
   'spice-webdavd',
-  [ 'spice-webdavd.c' ],
+  sources,
   install_dir : sbindir,
   include_directories : incdir,
   dependencies : win32_deps + avahi_deps + deps,
diff --git a/spice/output-queue.c b/spice/output-queue.c
new file mode 100644
index 0000000..6991493
--- /dev/null
+++ b/spice/output-queue.c
@@ -0,0 +1,164 @@ 
+/*
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ */
+#include <config.h>
+
+#include "output-queue.h"
+
+typedef struct _OutputQueueElem
+{
+  OutputQueue  *queue;
+  const guint8 *buf;
+  gsize         size;
+  PushedCb      cb;
+  gpointer      user_data;
+} OutputQueueElem;
+
+struct _OutputQueue
+{
+  GObject        parent_instance;
+  GOutputStream *output;
+  gboolean       flushing;
+  guint          idle_id;
+  GQueue        *queue;
+  GCancellable  *cancel;
+};
+
+G_DEFINE_TYPE(OutputQueue, output_queue, G_TYPE_OBJECT);
+
+static void output_queue_init(OutputQueue *self)
+{
+    self->queue = g_queue_new ();
+}
+
+static void output_queue_finalize(GObject *obj)
+{
+    OutputQueue *self = OUTPUT_QUEUE(obj);
+
+    g_warn_if_fail (g_queue_get_length (self->queue) == 0);
+    g_warn_if_fail (!self->flushing);
+    g_warn_if_fail (!self->idle_id);
+
+    g_queue_free_full (self->queue, g_free);
+    g_object_unref (self->output);
+    g_object_unref (self->cancel);
+
+    G_OBJECT_CLASS(output_queue_parent_class)->finalize(obj);
+}
+
+static void output_queue_class_init(OutputQueueClass *klass)
+{
+    GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
+    gobject_class->finalize = output_queue_finalize;
+}
+
+OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel)
+{
+  OutputQueue *self = g_object_new(OUTPUT_TYPE_QUEUE, NULL);
+  self->output = g_object_ref (output);
+  self->cancel = g_object_ref (cancel);
+  return self;
+}
+
+static gboolean output_queue_idle (gpointer user_data);
+
+static void
+output_queue_flush_cb (GObject      *source_object,
+                       GAsyncResult *res,
+                       gpointer      user_data)
+{
+  GError *error = NULL;
+  OutputQueueElem *e = user_data;
+  OutputQueue *q = e->queue;
+
+  g_debug ("flushed");
+  q->flushing = FALSE;
+  g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
+                                res, &error);
+  if (error)
+    g_warning ("error: %s", error->message);
+
+  g_clear_error (&error);
+
+  if (!q->idle_id)
+    q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
+
+  g_free (e);
+  g_object_unref (q);
+}
+
+static gboolean
+output_queue_idle (gpointer user_data)
+{
+  OutputQueue *q = user_data;
+  OutputQueueElem *e = NULL;
+  GError *error = NULL;
+
+  if (q->flushing)
+    {
+      g_debug ("already flushing");
+      goto end;
+    }
+
+  e = g_queue_pop_head (q->queue);
+  if (!e)
+    {
+      g_debug ("No more data to flush");
+      goto end;
+    }
+
+  g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
+  g_output_stream_write_all (q->output, e->buf, e->size, NULL, q->cancel, &error);
+  if (e->cb)
+    e->cb (q, e->user_data, error);
+
+  if (error)
+      goto end;
+
+  q->flushing = TRUE;
+  g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, q->cancel, output_queue_flush_cb, e);
+
+  q->idle_id = 0;
+  return FALSE;
+
+end:
+  g_clear_error (&error);
+  q->idle_id = 0;
+  g_free (e);
+  g_object_unref (q);
+
+  return FALSE;
+}
+
+void
+output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
+                   PushedCb pushed_cb, gpointer user_data)
+{
+  OutputQueueElem *e;
+
+  g_return_if_fail (q != NULL);
+
+  e = g_new (OutputQueueElem, 1);
+  e->buf = buf;
+  e->size = size;
+  e->cb = pushed_cb;
+  e->user_data = user_data;
+  e->queue = q;
+  g_queue_push_tail (q->queue, e);
+
+  if (!q->idle_id && !q->flushing)
+    q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
+}
diff --git a/spice/output-queue.h b/spice/output-queue.h
new file mode 100644
index 0000000..ab8f6eb
--- /dev/null
+++ b/spice/output-queue.h
@@ -0,0 +1,38 @@ 
+/*
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __OUTPUT_QUEUE_H
+#define __OUTPUT_QUEUE_H
+
+#include <gio/gio.h>
+#include <glib-object.h>
+
+G_BEGIN_DECLS
+
+#define OUTPUT_TYPE_QUEUE output_queue_get_type()
+G_DECLARE_FINAL_TYPE(OutputQueue, output_queue, OUTPUT, QUEUE, GObject);
+
+OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel);
+
+typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error);
+
+void output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
+                        PushedCb pushed_cb, gpointer user_data);
+
+G_END_DECLS
+
+#endif
diff --git a/spice/spice-webdavd.c b/spice/spice-webdavd.c
index f2c7f07..84ab770 100644
--- a/spice/spice-webdavd.c
+++ b/spice/spice-webdavd.c
@@ -39,25 +39,7 @@ 
 #include <avahi-gobject/ga-entry-group.h>
 #endif
 
-typedef struct _OutputQueue
-{
-  guint          refs;
-  GOutputStream *output;
-  gboolean       flushing;
-  guint          idle_id;
-  GQueue        *queue;
-} OutputQueue;
-
-typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error);
-
-typedef struct _OutputQueueElem
-{
-  OutputQueue  *queue;
-  const guint8 *buf;
-  gsize         size;
-  PushedCb      cb;
-  gpointer      user_data;
-} OutputQueueElem;
+#include "output-queue.h"
 
 typedef struct _ServiceData
 {
@@ -69,139 +51,6 @@  typedef struct _ServiceData
 
 static GCancellable *cancel;
 
-static OutputQueue*
-output_queue_new (GOutputStream *output)
-{
-  OutputQueue *queue = g_new0 (OutputQueue, 1);
-
-  queue->output = g_object_ref (output);
-  queue->queue = g_queue_new ();
-  queue->refs = 1;
-
-  return queue;
-}
-
-static
-void
-output_queue_free (OutputQueue *queue)
-{
-  g_warn_if_fail (g_queue_get_length (queue->queue) == 0);
-  g_warn_if_fail (!queue->flushing);
-  g_warn_if_fail (!queue->idle_id);
-
-  g_queue_free_full (queue->queue, g_free);
-  g_clear_object (&queue->output);
-  g_free (queue);
-}
-
-static OutputQueue*
-output_queue_ref (OutputQueue *q)
-{
-  q->refs++;
-  return q;
-}
-
-static void
-output_queue_unref (OutputQueue *q)
-{
-  g_return_if_fail (q != NULL);
-
-  q->refs--;
-  if (q->refs == 0)
-    output_queue_free (q);
-}
-
-static gboolean output_queue_idle (gpointer user_data);
-
-static void
-output_queue_flush_cb (GObject      *source_object,
-                       GAsyncResult *res,
-                       gpointer      user_data)
-{
-  GError *error = NULL;
-  OutputQueueElem *e = user_data;
-  OutputQueue *q = e->queue;
-
-  g_debug ("flushed");
-  q->flushing = FALSE;
-  g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
-                                res, &error);
-  if (error)
-    g_warning ("error: %s", error->message);
-
-  g_clear_error (&error);
-
-  if (!q->idle_id)
-    q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
-
-  g_free (e);
-  output_queue_unref (q);
-}
-
-static gboolean
-output_queue_idle (gpointer user_data)
-{
-  OutputQueue *q = user_data;
-  OutputQueueElem *e = NULL;
-  GError *error = NULL;
-
-  if (q->flushing)
-    {
-      g_debug ("already flushing");
-      goto end;
-    }
-
-  e = g_queue_pop_head (q->queue);
-  if (!e)
-    {
-      g_debug ("No more data to flush");
-      goto end;
-    }
-
-  g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
-  g_output_stream_write_all (q->output, e->buf, e->size, NULL, cancel, &error);
-  if (e->cb)
-    e->cb (q, e->user_data, error);
-
-  if (error)
-      goto end;
-
-  q->flushing = TRUE;
-  g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, cancel, output_queue_flush_cb, e);
-
-  q->idle_id = 0;
-  return FALSE;
-
-end:
-  g_clear_error (&error);
-  q->idle_id = 0;
-  g_free (e);
-  output_queue_unref (q);
-
-  return FALSE;
-}
-
-static void
-output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
-                   PushedCb pushed_cb, gpointer user_data)
-{
-  OutputQueueElem *e;
-
-  g_return_if_fail (q != NULL);
-
-  e = g_new (OutputQueueElem, 1);
-  e->buf = buf;
-  e->size = size;
-  e->cb = pushed_cb;
-  e->user_data = user_data;
-  e->queue = q;
-  g_queue_push_tail (q->queue, e);
-
-  if (!q->idle_id && !q->flushing)
-    q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
-}
-
-
 static struct _DemuxData
 {
   gint64  client;
@@ -264,7 +113,7 @@  add_client (GSocketConnection *client_connection)
   client->client_connection = g_object_ref (client_connection);
   // TODO: check if usage of this idiom is portable, or if we need to check collisions
   client->id = GPOINTER_TO_INT (client_connection);
-  client->queue = output_queue_new (bostream);
+  client->queue = output_queue_new (bostream, cancel);
   g_object_unref (bostream);
 
   g_hash_table_insert (clients, &client->id, client);
@@ -280,7 +129,7 @@  client_free (Client *c)
 
   g_io_stream_close (G_IO_STREAM (c->client_connection), NULL, NULL);
   g_object_unref (c->client_connection);
-  output_queue_unref (c->queue);
+  g_object_unref (c->queue);
   g_free (c);
 }
 
@@ -732,7 +581,7 @@  open_mux_path (const char *path)
   mux_istream = G_INPUT_STREAM (g_win32_input_stream_new (port_handle, TRUE));
 #endif
 
-  mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream));
+  mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream), cancel);
 }
 
 #ifdef G_OS_WIN32
@@ -1002,12 +851,11 @@  run_service (ServiceData *service_data)
   g_clear_object (&mux_istream);
   g_clear_object (&mux_ostream);
 
-  output_queue_unref (mux_queue);
+  g_clear_object (&mux_queue);
   g_hash_table_unref (clients);
 
   g_socket_service_stop (socket_service);
 
-  mux_queue = NULL;
   g_clear_object (&cancel);
 
 #ifdef G_OS_WIN32

Comments

Hi

On Thu, May 23, 2019 at 10:37 AM Jakub Janků <jjanku@redhat.com> wrote:
>
> OutputQueue is a self-contained unit and as such can be put in
> a separate file to make the spice-webdavd.c less cluttered.
>
> Also, as the current implementation defines output_queue_{ref, unref},
> turn OutputQueue into a GObject which can handle these for us.
>
> Signed-off-by: Jakub Janků <jjanku@redhat.com>

ack in principle, minor coding style issues

The phodav source code tries to follow glib code style. Can you indent
accordingly?

> ---
>  spice/meson.build     |   8 ++-
>  spice/output-queue.c  | 164 ++++++++++++++++++++++++++++++++++++++++++
>  spice/output-queue.h  |  38 ++++++++++
>  spice/spice-webdavd.c | 162 ++---------------------------------------
>  4 files changed, 214 insertions(+), 158 deletions(-)
>  create mode 100644 spice/output-queue.c
>  create mode 100644 spice/output-queue.h
>
> diff --git a/spice/meson.build b/spice/meson.build
> index 6db22cc..06d20e6 100644
> --- a/spice/meson.build
> +++ b/spice/meson.build
> @@ -4,9 +4,15 @@ if host_machine.system() == 'windows'
>    win32_deps += compiler.find_library('mpr')
>  endif
>
> +sources = [
> +  'spice-webdavd.c',
> +  'output-queue.c',
> +  'output-queue.h'
> +]
> +
>  executable(
>    'spice-webdavd',
> -  [ 'spice-webdavd.c' ],
> +  sources,
>    install_dir : sbindir,
>    include_directories : incdir,
>    dependencies : win32_deps + avahi_deps + deps,
> diff --git a/spice/output-queue.c b/spice/output-queue.c
> new file mode 100644
> index 0000000..6991493
> --- /dev/null
> +++ b/spice/output-queue.c
> @@ -0,0 +1,164 @@
> +/*
> + * Copyright (C) 2019 Red Hat, Inc.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, see <http://www.gnu.org/licenses/>.
> + */
> +#include <config.h>
> +
> +#include "output-queue.h"
> +
> +typedef struct _OutputQueueElem
> +{
> +  OutputQueue  *queue;
> +  const guint8 *buf;
> +  gsize         size;
> +  PushedCb      cb;
> +  gpointer      user_data;
> +} OutputQueueElem;
> +
> +struct _OutputQueue
> +{
> +  GObject        parent_instance;
> +  GOutputStream *output;
> +  gboolean       flushing;
> +  guint          idle_id;
> +  GQueue        *queue;
> +  GCancellable  *cancel;
> +};
> +
> +G_DEFINE_TYPE(OutputQueue, output_queue, G_TYPE_OBJECT);
> +
> +static void output_queue_init(OutputQueue *self)
> +{
> +    self->queue = g_queue_new ();
> +}
> +
> +static void output_queue_finalize(GObject *obj)
> +{
> +    OutputQueue *self = OUTPUT_QUEUE(obj);
> +
> +    g_warn_if_fail (g_queue_get_length (self->queue) == 0);
> +    g_warn_if_fail (!self->flushing);
> +    g_warn_if_fail (!self->idle_id);
> +
> +    g_queue_free_full (self->queue, g_free);
> +    g_object_unref (self->output);
> +    g_object_unref (self->cancel);
> +
> +    G_OBJECT_CLASS(output_queue_parent_class)->finalize(obj);
> +}
> +
> +static void output_queue_class_init(OutputQueueClass *klass)
> +{
> +    GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
> +    gobject_class->finalize = output_queue_finalize;
> +}
> +
> +OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel)
> +{
> +  OutputQueue *self = g_object_new(OUTPUT_TYPE_QUEUE, NULL);
> +  self->output = g_object_ref (output);
> +  self->cancel = g_object_ref (cancel);
> +  return self;
> +}
> +
> +static gboolean output_queue_idle (gpointer user_data);
> +
> +static void
> +output_queue_flush_cb (GObject      *source_object,
> +                       GAsyncResult *res,
> +                       gpointer      user_data)
> +{
> +  GError *error = NULL;
> +  OutputQueueElem *e = user_data;
> +  OutputQueue *q = e->queue;
> +
> +  g_debug ("flushed");
> +  q->flushing = FALSE;
> +  g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
> +                                res, &error);
> +  if (error)
> +    g_warning ("error: %s", error->message);
> +
> +  g_clear_error (&error);
> +
> +  if (!q->idle_id)
> +    q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
> +
> +  g_free (e);
> +  g_object_unref (q);
> +}
> +
> +static gboolean
> +output_queue_idle (gpointer user_data)
> +{
> +  OutputQueue *q = user_data;
> +  OutputQueueElem *e = NULL;
> +  GError *error = NULL;
> +
> +  if (q->flushing)
> +    {
> +      g_debug ("already flushing");
> +      goto end;
> +    }
> +
> +  e = g_queue_pop_head (q->queue);
> +  if (!e)
> +    {
> +      g_debug ("No more data to flush");
> +      goto end;
> +    }
> +
> +  g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
> +  g_output_stream_write_all (q->output, e->buf, e->size, NULL, q->cancel, &error);
> +  if (e->cb)
> +    e->cb (q, e->user_data, error);
> +
> +  if (error)
> +      goto end;
> +
> +  q->flushing = TRUE;
> +  g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, q->cancel, output_queue_flush_cb, e);
> +
> +  q->idle_id = 0;
> +  return FALSE;
> +
> +end:
> +  g_clear_error (&error);
> +  q->idle_id = 0;
> +  g_free (e);
> +  g_object_unref (q);
> +
> +  return FALSE;
> +}
> +
> +void
> +output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
> +                   PushedCb pushed_cb, gpointer user_data)
> +{
> +  OutputQueueElem *e;
> +
> +  g_return_if_fail (q != NULL);
> +
> +  e = g_new (OutputQueueElem, 1);
> +  e->buf = buf;
> +  e->size = size;
> +  e->cb = pushed_cb;
> +  e->user_data = user_data;
> +  e->queue = q;
> +  g_queue_push_tail (q->queue, e);
> +
> +  if (!q->idle_id && !q->flushing)
> +    q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
> +}
> diff --git a/spice/output-queue.h b/spice/output-queue.h
> new file mode 100644
> index 0000000..ab8f6eb
> --- /dev/null
> +++ b/spice/output-queue.h
> @@ -0,0 +1,38 @@
> +/*
> + * Copyright (C) 2019 Red Hat, Inc.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#ifndef __OUTPUT_QUEUE_H
> +#define __OUTPUT_QUEUE_H
> +
> +#include <gio/gio.h>
> +#include <glib-object.h>
> +
> +G_BEGIN_DECLS
> +
> +#define OUTPUT_TYPE_QUEUE output_queue_get_type()
> +G_DECLARE_FINAL_TYPE(OutputQueue, output_queue, OUTPUT, QUEUE, GObject);
> +
> +OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel);
> +
> +typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error);
> +
> +void output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
> +                        PushedCb pushed_cb, gpointer user_data);
> +
> +G_END_DECLS
> +
> +#endif
> diff --git a/spice/spice-webdavd.c b/spice/spice-webdavd.c
> index f2c7f07..84ab770 100644
> --- a/spice/spice-webdavd.c
> +++ b/spice/spice-webdavd.c
> @@ -39,25 +39,7 @@
>  #include <avahi-gobject/ga-entry-group.h>
>  #endif
>
> -typedef struct _OutputQueue
> -{
> -  guint          refs;
> -  GOutputStream *output;
> -  gboolean       flushing;
> -  guint          idle_id;
> -  GQueue        *queue;
> -} OutputQueue;
> -
> -typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error);
> -
> -typedef struct _OutputQueueElem
> -{
> -  OutputQueue  *queue;
> -  const guint8 *buf;
> -  gsize         size;
> -  PushedCb      cb;
> -  gpointer      user_data;
> -} OutputQueueElem;
> +#include "output-queue.h"
>
>  typedef struct _ServiceData
>  {
> @@ -69,139 +51,6 @@ typedef struct _ServiceData
>
>  static GCancellable *cancel;
>
> -static OutputQueue*
> -output_queue_new (GOutputStream *output)
> -{
> -  OutputQueue *queue = g_new0 (OutputQueue, 1);
> -
> -  queue->output = g_object_ref (output);
> -  queue->queue = g_queue_new ();
> -  queue->refs = 1;
> -
> -  return queue;
> -}
> -
> -static
> -void
> -output_queue_free (OutputQueue *queue)
> -{
> -  g_warn_if_fail (g_queue_get_length (queue->queue) == 0);
> -  g_warn_if_fail (!queue->flushing);
> -  g_warn_if_fail (!queue->idle_id);
> -
> -  g_queue_free_full (queue->queue, g_free);
> -  g_clear_object (&queue->output);
> -  g_free (queue);
> -}
> -
> -static OutputQueue*
> -output_queue_ref (OutputQueue *q)
> -{
> -  q->refs++;
> -  return q;
> -}
> -
> -static void
> -output_queue_unref (OutputQueue *q)
> -{
> -  g_return_if_fail (q != NULL);
> -
> -  q->refs--;
> -  if (q->refs == 0)
> -    output_queue_free (q);
> -}
> -
> -static gboolean output_queue_idle (gpointer user_data);
> -
> -static void
> -output_queue_flush_cb (GObject      *source_object,
> -                       GAsyncResult *res,
> -                       gpointer      user_data)
> -{
> -  GError *error = NULL;
> -  OutputQueueElem *e = user_data;
> -  OutputQueue *q = e->queue;
> -
> -  g_debug ("flushed");
> -  q->flushing = FALSE;
> -  g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
> -                                res, &error);
> -  if (error)
> -    g_warning ("error: %s", error->message);
> -
> -  g_clear_error (&error);
> -
> -  if (!q->idle_id)
> -    q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
> -
> -  g_free (e);
> -  output_queue_unref (q);
> -}
> -
> -static gboolean
> -output_queue_idle (gpointer user_data)
> -{
> -  OutputQueue *q = user_data;
> -  OutputQueueElem *e = NULL;
> -  GError *error = NULL;
> -
> -  if (q->flushing)
> -    {
> -      g_debug ("already flushing");
> -      goto end;
> -    }
> -
> -  e = g_queue_pop_head (q->queue);
> -  if (!e)
> -    {
> -      g_debug ("No more data to flush");
> -      goto end;
> -    }
> -
> -  g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
> -  g_output_stream_write_all (q->output, e->buf, e->size, NULL, cancel, &error);
> -  if (e->cb)
> -    e->cb (q, e->user_data, error);
> -
> -  if (error)
> -      goto end;
> -
> -  q->flushing = TRUE;
> -  g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, cancel, output_queue_flush_cb, e);
> -
> -  q->idle_id = 0;
> -  return FALSE;
> -
> -end:
> -  g_clear_error (&error);
> -  q->idle_id = 0;
> -  g_free (e);
> -  output_queue_unref (q);
> -
> -  return FALSE;
> -}
> -
> -static void
> -output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
> -                   PushedCb pushed_cb, gpointer user_data)
> -{
> -  OutputQueueElem *e;
> -
> -  g_return_if_fail (q != NULL);
> -
> -  e = g_new (OutputQueueElem, 1);
> -  e->buf = buf;
> -  e->size = size;
> -  e->cb = pushed_cb;
> -  e->user_data = user_data;
> -  e->queue = q;
> -  g_queue_push_tail (q->queue, e);
> -
> -  if (!q->idle_id && !q->flushing)
> -    q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
> -}
> -
> -
>  static struct _DemuxData
>  {
>    gint64  client;
> @@ -264,7 +113,7 @@ add_client (GSocketConnection *client_connection)
>    client->client_connection = g_object_ref (client_connection);
>    // TODO: check if usage of this idiom is portable, or if we need to check collisions
>    client->id = GPOINTER_TO_INT (client_connection);
> -  client->queue = output_queue_new (bostream);
> +  client->queue = output_queue_new (bostream, cancel);
>    g_object_unref (bostream);
>
>    g_hash_table_insert (clients, &client->id, client);
> @@ -280,7 +129,7 @@ client_free (Client *c)
>
>    g_io_stream_close (G_IO_STREAM (c->client_connection), NULL, NULL);
>    g_object_unref (c->client_connection);
> -  output_queue_unref (c->queue);
> +  g_object_unref (c->queue);
>    g_free (c);
>  }
>
> @@ -732,7 +581,7 @@ open_mux_path (const char *path)
>    mux_istream = G_INPUT_STREAM (g_win32_input_stream_new (port_handle, TRUE));
>  #endif
>
> -  mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream));
> +  mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream), cancel);
>  }
>
>  #ifdef G_OS_WIN32
> @@ -1002,12 +851,11 @@ run_service (ServiceData *service_data)
>    g_clear_object (&mux_istream);
>    g_clear_object (&mux_ostream);
>
> -  output_queue_unref (mux_queue);
> +  g_clear_object (&mux_queue);
>    g_hash_table_unref (clients);
>
>    g_socket_service_stop (socket_service);
>
> -  mux_queue = NULL;
>    g_clear_object (&cancel);
>
>  #ifdef G_OS_WIN32
> --
> 2.21.0
>
> _______________________________________________
> Spice-devel mailing list
> Spice-devel@lists.freedesktop.org
> https://lists.freedesktop.org/mailman/listinfo/spice-devel