[Spice-devel,client,2/3] streaming: Remove the video decoder's dependency on SpiceMsgIn messages

Submitted by Francois Gouget on April 4, 2017, 3:45 p.m.

Details

Message ID: c09c35a8ef7cce4ab32fb153c41139f09a4b930e.1491320131.git.fgouget@free.fr
State: New
Series "Abstract video streaming from the network details" ( rev: 1 ) in Spice


Commit Message

Francois Gouget April 4, 2017, 3:45 p.m.
Code-wise this improves the separation between networking and video
decoding.
It also makes it easier to reuse the latter should the client one day
receive video streams through other messages.

Signed-off-by: Francois Gouget <fgouget@codeweavers.com>
---
 src/channel-display-gst.c   | 128 ++++++++++++++++++++------------------------
 src/channel-display-mjpeg.c |  74 +++++++++++++------------
 src/channel-display-priv.h  |  24 +++++++--
 src/channel-display.c       |  35 ++++++------
 4 files changed, 134 insertions(+), 127 deletions(-)


diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
index c4190b2d..93b61bab 100644
--- a/src/channel-display-gst.c
+++ b/src/channel-display-gst.c
@@ -86,31 +86,30 @@  G_STATIC_ASSERT(G_N_ELEMENTS(gst_opts) <= SPICE_VIDEO_CODEC_TYPE_ENUM_END);
 #define VALID_VIDEO_CODEC_TYPE(codec) \
     (codec > 0 && codec < G_N_ELEMENTS(gst_opts))
 
-/* ---------- SpiceFrame ---------- */
+/* ---------- SpiceMetaFrame ---------- */
 
-typedef struct _SpiceFrame {
-    GstClockTime timestamp;
-    SpiceMsgIn *msg;
-    GstSample *sample;
-} SpiceFrame;
+typedef struct SpiceMetaFrame {
+    SpiceFrame *frame;
+    GstClockTime pts;
+    void *sample;
+} SpiceMetaFrame;
 
-static SpiceFrame *create_frame(GstBuffer *buffer, SpiceMsgIn *msg)
+static SpiceMetaFrame *create_meta_frame(SpiceFrame *frame, GstBuffer *buffer)
 {
-    SpiceFrame *frame = spice_new(SpiceFrame, 1);
-    frame->timestamp = GST_BUFFER_PTS(buffer);
-    frame->msg = msg;
-    spice_msg_in_ref(msg);
-    frame->sample = NULL;
-    return frame;
+    SpiceMetaFrame *meta = spice_new(SpiceMetaFrame, 1);
+    meta->frame = frame;
+    meta->pts = GST_BUFFER_PTS(buffer);
+    meta->sample = NULL;
+    return meta;
 }
 
-static void free_frame(SpiceFrame *frame)
+static void free_meta_frame(SpiceMetaFrame *meta)
 {
-    spice_msg_in_unref(frame->msg);
-    if (frame->sample) {
-        gst_sample_unref(frame->sample);
+    meta->frame->free(meta->frame);
+    if (meta->sample) {
+        gst_sample_unref(meta->sample);
     }
-    free(frame);
+    free(meta);
 }
 
 
@@ -122,7 +121,7 @@  static void schedule_frame(SpiceGstDecoder *decoder);
 static gboolean display_frame(gpointer video_decoder)
 {
     SpiceGstDecoder *decoder = (SpiceGstDecoder*)video_decoder;
-    SpiceFrame *frame;
+    SpiceMetaFrame *meta;
     GstCaps *caps;
     gint width, height;
     GstStructure *s;
@@ -131,17 +130,17 @@  static gboolean display_frame(gpointer video_decoder)
 
     g_mutex_lock(&decoder->queues_mutex);
     decoder->timer_id = 0;
-    frame = g_queue_pop_head(decoder->display_queue);
+    meta = g_queue_pop_head(decoder->display_queue);
     g_mutex_unlock(&decoder->queues_mutex);
     /* If the queue is empty we don't even need to reschedule */
-    g_return_val_if_fail(frame, G_SOURCE_REMOVE);
+    g_return_val_if_fail(meta, G_SOURCE_REMOVE);
 
-    if (!frame->sample) {
+    if (!meta->sample) {
         spice_warning("got a frame without a sample!");
         goto error;
     }
 
-    caps = gst_sample_get_caps(frame->sample);
+    caps = gst_sample_get_caps(meta->sample);
     if (!caps) {
         spice_warning("GStreamer error: could not get the caps of the sample");
         goto error;
@@ -154,18 +153,18 @@  static gboolean display_frame(gpointer video_decoder)
         goto error;
     }
 
-    buffer = gst_sample_get_buffer(frame->sample);
+    buffer = gst_sample_get_buffer(meta->sample);
     if (!gst_buffer_map(buffer, &mapinfo, GST_MAP_READ)) {
         spice_warning("GStreamer error: could not map the buffer");
         goto error;
     }
 
-    stream_display_frame(decoder->base.stream, frame->msg,
+    stream_display_frame(decoder->base.stream, meta->frame,
                          width, height, mapinfo.data);
     gst_buffer_unmap(buffer, &mapinfo);
 
  error:
-    free_frame(frame);
+    free_meta_frame(meta);
     schedule_frame(decoder);
     return G_SOURCE_REMOVE;
 }
@@ -177,14 +176,13 @@  static void schedule_frame(SpiceGstDecoder *decoder)
     g_mutex_lock(&decoder->queues_mutex);
 
     while (!decoder->timer_id) {
-        SpiceFrame *frame = g_queue_peek_head(decoder->display_queue);
-        if (!frame) {
+        SpiceMetaFrame *meta = g_queue_peek_head(decoder->display_queue);
+        if (!meta) {
             break;
         }
 
-        SpiceStreamDataHeader *op = spice_msg_in_parsed(frame->msg);
-        if (now < op->multi_media_time) {
-            decoder->timer_id = g_timeout_add(op->multi_media_time - now,
+        if (now < meta->frame->mm_time) {
+            decoder->timer_id = g_timeout_add(meta->frame->mm_time - now,
                                               display_frame, decoder);
         } else if (g_queue_get_length(decoder->display_queue) == 1) {
             /* Still attempt to display the least out of date frame so the
@@ -193,11 +191,11 @@  static void schedule_frame(SpiceGstDecoder *decoder)
             decoder->timer_id = g_timeout_add(0, display_frame, decoder);
         } else {
             SPICE_DEBUG("%s: rendering too late by %u ms (ts: %u, mmtime: %u), dropping",
-                        __FUNCTION__, now - op->multi_media_time,
-                        op->multi_media_time, now);
+                        __FUNCTION__, now - meta->frame->mm_time,
+                        meta->frame->mm_time, now);
             stream_dropped_frame_on_playback(decoder->base.stream);
             g_queue_pop_head(decoder->display_queue);
-            free_frame(frame);
+            free_meta_frame(meta);
         }
     }
 
@@ -228,27 +226,27 @@  static GstFlowReturn new_sample(GstAppSink *gstappsink, gpointer video_decoder)
          * finding a match either, etc. So check the buffer has a matching
          * frame first.
          */
-        SpiceFrame *frame;
+        SpiceMetaFrame *meta;
         GList *l = g_queue_peek_head_link(decoder->decoding_queue);
         while (l) {
-            frame = l->data;
-            if (frame->timestamp == GST_BUFFER_PTS(buffer)) {
+            meta = l->data;
+            if (meta->pts == GST_BUFFER_PTS(buffer)) {
                 /* The frame is now ready for display */
-                frame->sample = sample;
-                g_queue_push_tail(decoder->display_queue, frame);
+                meta->sample = sample;
+                g_queue_push_tail(decoder->display_queue, meta);
 
                 /* Now that we know there is a match, remove it and the older
                  * frames from the decoding queue.
                  */
-                while ((frame = g_queue_pop_head(decoder->decoding_queue))) {
-                    if (frame->timestamp == GST_BUFFER_PTS(buffer)) {
+                while ((meta = g_queue_pop_head(decoder->decoding_queue))) {
+                    if (meta->pts == GST_BUFFER_PTS(buffer)) {
                         break;
                     }
                     /* The GStreamer pipeline dropped the corresponding
                      * buffer.
                      */
                     SPICE_DEBUG("the GStreamer pipeline dropped a frame");
-                    free_frame(frame);
+                    free_meta_frame(meta);
                 }
                 break;
             }
@@ -394,13 +392,13 @@  static void spice_gst_decoder_destroy(VideoDecoder *video_decoder)
         g_source_remove(decoder->timer_id);
     }
     g_mutex_clear(&decoder->queues_mutex);
-    SpiceFrame *frame;
-    while ((frame = g_queue_pop_head(decoder->decoding_queue))) {
-        free_frame(frame);
+    SpiceMetaFrame *meta;
+    while ((meta = g_queue_pop_head(decoder->decoding_queue))) {
+        free_meta_frame(meta);
     }
     g_queue_free(decoder->decoding_queue);
-    while ((frame = g_queue_pop_head(decoder->display_queue))) {
-        free_frame(frame);
+    while ((meta = g_queue_pop_head(decoder->display_queue))) {
+        free_meta_frame(meta);
     }
     g_queue_free(decoder->display_queue);
 
@@ -411,34 +409,25 @@  static void spice_gst_decoder_destroy(VideoDecoder *video_decoder)
      */
 }
 
-static void release_buffer_data(gpointer data)
-{
-    SpiceMsgIn* frame_msg = (SpiceMsgIn*)data;
-    spice_msg_in_unref(frame_msg);
-}
 
 static gboolean spice_gst_decoder_queue_frame(VideoDecoder *video_decoder,
-                                              SpiceMsgIn *frame_msg,
-                                              int32_t latency)
+                                              SpiceFrame *frame, int latency)
 {
     SpiceGstDecoder *decoder = (SpiceGstDecoder*)video_decoder;
 
-    uint8_t *data;
-    uint32_t size = spice_msg_in_frame_data(frame_msg, &data);
-    if (size == 0) {
+    if (frame->size == 0) {
         SPICE_DEBUG("got an empty frame buffer!");
+        frame->free(frame);
         return TRUE;
     }
 
-    SpiceStreamDataHeader *frame_op = spice_msg_in_parsed(frame_msg);
-    if (frame_op->multi_media_time < decoder->last_mm_time) {
+    if (frame->mm_time < decoder->last_mm_time) {
         SPICE_DEBUG("new-frame-time < last-frame-time (%u < %u):"
-                    " resetting stream, id %u",
-                    frame_op->multi_media_time,
-                    decoder->last_mm_time, frame_op->id);
+                    " resetting stream",
+                    frame->mm_time, decoder->last_mm_time);
         /* Let GStreamer deal with the frame anyway */
     }
-    decoder->last_mm_time = frame_op->multi_media_time;
+    decoder->last_mm_time = frame->mm_time;
 
     if (latency < 0 &&
         decoder->base.codec_type == SPICE_VIDEO_CODEC_TYPE_MJPEG) {
@@ -446,6 +435,7 @@  static gboolean spice_gst_decoder_queue_frame(VideoDecoder *video_decoder,
          * saves CPU so do it.
          */
         SPICE_DEBUG("dropping a late MJPEG frame");
+        frame->free(frame);
         return TRUE;
     }
 
@@ -455,22 +445,22 @@  static gboolean spice_gst_decoder_queue_frame(VideoDecoder *video_decoder,
         return FALSE;
     }
 
-    /* ref() the frame_msg for the buffer */
-    spice_msg_in_ref(frame_msg);
+    /* ref() the frame data for the buffer */
+    frame->ref_data(frame->data_opaque);
     GstBuffer *buffer = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
-                                                    data, size, 0, size,
-                                                    frame_msg, &release_buffer_data);
+                                                    frame->data, frame->size, 0, frame->size,
+                                                    frame->data_opaque, frame->unref_data);
 
     GST_BUFFER_DURATION(buffer) = GST_CLOCK_TIME_NONE;
     GST_BUFFER_DTS(buffer) = GST_CLOCK_TIME_NONE;
     GST_BUFFER_PTS(buffer) = gst_clock_get_time(decoder->clock) - gst_element_get_base_time(decoder->pipeline) + ((uint64_t)MAX(0, latency)) * 1000 * 1000;
 
     g_mutex_lock(&decoder->queues_mutex);
-    g_queue_push_tail(decoder->decoding_queue, create_frame(buffer, frame_msg));
+    g_queue_push_tail(decoder->decoding_queue, create_meta_frame(frame, buffer));
     g_mutex_unlock(&decoder->queues_mutex);
 
     if (gst_app_src_push_buffer(decoder->appsrc, buffer) != GST_FLOW_OK) {
-        SPICE_DEBUG("GStreamer error: unable to push frame of size %u", size);
+        SPICE_DEBUG("GStreamer error: unable to push frame of size %u", frame->size);
         stream_dropped_frame_on_playback(decoder->base.stream);
     }
     return TRUE;
diff --git a/src/channel-display-mjpeg.c b/src/channel-display-mjpeg.c
index 722494ee..85e8487f 100644
--- a/src/channel-display-mjpeg.c
+++ b/src/channel-display-mjpeg.c
@@ -38,7 +38,7 @@  typedef struct MJpegDecoder {
     /* ---------- Frame queue ---------- */
 
     GQueue *msgq;
-    SpiceMsgIn *cur_frame_msg;
+    SpiceFrame *cur_frame;
     guint timer_id;
 
     /* ---------- Output frame data ---------- */
@@ -53,10 +53,8 @@  typedef struct MJpegDecoder {
 static void mjpeg_src_init(struct jpeg_decompress_struct *cinfo)
 {
     MJpegDecoder *decoder = SPICE_CONTAINEROF(cinfo->src, MJpegDecoder, mjpeg_src);
-
-    uint8_t *data;
-    cinfo->src->bytes_in_buffer = spice_msg_in_frame_data(decoder->cur_frame_msg, &data);
-    cinfo->src->next_input_byte = data;
+    cinfo->src->bytes_in_buffer = decoder->cur_frame->size;
+    cinfo->src->next_input_byte = decoder->cur_frame->data;
 }
 
 static boolean mjpeg_src_fill(struct jpeg_decompress_struct *cinfo)
@@ -166,12 +164,13 @@  static gboolean mjpeg_decoder_decode_frame(gpointer video_decoder)
         dest = &(decoder->out_frame[decoder->mjpeg_cinfo.output_scanline * width * 4]);
     }
     jpeg_finish_decompress(&decoder->mjpeg_cinfo);
+    decoder->cur_frame->unref_data(decoder->cur_frame->data_opaque);
 
     /* Display the frame and dispose of it */
-    stream_display_frame(decoder->base.stream, decoder->cur_frame_msg,
+    stream_display_frame(decoder->base.stream, decoder->cur_frame,
                          width, height, decoder->out_frame);
-    spice_msg_in_unref(decoder->cur_frame_msg);
-    decoder->cur_frame_msg = NULL;
+    decoder->cur_frame->free(decoder->cur_frame);
+    decoder->cur_frame = NULL;
     decoder->timer_id = 0;
 
     /* Schedule the next frame */
@@ -190,33 +189,35 @@  static void mjpeg_decoder_schedule(MJpegDecoder *decoder)
     }
 
     guint32 time = stream_get_time(decoder->base.stream);
-    SpiceMsgIn *frame_msg = decoder->cur_frame_msg;
-    decoder->cur_frame_msg = NULL;
+    SpiceFrame *frame = decoder->cur_frame;
+    decoder->cur_frame = NULL;
     do {
-        if (frame_msg) {
-            SpiceStreamDataHeader *op = spice_msg_in_parsed(frame_msg);
-            if (time <= op->multi_media_time) {
-                guint32 d = op->multi_media_time - time;
-                decoder->cur_frame_msg = frame_msg;
+        if (frame) {
+            if (time <= frame->mm_time) {
+                guint32 d = frame->mm_time - time;
+                decoder->cur_frame = frame;
                 decoder->timer_id = g_timeout_add(d, mjpeg_decoder_decode_frame, decoder);
                 break;
             }
 
             SPICE_DEBUG("%s: rendering too late by %u ms (ts: %u, mmtime: %u), dropping ",
-                        __FUNCTION__, time - op->multi_media_time,
-                        op->multi_media_time, time);
+                        __FUNCTION__, time - frame->mm_time,
+                        frame->mm_time, time);
             stream_dropped_frame_on_playback(decoder->base.stream);
-            spice_msg_in_unref(frame_msg);
+            frame->unref_data(frame->data_opaque);
+            frame->free(frame);
         }
-        frame_msg = g_queue_pop_head(decoder->msgq);
-    } while (frame_msg);
+        frame = g_queue_pop_head(decoder->msgq);
+    } while (frame);
 }
 
 
 /* mjpeg_decoder_drop_queue() helper */
 static void _msg_in_unref_func(gpointer data, gpointer user_data)
 {
-    spice_msg_in_unref(data);
+    SpiceFrame *frame = data;
+    frame->unref_data(frame->data_opaque);
+    frame->free(frame);
 }
 
 static void mjpeg_decoder_drop_queue(MJpegDecoder *decoder)
@@ -225,9 +226,10 @@  static void mjpeg_decoder_drop_queue(MJpegDecoder *decoder)
         g_source_remove(decoder->timer_id);
         decoder->timer_id = 0;
     }
-    if (decoder->cur_frame_msg) {
-        spice_msg_in_unref(decoder->cur_frame_msg);
-        decoder->cur_frame_msg = NULL;
+    if (decoder->cur_frame) {
+        decoder->cur_frame->unref_data(decoder->cur_frame->data_opaque);
+        decoder->cur_frame->free(decoder->cur_frame);
+        decoder->cur_frame = NULL;
     }
     g_queue_foreach(decoder->msgq, _msg_in_unref_func, NULL);
     g_queue_clear(decoder->msgq);
@@ -236,25 +238,21 @@  static void mjpeg_decoder_drop_queue(MJpegDecoder *decoder)
 /* ---------- VideoDecoder's public API ---------- */
 
 static gboolean mjpeg_decoder_queue_frame(VideoDecoder *video_decoder,
-                                          SpiceMsgIn *frame_msg,
-                                          int32_t latency)
+                                          SpiceFrame *frame, int32_t latency)
 {
     MJpegDecoder *decoder = (MJpegDecoder*)video_decoder;
-    SpiceMsgIn *last_msg;
+    SpiceFrame *last_frame;
 
     SPICE_DEBUG("%s", __FUNCTION__);
 
-    last_msg = g_queue_peek_tail(decoder->msgq);
-    if (last_msg) {
-        SpiceStreamDataHeader *last_op, *frame_op;
-        last_op = spice_msg_in_parsed(last_msg);
-        frame_op = spice_msg_in_parsed(frame_msg);
-        if (frame_op->multi_media_time < last_op->multi_media_time) {
+    last_frame = g_queue_peek_tail(decoder->msgq);
+    if (last_frame) {
+        if (frame->mm_time < last_frame->mm_time) {
             /* This should really not happen */
             SPICE_DEBUG("new-frame-time < last-frame-time (%u < %u):"
-                        " resetting stream, id %u",
-                        frame_op->multi_media_time,
-                        last_op->multi_media_time, frame_op->id);
+                        " resetting stream",
+                        frame->mm_time,
+                        last_frame->mm_time);
             mjpeg_decoder_drop_queue(decoder);
         }
     }
@@ -266,8 +264,8 @@  static gboolean mjpeg_decoder_queue_frame(VideoDecoder *video_decoder,
         return TRUE;
     }
 
-    spice_msg_in_ref(frame_msg);
-    g_queue_push_tail(decoder->msgq, frame_msg);
+    frame->ref_data(frame->data_opaque);
+    g_queue_push_tail(decoder->msgq, frame);
     mjpeg_decoder_schedule(decoder);
     return TRUE;
 }
diff --git a/src/channel-display-priv.h b/src/channel-display-priv.h
index 60cc8efa..032725db 100644
--- a/src/channel-display-priv.h
+++ b/src/channel-display-priv.h
@@ -36,6 +36,20 @@  G_BEGIN_DECLS
 
 typedef struct display_stream display_stream;
 
+typedef struct SpiceFrame SpiceFrame;
+struct SpiceFrame {
+    uint32_t mm_time;
+    SpiceRect dest;
+
+    uint8_t *data;
+    uint32_t size;
+    gpointer data_opaque;
+    void (*ref_data)(gpointer data_opaque);
+    void (*unref_data)(gpointer data_opaque);
+
+    void (*free)(SpiceFrame *frame);
+};
+
 typedef struct VideoDecoder VideoDecoder;
 struct VideoDecoder {
     /* Releases the video decoder's resources */
@@ -44,16 +58,17 @@  struct VideoDecoder {
     /* Notifies the decoder that the mm-time clock changed. */
     void (*reschedule)(VideoDecoder *decoder);
 
-    /* Decompresses the specified frame.
+    /* Takes ownership of the specified frame, decompresses it,
+     * and displays it at the right time.
      *
      * @decoder:   The video decoder.
-     * @frame_msg: The Spice message containing the compressed frame.
+     * @frame:     The compressed Spice frame.
      * @latency:   How long in milliseconds until the frame should be
      *             displayed. Negative values mean the frame is late.
      * @return:    False if the decoder can no longer decode frames,
      *             True otherwise.
      */
-    gboolean (*queue_frame)(VideoDecoder *decoder, SpiceMsgIn *frame_msg, int32_t latency);
+    gboolean (*queue_frame)(VideoDecoder *video_decoder, SpiceFrame *frame, int latency);
 
     /* The format of the encoded video. */
     int codec_type;
@@ -137,8 +152,7 @@  struct display_stream {
 
 guint32 stream_get_time(display_stream *st);
 void stream_dropped_frame_on_playback(display_stream *st);
-void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg, uint32_t width, uint32_t height, uint8_t *data);
-uint32_t spice_msg_in_frame_data(SpiceMsgIn *frame_msg, uint8_t **data);
+void stream_display_frame(display_stream *st, SpiceFrame *frame, uint32_t width, uint32_t height, uint8_t* data);
 
 
 G_END_DECLS
diff --git a/src/channel-display.c b/src/channel-display.c
index 2423fb0e..00b54406 100644
--- a/src/channel-display.c
+++ b/src/channel-display.c
@@ -1234,8 +1234,7 @@  static const SpiceRect *stream_get_dest(display_stream *st, SpiceMsgIn *frame_ms
 
 }
 
-G_GNUC_INTERNAL
-uint32_t spice_msg_in_frame_data(SpiceMsgIn *frame_msg, uint8_t **data)
+static uint32_t spice_msg_in_frame_data(SpiceMsgIn *frame_msg, uint8_t **data)
 {
     switch (spice_msg_in_type(frame_msg)) {
     case SPICE_MSG_DISPLAY_STREAM_DATA: {
@@ -1270,15 +1269,10 @@  void stream_dropped_frame_on_playback(display_stream *st)
 
 /* main context */
 G_GNUC_INTERNAL
-void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg,
+void stream_display_frame(display_stream *st, SpiceFrame *frame,
                           uint32_t width, uint32_t height, uint8_t *data)
 {
-    const SpiceRect *dest;
-    int stride;
-
-    dest = stream_get_dest(st, frame_msg);
-
-    stride = width * sizeof(uint32_t);
+    int stride = width * sizeof(uint32_t);
     if (!(st->flags & SPICE_STREAM_FLAGS_TOP_DOWN)) {
         data += stride * (height - 1);
         stride = -stride;
@@ -1288,15 +1282,16 @@  void stream_display_frame(display_stream *st, SpiceMsgIn *frame_msg,
 #ifdef G_OS_WIN32
                                         SPICE_DISPLAY_CHANNEL(st->channel)->priv->dc,
 #endif
-                                        dest, data,
+                                        &frame->dest, data,
                                         width, height, stride,
                                         st->have_region ? &st->region : NULL);
 
-    if (st->surface->primary)
+    if (st->surface->primary) {
         g_signal_emit(st->channel, signals[SPICE_DISPLAY_INVALIDATE], 0,
-                      dest->left, dest->top,
-                      dest->right - dest->left,
-                      dest->bottom - dest->top);
+                      frame->dest.left, frame->dest.top,
+                      frame->dest.right - frame->dest.left,
+                      frame->dest.bottom - frame->dest.top);
+    }
 }
 
 /* after a sequence of 3 drops, push a report to the server, even
@@ -1425,6 +1420,7 @@  static void display_handle_stream_data(SpiceChannel *channel, SpiceMsgIn *in)
     display_stream *st = get_stream_by_id(channel, op->id);
     guint32 mmtime;
     int32_t latency;
+    SpiceFrame *frame;
 
     g_return_if_fail(st != NULL);
     mmtime = stream_get_time(st);
@@ -1471,11 +1467,20 @@  static void display_handle_stream_data(SpiceChannel *channel, SpiceMsgIn *in)
      * decoding and best decide if/when to drop them when they are late,
      * taking into account the impact on later frames.
      */
-    if (!st->video_decoder->queue_frame(st->video_decoder, in, latency)) {
+    frame = spice_new(SpiceFrame, 1);
+    frame->mm_time = op->multi_media_time;
+    frame->dest = *stream_get_dest(st, in);
+    frame->size = spice_msg_in_frame_data(in, &frame->data);
+    frame->data_opaque = in;
+    frame->ref_data = (void*)spice_msg_in_ref;
+    frame->unref_data = (void*)spice_msg_in_unref;
+    frame->free = (void*)free;
+    if (!st->video_decoder->queue_frame(st->video_decoder, frame, latency)) {
         destroy_stream(channel, op->id);
         report_invalid_stream(channel, op->id);
         return;
     }
+
     if (c->enable_adaptive_streaming) {
         display_update_stream_report(SPICE_DISPLAY_CHANNEL(channel), op->id,
                                      op->multi_media_time, latency);

Comments

On Tue, Apr 04, 2017 at 05:45:35PM +0200, Francois Gouget wrote:
> Code-wise this improves the separation between networking and video
> decoding.
> It also makes it easier to reuse the latter should the client one day
> receive video streams through other messages.
> 
> Signed-off-by: Francois Gouget <fgouget@codeweavers.com>
> ---
>  src/channel-display-gst.c   | 128 ++++++++++++++++++++------------------------
>  src/channel-display-mjpeg.c |  74 +++++++++++++------------
>  src/channel-display-priv.h  |  24 +++++++--
>  src/channel-display.c       |  35 ++++++------
>  4 files changed, 134 insertions(+), 127 deletions(-)
> 
> diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
> index c4190b2d..93b61bab 100644
> --- a/src/channel-display-gst.c
> +++ b/src/channel-display-gst.c
> @@ -86,31 +86,30 @@ G_STATIC_ASSERT(G_N_ELEMENTS(gst_opts) <= SPICE_VIDEO_CODEC_TYPE_ENUM_END);
>  #define VALID_VIDEO_CODEC_TYPE(codec) \
>      (codec > 0 && codec < G_N_ELEMENTS(gst_opts))
>  
> -/* ---------- SpiceFrame ---------- */
> +/* ---------- SpiceMetaFrame ---------- */
>  
> -typedef struct _SpiceFrame {
> -    GstClockTime timestamp;
> -    SpiceMsgIn *msg;
> -    GstSample *sample;
> -} SpiceFrame;
> +typedef struct SpiceMetaFrame {
> +    SpiceFrame *frame;
> +    GstClockTime pts;
> +    void *sample;

Why not GstSample *sample?

> +} SpiceMetaFrame;

SpiceGstFrame rather than SpiceMetaFrame?

> @@ -455,22 +445,22 @@ static gboolean spice_gst_decoder_queue_frame(VideoDecoder *video_decoder,
>          return FALSE;
>      }
>  
> -    /* ref() the frame_msg for the buffer */
> -    spice_msg_in_ref(frame_msg);
> +    /* ref() the frame data for the buffer */
> +    frame->ref_data(frame->data_opaque);
>      GstBuffer *buffer = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
> -                                                    data, size, 0, size,
> -                                                    frame_msg, &release_buffer_data);
> +                                                    frame->data, frame->size, 0, frame->size,
> +                                                    frame->data_opaque, frame->unref_data);
>  
>      GST_BUFFER_DURATION(buffer) = GST_CLOCK_TIME_NONE;
>      GST_BUFFER_DTS(buffer) = GST_CLOCK_TIME_NONE;
>      GST_BUFFER_PTS(buffer) = gst_clock_get_time(decoder->clock) - gst_element_get_base_time(decoder->pipeline) + ((uint64_t)MAX(0, latency)) * 1000 * 1000;
>  
>      g_mutex_lock(&decoder->queues_mutex);
> -    g_queue_push_tail(decoder->decoding_queue, create_frame(buffer, frame_msg));
> +    g_queue_push_tail(decoder->decoding_queue, create_meta_frame(frame, buffer));

I believe this is my main gripe with this patch: the somewhat disjoint
SpiceFrame::free and SpiceFrame::unref_data, which need to get called at
different times, sound like something we'll get wrong in the future.
I'm wondering if passing a SpiceGstFrame (aka SpiceMetaFrame) as the
last arg to gst_buffer_new_wrapped_full() would help here by grouping
the time we unref the message and the time we free the SpiceFrame.

Or does the SpiceGstFrame need to outlive this buffer? (The lifetime of
everything involved is not clear: the SpiceGstFrame moves from
decoding_queue to display_queue and is mostly needed for display_queue
consumers; I did not check how long the GstBuffer created here stays
alive compared to a frame sitting in these queues.)
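
Something like this rough, untested sketch, with a hypothetical
release_meta_frame() helper, and assuming the SpiceMetaFrame is created
before the buffer and does not need to outlive it:

    static void release_meta_frame(gpointer data)
    {
        SpiceMetaFrame *meta = data;

        /* Release the compressed frame data and the SpiceFrame in one
         * place, when GStreamer lets go of the buffer. */
        meta->frame->unref_data(meta->frame->data_opaque);
        free_meta_frame(meta);
    }

    ...
    GstBuffer *buffer = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
                                                    frame->data, frame->size, 0, frame->size,
                                                    meta, release_meta_frame);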

> diff --git a/src/channel-display-mjpeg.c b/src/channel-display-mjpeg.c
> index 722494ee..85e8487f 100644
> --- a/src/channel-display-mjpeg.c
> +++ b/src/channel-display-mjpeg.c
> @@ -38,7 +38,7 @@ typedef struct MJpegDecoder {
>      /* ---------- Frame queue ---------- */
>  
>      GQueue *msgq;
> -    SpiceMsgIn *cur_frame_msg;
> +    SpiceFrame *cur_frame;
>      guint timer_id;
>  
>      /* ---------- Output frame data ---------- */
> @@ -53,10 +53,8 @@ typedef struct MJpegDecoder {
>  static void mjpeg_src_init(struct jpeg_decompress_struct *cinfo)
>  {
>      MJpegDecoder *decoder = SPICE_CONTAINEROF(cinfo->src, MJpegDecoder, mjpeg_src);
> -
> -    uint8_t *data;
> -    cinfo->src->bytes_in_buffer = spice_msg_in_frame_data(decoder->cur_frame_msg, &data);
> -    cinfo->src->next_input_byte = data;
> +    cinfo->src->bytes_in_buffer = decoder->cur_frame->size;
> +    cinfo->src->next_input_byte = decoder->cur_frame->data;
>  }
>  
>  static boolean mjpeg_src_fill(struct jpeg_decompress_struct *cinfo)
> @@ -166,12 +164,13 @@ static gboolean mjpeg_decoder_decode_frame(gpointer video_decoder)
>          dest = &(decoder->out_frame[decoder->mjpeg_cinfo.output_scanline * width * 4]);
>      }
>      jpeg_finish_decompress(&decoder->mjpeg_cinfo);
> +    decoder->cur_frame->unref_data(decoder->cur_frame->data_opaque);
>  
>      /* Display the frame and dispose of it */
> -    stream_display_frame(decoder->base.stream, decoder->cur_frame_msg,
> +    stream_display_frame(decoder->base.stream, decoder->cur_frame,
>                           width, height, decoder->out_frame);
> -    spice_msg_in_unref(decoder->cur_frame_msg);
> -    decoder->cur_frame_msg = NULL;
> +    decoder->cur_frame->free(decoder->cur_frame);
> +    decoder->cur_frame = NULL;
>      decoder->timer_id = 0;

->unref_data + ->free can probably be done in a helper for the mjpeg
decoder?

Christophe
On Wed, 5 Apr 2017, Christophe Fergeau wrote:
[...]
> > +typedef struct SpiceMetaFrame {
> > +    SpiceFrame *frame;
> > +    GstClockTime pts;
> > +    void *sample;
> 
> Why not GstSample *sample?

Eh. This comes from a codebase that has to support... GStreamer 0.10. 
But sure, this should just be a GstSample in this incarnation.


> > +} SpiceMetaFrame;
> 
> SpiceGstFrame rather than SpiceMetaFrame?

The Meta came from my attempts to use GstMeta, which is unfortunately 
unusable. Then I kept it since it's more meta information about the 
SpiceFrame. But I'm fine with SpiceGstFrame and gstframe.


> > @@ -455,22 +445,22 @@ static gboolean spice_gst_decoder_queue_frame(VideoDecoder *video_decoder,
> >          return FALSE;
> >      }
> >  
> > -    /* ref() the frame_msg for the buffer */
> > -    spice_msg_in_ref(frame_msg);
> > +    /* ref() the frame data for the buffer */
> > +    frame->ref_data(frame->data_opaque);
> >      GstBuffer *buffer = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
> > -                                                    data, size, 0, size,
> > -                                                    frame_msg, &release_buffer_data);
> > +                                                    frame->data, frame->size, 0, frame->size,
> > +                                                    frame->data_opaque, frame->unref_data);
> >  
> >      GST_BUFFER_DURATION(buffer) = GST_CLOCK_TIME_NONE;
> >      GST_BUFFER_DTS(buffer) = GST_CLOCK_TIME_NONE;
> >      GST_BUFFER_PTS(buffer) = gst_clock_get_time(decoder->clock) - gst_element_get_base_time(decoder->pipeline) + ((uint64_t)MAX(0, latency)) * 1000 * 1000;
> >  
> >      g_mutex_lock(&decoder->queues_mutex);
> > -    g_queue_push_tail(decoder->decoding_queue, create_frame(buffer, frame_msg));
> > +    g_queue_push_tail(decoder->decoding_queue, create_meta_frame(frame, buffer));
> 
> I believe this is my main gripe with this patch: the somewhat disjoint
> SpiceFrame::free and SpiceFrame::unref_data, which need to get called at
> different times, sound like something we'll get wrong in the future.
> I'm wondering if passing a SpiceGstFrame (aka SpiceMetaFrame) as the
> last arg to gst_buffer_new_wrapped_full() would help here by grouping
> the time we unref the message and the time we free the SpiceFrame.
> 
> Or does the SpiceGstFrame need to outlive this buffer?

Yes. The GstBuffer is no longer needed once it has been decoded by the 
video codec. The GStreamer video codec is the one that frees frame->data 
through frame->unref_data(), even before we get the decompressed frame.

On the other side of the decoding pipeline we get a GstSample which 
contains our decoded frame and which we match with the corresponding 
SpiceGstFrame structure so we know when and where to display it. So the 
SpiceGstFrame can outlive frame->data by tens of milliseconds.

So we are forced to implement a ref()/unref() API for GStreamer 
compatibility. Fortunately this is more general than a simple free() so 
it should not impose undue restrictions down the line.
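
For instance a (purely hypothetical) source handing out refcounted
GBytes buffers instead of SpiceMsgIn messages could plug in like this:

    static void frame_bytes_ref(gpointer data_opaque)
    {
        g_bytes_ref(data_opaque);
    }

    static void frame_bytes_unref(gpointer data_opaque)
    {
        g_bytes_unref(data_opaque);
    }

    ...
    /* bytes, mm_time and dest come from the hypothetical source */
    SpiceFrame *frame = spice_new(SpiceFrame, 1);
    frame->mm_time = mm_time;
    frame->dest = dest;
    frame->data = (uint8_t*)g_bytes_get_data(bytes, NULL);
    frame->size = g_bytes_get_size(bytes);
    frame->data_opaque = bytes;
    frame->ref_data = frame_bytes_ref;
    frame->unref_data = frame_bytes_unref;
    frame->free = (void (*)(SpiceFrame*))free;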

If you prefer I could implement a ref()/unref() API on SpiceFrame too to 
be more consistent, but it's more complex than a simple free.
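
Roughly like this hypothetical sketch, assuming a new gint ref_count
field in SpiceFrame:

    static SpiceFrame *spice_frame_ref(SpiceFrame *frame)
    {
        g_atomic_int_inc(&frame->ref_count);
        return frame;
    }

    static void spice_frame_unref(SpiceFrame *frame)
    {
        if (g_atomic_int_dec_and_test(&frame->ref_count)) {
            /* The last unref releases the data and the frame together */
            frame->unref_data(frame->data_opaque);
            frame->free(frame);
        }
    }

Note that the frame data would then only be released together with the
SpiceFrame itself, unless the data keeps its own reference count as it
does now.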


> (The lifetime of
> everything involved is not clear: the SpiceGstFrame moves from
> decoding_queue to display_queue and is mostly needed for display_queue
> consumers; I did not check how long the GstBuffer created here stays
> alive compared to a frame sitting in these queues.)

I'll add a comment detailing the process to 
spice_gst_decoder_queue_frame() (since it's responsible for getting the 
ball rolling). But it could also go in the file header.


[...]
> > @@ -166,12 +164,13 @@ static gboolean mjpeg_decoder_decode_frame(gpointer video_decoder)
> >          dest = &(decoder->out_frame[decoder->mjpeg_cinfo.output_scanline * width * 4]);
> >      }
> >      jpeg_finish_decompress(&decoder->mjpeg_cinfo);
> > +    decoder->cur_frame->unref_data(decoder->cur_frame->data_opaque);
> >  
> >      /* Display the frame and dispose of it */
> > -    stream_display_frame(decoder->base.stream, decoder->cur_frame_msg,
> > +    stream_display_frame(decoder->base.stream, decoder->cur_frame,
> >                           width, height, decoder->out_frame);
> > -    spice_msg_in_unref(decoder->cur_frame_msg);
> > -    decoder->cur_frame_msg = NULL;
> > +    decoder->cur_frame->free(decoder->cur_frame);
> > +    decoder->cur_frame = NULL;
> >      decoder->timer_id = 0;
> 
> ->unref_data + ->free can probably be done in a helper for the mjpeg
> decoder?

Yes. It does mean freeing the network message a bit later than strictly 
necessary, that is after the frame has been displayed rather than 
before, but it probably does not matter.
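
The helper could be as simple as this (untested, and matching the
free_spice_frame() name I use in the timeline below):

    /* Releases both the frame data and the SpiceFrame itself */
    static void free_spice_frame(SpiceFrame *frame)
    {
        frame->unref_data(frame->data_opaque);
        frame->free(frame);
    }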


We're already keeping it around for much longer since we only decompress 
the frame at the last millisecond anyway. That is, the timeline is this:

 1) Receive a SpiceMsgIn containing a compressed MJPEG frame 40 ms 
    before it is due to be displayed.
 2) Wrap it in a SpiceFrame for the MJPEG decoder.
 3) mjpeg_decoder_queue_frame() puts the SpiceFrame in decoder->msgq and
    calls mjpeg_decoder_schedule().
 4) mjpeg_decoder_schedule() arranges for mjpeg_decoder_decode_frame() 
    to be called in the main thread 40 ms from now.
 5) 40 ms later it's time to display the frame. Oops, it has not been
    decompressed yet. So do it on the double now, blocking the main thread
    while we're at it.
 6) 4 ms or so later the frame has been decompressed. We no longer need
    the SpiceMsgIn message but still keep it.
 7) Call stream_display_frame() to display the frame, 4 ms late!
 8) Microseconds later (presumably), call free_spice_frame() to free 
    both the SpiceMsgIn message and the SpiceFrame structure.

My main beef is with sitting on the compressed message for 40 ms doing 
nothing with it and then decompressing it only when it's essentially too 
late. But then MJPEG is really fast to decompress so it does not really
matter, and I understand why it's done this way (it's simpler and
minimises memory usage).

So freeing SpiceMsgIn in step 6 or 8 makes no significant difference.


In contrast, with GStreamer, depending on the codec and the GStreamer 
version and bugs, decoding could take a lot more time, possibly into the 
tens of ms. So the GStreamer decoder does things differently. It goes like 
this:

 1) Receive a SpiceMsgIn containing a compressed frame 40 ms 
    before it is due to be displayed.
 2) Wrap it in a SpiceFrame for the decoder.
 3) spice_gst_decoder_queue_frame() pushes the compressed frame
    proper (frame->data) to the GStreamer pipeline for decoding. But it
    still needs to know where and when to display the decoded frame
    later, so it creates a SpiceMetaFrame and pushes that to the
    decoding_queue. It then returns control immediately to the caller.
 4) At some unknown point GStreamer no longer needs the compressed frame 
    data and frees it through frame->unref_data().
 5) Some colorspace conversions later GStreamer calls new_sample() with
    the decompressed frame. We find the corresponding SpiceMetaFrame
    structure and pop it from the decoding_queue.
 6) But it's not time to display the frame yet, so we attach the
    decompressed frame to the SpiceMetaFrame, push it to the
    display_queue and call schedule_frame().
 7) schedule_frame() arranges for display_frame() to be called in what
    may now be only 30 ms.
 8) display_frame() pops the first SpiceMetaFrame from the display_queue and 
    calls stream_display_frame() right on time.
 9) display_frame() then frees the SpiceMetaFrame, and with it the
    SpiceFrame and the decompressed frame.

So as I was saying earlier, the compressed frame is freed in step 4, and
the SpiceGstFrame, SpiceFrame and GstSample in step 9, many milliseconds
later.

The advantages are that the main thread is not blocked while decoding, 
and displaying the frame is not delayed by the longer and less 
predictable decoding times.

And the drawbacks are that the frames are queued in their decompressed 
form rather than the compressed one (increased memory usage), and the 
code is a bit more complex.