[Spice-devel,spice-gtk,v2,02/16] file-xfer: introduce flush_callback and flush_done

Submitted by Victor Toso on May 23, 2016, 11:50 a.m.

Details

Message ID 1464004253-17924-3-git-send-email-victortoso@redhat.com
State Superseded
Headers show
Series "separate SpiceFileTransferTask logic from channel-main" ( rev: 1 ) in Spice

Not browsing as part of any series.

Commit Message

Victor Toso May 23, 2016, 11:50 a.m.
By introducing a flush_callback such as SpiceFileTransferTaskFlushCb
SpiceFileTransferTask becomes agnostic on how channel-main flushes
the data.

The spice_file_transfer_task_flush_done() function is now introduced
to tell SpiceFileTransferTask that flushing is over and we can read
more data if no error has happened.

Helper functions spice_file_transfer_task_get_channel() and
spice_file_transfer_task_get_cancellable() were created to retrieve
properties from SpiceFileTransferTask without the need to access its
private structure.

This change is related to split SpiceFileTransferTask from
channel-main.
---
 src/channel-main.c | 157 +++++++++++++++++++++++++++++++++++------------------
 1 file changed, 105 insertions(+), 52 deletions(-)

Patch hide | download patch | download mbox

diff --git a/src/channel-main.c b/src/channel-main.c
index 4fb514e..f4e8c2b 100644
--- a/src/channel-main.c
+++ b/src/channel-main.c
@@ -77,6 +77,16 @@  G_DEFINE_TYPE(SpiceFileTransferTask, spice_file_transfer_task, G_TYPE_OBJECT)
         (G_TYPE_INSTANCE_GET_PRIVATE((o), SPICE_TYPE_FILE_TRANSFER_TASK, SpiceFileTransferTaskPrivate))
 
 #define FILE_XFER_CHUNK_SIZE (VD_AGENT_MAX_DATA_SIZE * 32)
+
+typedef void (*SpiceFileTransferTaskFlushCb)(SpiceFileTransferTask *xfer_task,
+                                             void *buffer,
+                                             gssize count,
+                                             gpointer user_data);
+
+static SpiceMainChannel *spice_file_transfer_task_get_channel(SpiceFileTransferTask *self);
+static GCancellable *spice_file_transfer_task_get_cancellable(SpiceFileTransferTask *self);
+static void spice_file_transfer_task_flush_done(SpiceFileTransferTask *self, GError *error);
+
 struct _SpiceFileTransferTaskPrivate
 
 /* private */
@@ -90,6 +100,8 @@  struct _SpiceFileTransferTaskPrivate
     GCancellable                   *cancellable;
     GFileProgressCallback          progress_callback;
     gpointer                       progress_callback_data;
+    SpiceFileTransferTaskFlushCb   flush_callback;
+    gpointer                       flush_callback_data;
     GAsyncReadyCallback            callback;
     gpointer                       user_data;
     char                           *buffer;
@@ -1851,70 +1863,46 @@  static void file_xfer_data_flushed_cb(GObject *source_object,
     SpiceMainChannel *channel = (SpiceMainChannel *)source_object;
     GError *error = NULL;
 
-    self->priv->pending = FALSE;
     file_xfer_flush_finish(channel, res, &error);
-    if (error || self->priv->error) {
-        spice_file_transfer_task_completed(self, error);
-        return;
-    }
-
-    if (spice_util_get_debug()) {
-        const GTimeSpan interval = 20 * G_TIME_SPAN_SECOND;
-        gint64 now = g_get_monotonic_time();
-
-        if (interval < now - self->priv->last_update) {
-            gchar *basename = g_file_get_basename(self->priv->file);
-            self->priv->last_update = now;
-            SPICE_DEBUG("transferred %.2f%% of the file %s",
-                        100.0 * self->priv->read_bytes / self->priv->file_size, basename);
-            g_free(basename);
-        }
-    }
-
-    if (self->priv->progress_callback) {
-        goffset read = 0;
-        goffset total = 0;
-        SpiceMainChannel *main_channel = self->priv->channel;
-        GHashTableIter iter;
-        gpointer key, value;
-
-        /* since the progress_callback does not have a parameter to indicate
-         * which file the progress is associated with, report progress on all
-         * current transfers */
-        g_hash_table_iter_init(&iter, main_channel->priv->file_xfer_tasks);
-        while (g_hash_table_iter_next(&iter, &key, &value)) {
-            SpiceFileTransferTask *t = (SpiceFileTransferTask *)value;
-            read += t->priv->read_bytes;
-            total += t->priv->file_size;
-        }
-
-        self->priv->progress_callback(read, total, self->priv->progress_callback_data);
-    }
-
-    /* Read more data */
-    file_xfer_continue_read(self);
+    spice_file_transfer_task_flush_done(self, error);
 }
 
-static void file_xfer_queue(SpiceFileTransferTask *self, int data_size)
+static void file_xfer_queue(SpiceFileTransferTask *xfer_task, void *buffer, int data_size)
 {
     VDAgentFileXferDataMessage msg;
-    SpiceMainChannel *channel = SPICE_MAIN_CHANNEL(self->priv->channel);
+    SpiceMainChannel *channel = spice_file_transfer_task_get_channel(xfer_task);
 
-    msg.id = self->priv->id;
+    g_object_get(xfer_task, "id", &msg.id, NULL);
     msg.size = data_size;
     agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_DATA,
                          &msg, sizeof(msg),
-                         self->priv->buffer, data_size, NULL);
+                         buffer, data_size, NULL);
     spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE);
 }
 
+static void file_xfer_flush_callback(SpiceFileTransferTask *xfer_task,
+                                     void *buffer,
+                                     gssize count,
+                                     gpointer user_data)
+{
+    SpiceMainChannel *main_channel;
+    GCancellable *cancellable;
+
+    file_xfer_queue(xfer_task, buffer, count);
+    if (count == 0)
+        return;
+
+    main_channel = spice_file_transfer_task_get_channel(xfer_task);
+    cancellable = spice_file_transfer_task_get_cancellable(xfer_task);
+    file_xfer_flush_async(main_channel, cancellable, file_xfer_data_flushed_cb, xfer_task);
+}
+
 /* main context */
 static void file_xfer_read_cb(GObject *source_object,
                               GAsyncResult *res,
                               gpointer user_data)
 {
     SpiceFileTransferTask *self = user_data;
-    SpiceMainChannel *channel = self->priv->channel;
     gssize count;
     GError *error = NULL;
 
@@ -1930,12 +1918,11 @@  static void file_xfer_read_cb(GObject *source_object,
     if (count > 0 || self->priv->file_size == 0) {
         self->priv->read_bytes += count;
         g_object_notify(G_OBJECT(self), "progress");
-        file_xfer_queue(self, count);
-        if (count == 0)
-            return;
-        file_xfer_flush_async(channel, self->priv->cancellable,
-                              file_xfer_data_flushed_cb, self);
-        self->priv->pending = TRUE;
+
+        if (self->priv->flush_callback) {
+            self->priv->pending = TRUE;
+            self->priv->flush_callback(self, self->priv->buffer, count, self->priv->flush_callback_data);
+        }
     } else if (error) {
         spice_channel_wakeup(SPICE_CHANNEL(self->priv->channel), FALSE);
         spice_file_transfer_task_completed(self, error);
@@ -3083,6 +3070,8 @@  static void file_xfer_send_start_msg_async(SpiceMainChannel *channel,
                                            GCancellable *cancellable,
                                            GFileProgressCallback progress_callback,
                                            gpointer progress_callback_data,
+                                           SpiceFileTransferTaskFlushCb flush_callback,
+                                           gpointer flush_callback_data,
                                            GAsyncReadyCallback callback,
                                            gpointer user_data)
 {
@@ -3102,6 +3091,8 @@  static void file_xfer_send_start_msg_async(SpiceMainChannel *channel,
         task->priv->flags = flags;
         task->priv->progress_callback = progress_callback;
         task->priv->progress_callback_data = progress_callback_data;
+        task->priv->flush_callback = flush_callback;
+        task->priv->flush_callback_data = flush_callback_data;
         task->priv->callback = callback;
         task->priv->user_data = user_data;
 
@@ -3191,6 +3182,8 @@  void spice_main_file_copy_async(SpiceMainChannel *channel,
                                    cancellable,
                                    progress_callback,
                                    progress_callback_data,
+                                   file_xfer_flush_callback,
+                                   NULL,
                                    callback,
                                    user_data);
 }
@@ -3218,6 +3211,66 @@  gboolean spice_main_file_copy_finish(SpiceMainChannel *channel,
     return g_task_propagate_boolean(task, error);
 }
 
+static SpiceMainChannel *spice_file_transfer_task_get_channel(SpiceFileTransferTask *self)
+{
+    g_assert_nonnull(self);
+    return self->priv->channel;
+}
+
+static GCancellable *spice_file_transfer_task_get_cancellable(SpiceFileTransferTask *self)
+{
+    g_assert_nonnull(self);
+    return self->priv->cancellable;
+}
+
+static void spice_file_transfer_task_flush_done(SpiceFileTransferTask *self, GError *error)
+{
+    g_assert_nonnull(self);
+    g_return_if_fail(self->priv->pending);
+
+    self->priv->pending = FALSE;
+
+    if (error || self->priv->error) {
+        spice_file_transfer_task_completed(self, error);
+        return;
+    }
+
+    if (spice_util_get_debug()) {
+        const GTimeSpan interval = 20 * G_TIME_SPAN_SECOND;
+        gint64 now = g_get_monotonic_time();
+
+        if (interval < now - self->priv->last_update) {
+            gchar *basename = g_file_get_basename(self->priv->file);
+            self->priv->last_update = now;
+            SPICE_DEBUG("transferred %.2f%% of the file %s",
+                        100.0 * self->priv->read_bytes / self->priv->file_size, basename);
+            g_free(basename);
+        }
+    }
+
+    if (self->priv->progress_callback) {
+        goffset read = 0;
+        goffset total = 0;
+        SpiceMainChannel *main_channel = self->priv->channel;
+        GHashTableIter iter;
+        gpointer key, value;
+
+        /* since the progress_callback does not have a parameter to indicate
+         * which file the progress is associated with, report progress on all
+         * current transfers */
+        g_hash_table_iter_init(&iter, main_channel->priv->file_xfer_tasks);
+        while (g_hash_table_iter_next(&iter, &key, &value)) {
+            SpiceFileTransferTask *t = (SpiceFileTransferTask *)value;
+            read += t->priv->read_bytes;
+            total += t->priv->file_size;
+        }
+
+        self->priv->progress_callback(read, total, self->priv->progress_callback_data);
+    }
+
+    /* Read more data */
+    file_xfer_continue_read(self);
+}
 
 
 static void

Comments

Hi Victor,

On Mon, 2016-05-23 at 13:50 +0200, Victor Toso wrote:
> By introducing a flush_callback such as SpiceFileTransferTaskFlushCb
> SpiceFileTransferTask becomes agnostic on how channel-main flushes
> the data.
> The spice_file_transfer_task_flush_done() function is now introduced
> to tell SpiceFileTransferTask that flushing is over and we can read
> more data if no error has happened.
> 
> Helper functions spice_file_transfer_task_get_channel() and
> spice_file_transfer_task_get_cancellable() were created to retrieve
> properties from SpiceFileTransferTask without the need to access its
> private structure.
> 

Can it be done in more steps please - _get_channel() replacing the access to the
private structure in a separate patch ?


> This change is related to split SpiceFileTransferTask from
> channel-main.
> ---
>  src/channel-main.c | 157 +++++++++++++++++++++++++++++++++++-----------------
> -
>  1 file changed, 105 insertions(+), 52 deletions(-)
> 
> diff --git a/src/channel-main.c b/src/channel-main.c
> index 4fb514e..f4e8c2b 100644
> --- a/src/channel-main.c
> +++ b/src/channel-main.c
> @@ -77,6 +77,16 @@ G_DEFINE_TYPE(SpiceFileTransferTask,
> spice_file_transfer_task, G_TYPE_OBJECT)
>          (G_TYPE_INSTANCE_GET_PRIVATE((o), SPICE_TYPE_FILE_TRANSFER_TASK,
> SpiceFileTransferTaskPrivate))
>  
>  #define FILE_XFER_CHUNK_SIZE (VD_AGENT_MAX_DATA_SIZE * 32)
> +
> +typedef void (*SpiceFileTransferTaskFlushCb)(SpiceFileTransferTask
> *xfer_task,
> +                                             void *buffer,
> +                                             gssize count,
> +                                             gpointer user_data);
> +
> +static SpiceMainChannel
> *spice_file_transfer_task_get_channel(SpiceFileTransferTask *self);
> +static GCancellable
> *spice_file_transfer_task_get_cancellable(SpiceFileTransferTask *self);
> +static void spice_file_transfer_task_flush_done(SpiceFileTransferTask *self,
> GError *error);
> +
>  struct _SpiceFileTransferTaskPrivate
>  
>  /* private */
> @@ -90,6 +100,8 @@ struct _SpiceFileTransferTaskPrivate
>      GCancellable                   *cancellable;
>      GFileProgressCallback          progress_callback;
>      gpointer                       progress_callback_data;
> +    SpiceFileTransferTaskFlushCb   flush_callback;
> +    gpointer                       flush_callback_data;
>      GAsyncReadyCallback            callback;
>      gpointer                       user_data;
>      char                           *buffer;
> @@ -1851,70 +1863,46 @@ static void file_xfer_data_flushed_cb(GObject
> *source_object,
>      SpiceMainChannel *channel = (SpiceMainChannel *)source_object;
>      GError *error = NULL;
>  
> -    self->priv->pending = FALSE;
>      file_xfer_flush_finish(channel, res, &error);
> -    if (error || self->priv->error) {
> -        spice_file_transfer_task_completed(self, error);
> -        return;
> -    }
> -
> -    if (spice_util_get_debug()) {
> -        const GTimeSpan interval = 20 * G_TIME_SPAN_SECOND;
> -        gint64 now = g_get_monotonic_time();
> -
> -        if (interval < now - self->priv->last_update) {
> -            gchar *basename = g_file_get_basename(self->priv->file);
> -            self->priv->last_update = now;
> -            SPICE_DEBUG("transferred %.2f%% of the file %s",
> -                        100.0 * self->priv->read_bytes / self->priv-
> >file_size, basename);
> -            g_free(basename);
> -        }
> -    }
> -
> -    if (self->priv->progress_callback) {
> -        goffset read = 0;
> -        goffset total = 0;
> -        SpiceMainChannel *main_channel = self->priv->channel;
> -        GHashTableIter iter;
> -        gpointer key, value;
> -
> -        /* since the progress_callback does not have a parameter to indicate
> -         * which file the progress is associated with, report progress on all
> -         * current transfers */
> -        g_hash_table_iter_init(&iter, main_channel->priv->file_xfer_tasks);
> -        while (g_hash_table_iter_next(&iter, &key, &value)) {
> -            SpiceFileTransferTask *t = (SpiceFileTransferTask *)value;
> -            read += t->priv->read_bytes;
> -            total += t->priv->file_size;
> -        }
> -
> -        self->priv->progress_callback(read, total, self->priv-
> >progress_callback_data);
> -    }
> -
> -    /* Read more data */
> -    file_xfer_continue_read(self);
> +    spice_file_transfer_task_flush_done(self, error);
>  }
>  
> -static void file_xfer_queue(SpiceFileTransferTask *self, int data_size)
> +static void file_xfer_queue(SpiceFileTransferTask *xfer_task, void *buffer,
> int data_size)
>  {
>      VDAgentFileXferDataMessage msg;
> -    SpiceMainChannel *channel = SPICE_MAIN_CHANNEL(self->priv->channel);
> +    SpiceMainChannel *channel =
> spice_file_transfer_task_get_channel(xfer_task);

what if the channel is NULL (not possible now?), but it is worth checking
>  
> -    msg.id = self->priv->id;
> +    g_object_get(xfer_task, "id", &msg.id, NULL);
>      msg.size = data_size;
>      agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_DATA,
>                           &msg, sizeof(msg),
> -                         self->priv->buffer, data_size, NULL);
> +                         buffer, data_size, NULL);
>      spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE);
>  }
>  
> +static void file_xfer_flush_callback(SpiceFileTransferTask *xfer_task,
> +                                     void *buffer,
> +                                     gssize count,
> +                                     gpointer user_data)
> +{
> +    SpiceMainChannel *main_channel;
> +    GCancellable *cancellable;
> +
> +    file_xfer_queue(xfer_task, buffer, count);
> +    if (count == 0)
> +        return;
> +
> +    main_channel = spice_file_transfer_task_get_channel(xfer_task);
> +    cancellable = spice_file_transfer_task_get_cancellable(xfer_task);
> +    file_xfer_flush_async(main_channel, cancellable,
> file_xfer_data_flushed_cb, xfer_task);
> +}
> +
>  /* main context */
>  static void file_xfer_read_cb(GObject *source_object,
>                                GAsyncResult *res,
>                                gpointer user_data)
>  {
>      SpiceFileTransferTask *self = user_data;
> -    SpiceMainChannel *channel = self->priv->channel;
>      gssize count;
>      GError *error = NULL;
>  
> @@ -1930,12 +1918,11 @@ static void file_xfer_read_cb(GObject *source_object,
>      if (count > 0 || self->priv->file_size == 0) {
>          self->priv->read_bytes += count;
>          g_object_notify(G_OBJECT(self), "progress");
> -        file_xfer_queue(self, count);
> -        if (count == 0)
> -            return;
> -        file_xfer_flush_async(channel, self->priv->cancellable,
> -                              file_xfer_data_flushed_cb, self);
> -        self->priv->pending = TRUE;
> +
> +        if (self->priv->flush_callback) {
> +            self->priv->pending = TRUE;
> +            self->priv->flush_callback(self, self->priv->buffer, count, self-
> >priv->flush_callback_data);
> +        }
>      } else if (error) {
>          spice_channel_wakeup(SPICE_CHANNEL(self->priv->channel), FALSE);
>          spice_file_transfer_task_completed(self, error);
> @@ -3083,6 +3070,8 @@ static void
> file_xfer_send_start_msg_async(SpiceMainChannel *channel,
>                                             GCancellable *cancellable,
>                                             GFileProgressCallback
> progress_callback,
>                                             gpointer progress_callback_data,
> +                                           SpiceFileTransferTaskFlushCb
> flush_callback,
> +                                           gpointer flush_callback_data,
>                                             GAsyncReadyCallback callback,
>                                             gpointer user_data)
>  {
> @@ -3102,6 +3091,8 @@ static void
> file_xfer_send_start_msg_async(SpiceMainChannel *channel,
>          task->priv->flags = flags;
>          task->priv->progress_callback = progress_callback;
>          task->priv->progress_callback_data = progress_callback_data;
> +        task->priv->flush_callback = flush_callback;
> +        task->priv->flush_callback_data = flush_callback_data;
>          task->priv->callback = callback;
>          task->priv->user_data = user_data;
>  
> @@ -3191,6 +3182,8 @@ void spice_main_file_copy_async(SpiceMainChannel
> *channel,
>                                     cancellable,
>                                     progress_callback,
>                                     progress_callback_data,
> +                                   file_xfer_flush_callback,
> +                                   NULL,
>                                     callback,
>                                     user_data);
>  }
> @@ -3218,6 +3211,66 @@ gboolean spice_main_file_copy_finish(SpiceMainChannel
> *channel,
>      return g_task_propagate_boolean(task, error);
>  }
>  
> +static SpiceMainChannel
> *spice_file_transfer_task_get_channel(SpiceFileTransferTask *self)
> +{
> +    g_assert_nonnull(self);
please, don't assert because of transfer task, use g_return
> +    return self->priv->channel;
> +}
> +
> +static GCancellable
> *spice_file_transfer_task_get_cancellable(SpiceFileTransferTask *self)
> +{
> +    g_assert_nonnull(self);
> +    return self->priv->cancellable;
> +}
> +
> +static void spice_file_transfer_task_flush_done(SpiceFileTransferTask *self,
> GError *error)
> +{
> +    g_assert_nonnull(self);
> +    g_return_if_fail(self->priv->pending);
> +
> +    self->priv->pending = FALSE;
> +
> +    if (error || self->priv->error) {
> +        spice_file_transfer_task_completed(self, error);
> +        return;
> +    }
> +
> +    if (spice_util_get_debug()) {
> +        const GTimeSpan interval = 20 * G_TIME_SPAN_SECOND;
> +        gint64 now = g_get_monotonic_time();
> +
> +        if (interval < now - self->priv->last_update) {
> +            gchar *basename = g_file_get_basename(self->priv->file);
> +            self->priv->last_update = now;
> +            SPICE_DEBUG("transferred %.2f%% of the file %s",
> +                        100.0 * self->priv->read_bytes / self->priv-
> >file_size, basename);
> +            g_free(basename);
> +        }
> +    }
> +
> +    if (self->priv->progress_callback) {
> +        goffset read = 0;
> +        goffset total = 0;
> +        SpiceMainChannel *main_channel = self->priv->channel;
> +        GHashTableIter iter;
> +        gpointer key, value;
> +
> +        /* since the progress_callback does not have a parameter to indicate
> +         * which file the progress is associated with, report progress on all
> +         * current transfers */
> +        g_hash_table_iter_init(&iter, main_channel->priv->file_xfer_tasks);
> +        while (g_hash_table_iter_next(&iter, &key, &value)) {
> +            SpiceFileTransferTask *t = (SpiceFileTransferTask *)value;
> +            read += t->priv->read_bytes;
> +            total += t->priv->file_size;
> +        }
> +
> +        self->priv->progress_callback(read, total, self->priv-
> >progress_callback_data);
> +    }
> +
> +    /* Read more data */
> +    file_xfer_continue_read(self);
> +}
>  
>  
>  static void

Reviewed-by: Pavel Grunt <pgrunt@redhat.com>
Hi,

On Fri, May 27, 2016 at 10:54:15AM +0200, Pavel Grunt wrote:
> Hi Victor,
>
> On Mon, 2016-05-23 at 13:50 +0200, Victor Toso wrote:
> > By introducing a flush_callback such as SpiceFileTransferTaskFlushCb
> > SpiceFileTransferTask becomes agnostic on how channel-main flushes
> > the data.
> > The spice_file_transfer_task_flush_done() function is now introduced
> > to tell SpiceFileTransferTask that flushing is over and we can read
> > more data if no error has happened.
> >
> > Helper functions spice_file_transfer_task_get_channel() and
> > spice_file_transfer_task_get_cancellable() were created to retrieve
> > properties from SpiceFileTransferTask without the need to access its
> > private structure.
> >
>
> Can it be done in more steps please - _get_channel() replacing the
> access to the private structure in a separate patch ?

Sure!

>
>
> > This change is related to split SpiceFileTransferTask from
> > channel-main.
> > ---
> >  src/channel-main.c | 157 +++++++++++++++++++++++++++++++++++-----------------
> > -
> >  1 file changed, 105 insertions(+), 52 deletions(-)
> > 
> > diff --git a/src/channel-main.c b/src/channel-main.c
> > index 4fb514e..f4e8c2b 100644
> > --- a/src/channel-main.c
> > +++ b/src/channel-main.c
> > @@ -77,6 +77,16 @@ G_DEFINE_TYPE(SpiceFileTransferTask,
> > spice_file_transfer_task, G_TYPE_OBJECT)
> >          (G_TYPE_INSTANCE_GET_PRIVATE((o), SPICE_TYPE_FILE_TRANSFER_TASK,
> > SpiceFileTransferTaskPrivate))
> >  
> >  #define FILE_XFER_CHUNK_SIZE (VD_AGENT_MAX_DATA_SIZE * 32)
> > +
> > +typedef void (*SpiceFileTransferTaskFlushCb)(SpiceFileTransferTask
> > *xfer_task,
> > +                                             void *buffer,
> > +                                             gssize count,
> > +                                             gpointer user_data);
> > +
> > +static SpiceMainChannel
> > *spice_file_transfer_task_get_channel(SpiceFileTransferTask *self);
> > +static GCancellable
> > *spice_file_transfer_task_get_cancellable(SpiceFileTransferTask *self);
> > +static void spice_file_transfer_task_flush_done(SpiceFileTransferTask *self,
> > GError *error);
> > +
> >  struct _SpiceFileTransferTaskPrivate
> >  
> >  /* private */
> > @@ -90,6 +100,8 @@ struct _SpiceFileTransferTaskPrivate
> >      GCancellable                   *cancellable;
> >      GFileProgressCallback          progress_callback;
> >      gpointer                       progress_callback_data;
> > +    SpiceFileTransferTaskFlushCb   flush_callback;
> > +    gpointer                       flush_callback_data;
> >      GAsyncReadyCallback            callback;
> >      gpointer                       user_data;
> >      char                           *buffer;
> > @@ -1851,70 +1863,46 @@ static void file_xfer_data_flushed_cb(GObject
> > *source_object,
> >      SpiceMainChannel *channel = (SpiceMainChannel *)source_object;
> >      GError *error = NULL;
> >  
> > -    self->priv->pending = FALSE;
> >      file_xfer_flush_finish(channel, res, &error);
> > -    if (error || self->priv->error) {
> > -        spice_file_transfer_task_completed(self, error);
> > -        return;
> > -    }
> > -
> > -    if (spice_util_get_debug()) {
> > -        const GTimeSpan interval = 20 * G_TIME_SPAN_SECOND;
> > -        gint64 now = g_get_monotonic_time();
> > -
> > -        if (interval < now - self->priv->last_update) {
> > -            gchar *basename = g_file_get_basename(self->priv->file);
> > -            self->priv->last_update = now;
> > -            SPICE_DEBUG("transferred %.2f%% of the file %s",
> > -                        100.0 * self->priv->read_bytes / self->priv-
> > >file_size, basename);
> > -            g_free(basename);
> > -        }
> > -    }
> > -
> > -    if (self->priv->progress_callback) {
> > -        goffset read = 0;
> > -        goffset total = 0;
> > -        SpiceMainChannel *main_channel = self->priv->channel;
> > -        GHashTableIter iter;
> > -        gpointer key, value;
> > -
> > -        /* since the progress_callback does not have a parameter to indicate
> > -         * which file the progress is associated with, report progress on all
> > -         * current transfers */
> > -        g_hash_table_iter_init(&iter, main_channel->priv->file_xfer_tasks);
> > -        while (g_hash_table_iter_next(&iter, &key, &value)) {
> > -            SpiceFileTransferTask *t = (SpiceFileTransferTask *)value;
> > -            read += t->priv->read_bytes;
> > -            total += t->priv->file_size;
> > -        }
> > -
> > -        self->priv->progress_callback(read, total, self->priv-
> > >progress_callback_data);
> > -    }
> > -
> > -    /* Read more data */
> > -    file_xfer_continue_read(self);
> > +    spice_file_transfer_task_flush_done(self, error);
> >  }
> >  
> > -static void file_xfer_queue(SpiceFileTransferTask *self, int data_size)
> > +static void file_xfer_queue(SpiceFileTransferTask *xfer_task, void *buffer,
> > int data_size)
> >  {
> >      VDAgentFileXferDataMessage msg;
> > -    SpiceMainChannel *channel = SPICE_MAIN_CHANNEL(self->priv->channel);
> > +    SpiceMainChannel *channel =
> > spice_file_transfer_task_get_channel(xfer_task);
>
> what if the channel is NULL (not possible now?), but it is worth
> checking

Right, this was not considered before so I did not really think about
it. In case channel could be NULL due any kind of problem, the
transfer-task should be cancelled and no flush should really happen.

I'll include the check.

> >  
> > -    msg.id = self->priv->id;
> > +    g_object_get(xfer_task, "id", &msg.id, NULL);
> >      msg.size = data_size;
> >      agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_DATA,
> >                           &msg, sizeof(msg),
> > -                         self->priv->buffer, data_size, NULL);
> > +                         buffer, data_size, NULL);
> >      spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE);
> >  }
> >  
> > +static void file_xfer_flush_callback(SpiceFileTransferTask *xfer_task,
> > +                                     void *buffer,
> > +                                     gssize count,
> > +                                     gpointer user_data)
> > +{
> > +    SpiceMainChannel *main_channel;
> > +    GCancellable *cancellable;
> > +
> > +    file_xfer_queue(xfer_task, buffer, count);
> > +    if (count == 0)
> > +        return;
> > +
> > +    main_channel = spice_file_transfer_task_get_channel(xfer_task);
> > +    cancellable = spice_file_transfer_task_get_cancellable(xfer_task);
> > +    file_xfer_flush_async(main_channel, cancellable,
> > file_xfer_data_flushed_cb, xfer_task);
> > +}
> > +
> >  /* main context */
> >  static void file_xfer_read_cb(GObject *source_object,
> >                                GAsyncResult *res,
> >                                gpointer user_data)
> >  {
> >      SpiceFileTransferTask *self = user_data;
> > -    SpiceMainChannel *channel = self->priv->channel;
> >      gssize count;
> >      GError *error = NULL;
> >  
> > @@ -1930,12 +1918,11 @@ static void file_xfer_read_cb(GObject *source_object,
> >      if (count > 0 || self->priv->file_size == 0) {
> >          self->priv->read_bytes += count;
> >          g_object_notify(G_OBJECT(self), "progress");
> > -        file_xfer_queue(self, count);
> > -        if (count == 0)
> > -            return;
> > -        file_xfer_flush_async(channel, self->priv->cancellable,
> > -                              file_xfer_data_flushed_cb, self);
> > -        self->priv->pending = TRUE;
> > +
> > +        if (self->priv->flush_callback) {
> > +            self->priv->pending = TRUE;
> > +            self->priv->flush_callback(self, self->priv->buffer, count, self-
> > >priv->flush_callback_data);
> > +        }
> >      } else if (error) {
> >          spice_channel_wakeup(SPICE_CHANNEL(self->priv->channel), FALSE);
> >          spice_file_transfer_task_completed(self, error);
> > @@ -3083,6 +3070,8 @@ static void
> > file_xfer_send_start_msg_async(SpiceMainChannel *channel,
> >                                             GCancellable *cancellable,
> >                                             GFileProgressCallback
> > progress_callback,
> >                                             gpointer progress_callback_data,
> > +                                           SpiceFileTransferTaskFlushCb
> > flush_callback,
> > +                                           gpointer flush_callback_data,
> >                                             GAsyncReadyCallback callback,
> >                                             gpointer user_data)
> >  {
> > @@ -3102,6 +3091,8 @@ static void
> > file_xfer_send_start_msg_async(SpiceMainChannel *channel,
> >          task->priv->flags = flags;
> >          task->priv->progress_callback = progress_callback;
> >          task->priv->progress_callback_data = progress_callback_data;
> > +        task->priv->flush_callback = flush_callback;
> > +        task->priv->flush_callback_data = flush_callback_data;
> >          task->priv->callback = callback;
> >          task->priv->user_data = user_data;
> >  
> > @@ -3191,6 +3182,8 @@ void spice_main_file_copy_async(SpiceMainChannel
> > *channel,
> >                                     cancellable,
> >                                     progress_callback,
> >                                     progress_callback_data,
> > +                                   file_xfer_flush_callback,
> > +                                   NULL,
> >                                     callback,
> >                                     user_data);
> >  }
> > @@ -3218,6 +3211,66 @@ gboolean spice_main_file_copy_finish(SpiceMainChannel
> > *channel,
> >      return g_task_propagate_boolean(task, error);
> >  }
> >  
> > +static SpiceMainChannel
> > *spice_file_transfer_task_get_channel(SpiceFileTransferTask *self)
> > +{
> > +    g_assert_nonnull(self);
> please, don't assert because of transfer task, use g_return

Although the discussion was more server-side, I would agree that
internal api could assert in the situation above in spice-gtk too.

But I can change it if you don't agree.

> > +    return self->priv->channel;
> > +}
> > +
> > +static GCancellable
> > *spice_file_transfer_task_get_cancellable(SpiceFileTransferTask *self)
> > +{
> > +    g_assert_nonnull(self);
> > +    return self->priv->cancellable;
> > +}
> > +
> > +static void spice_file_transfer_task_flush_done(SpiceFileTransferTask *self,
> > GError *error)
> > +{
> > +    g_assert_nonnull(self);
> > +    g_return_if_fail(self->priv->pending);
> > +
> > +    self->priv->pending = FALSE;
> > +
> > +    if (error || self->priv->error) {
> > +        spice_file_transfer_task_completed(self, error);
> > +        return;
> > +    }
> > +
> > +    if (spice_util_get_debug()) {
> > +        const GTimeSpan interval = 20 * G_TIME_SPAN_SECOND;
> > +        gint64 now = g_get_monotonic_time();
> > +
> > +        if (interval < now - self->priv->last_update) {
> > +            gchar *basename = g_file_get_basename(self->priv->file);
> > +            self->priv->last_update = now;
> > +            SPICE_DEBUG("transferred %.2f%% of the file %s",
> > +                        100.0 * self->priv->read_bytes / self->priv-
> > >file_size, basename);
> > +            g_free(basename);
> > +        }
> > +    }
> > +
> > +    if (self->priv->progress_callback) {
> > +        goffset read = 0;
> > +        goffset total = 0;
> > +        SpiceMainChannel *main_channel = self->priv->channel;
> > +        GHashTableIter iter;
> > +        gpointer key, value;
> > +
> > +        /* since the progress_callback does not have a parameter to indicate
> > +         * which file the progress is associated with, report progress on all
> > +         * current transfers */
> > +        g_hash_table_iter_init(&iter, main_channel->priv->file_xfer_tasks);
> > +        while (g_hash_table_iter_next(&iter, &key, &value)) {
> > +            SpiceFileTransferTask *t = (SpiceFileTransferTask *)value;
> > +            read += t->priv->read_bytes;
> > +            total += t->priv->file_size;
> > +        }
> > +
> > +        self->priv->progress_callback(read, total, self->priv-
> > >progress_callback_data);
> > +    }
> > +
> > +    /* Read more data */
> > +    file_xfer_continue_read(self);
> > +}
> >  
> >  
> >  static void
>
> Reviewed-by: Pavel Grunt <pgrunt@redhat.com>

Thanks!
  toso