[Spice-devel,v5,18/20] spice-gtk: Avoid copying the compressed message data in the GStreamer decoder.

Submitted by Francois Gouget on Aug. 27, 2015, 7:03 p.m.

Details

Message ID alpine.DEB.2.20.1508272043490.22172@amboise
State New
Headers show

Not browsing as part of any series.

Commit Message

Francois Gouget Aug. 27, 2015, 7:03 p.m.
Signed-off-by: Francois Gouget <fgouget@codeweavers.com>
---

Changes since take 2:
 - None.

 src/channel-display-gst.c | 89 ++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 84 insertions(+), 5 deletions(-)

Patch hide | download patch | download mbox

diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
index 3024356..59ce3e1 100644
--- a/src/channel-display-gst.c
+++ b/src/channel-display-gst.c
@@ -48,11 +48,43 @@  struct GstDecoder {
 
     /* ---------- Output frame data ---------- */
 
+    GMutex pipeline_mutex;
+    GCond pipeline_cond;
+    int pipeline_wait;
+    uint32_t samples_count;
+
     GstSample *sample;
     GstMapInfo mapinfo;
 };
 
 
+/* Signals that the pipeline is done processing the last buffer we gave it.
+ *
+ * @decoder:   The video decoder object.
+ * @samples:   How many samples to add to the available samples count.
+ */
+static void signal_pipeline(GstDecoder *decoder, uint32_t samples)
+{
+    g_mutex_lock(&decoder->pipeline_mutex);
+    decoder->pipeline_wait = 0;
+    decoder->samples_count += samples;
+    g_cond_signal(&decoder->pipeline_cond);
+    g_mutex_unlock(&decoder->pipeline_mutex);
+}
+
+static void appsrc_need_data_cb(GstAppSrc *src, guint length, gpointer user_data)
+{
+    GstDecoder *decoder = (GstDecoder*)user_data;
+    signal_pipeline(decoder, 0);
+}
+
+static GstFlowReturn appsink_new_sample_cb(GstAppSink *appsrc, gpointer user_data)
+{
+    GstDecoder *decoder = (GstDecoder*)user_data;
+    signal_pipeline(decoder, 1);
+    return GST_FLOW_OK;
+}
+
 /* ---------- GStreamer pipeline ---------- */
 
 static void reset_pipeline(GstDecoder *decoder)
@@ -66,10 +98,18 @@  static void reset_pipeline(GstDecoder *decoder)
     gst_object_unref(decoder->appsink);
     gst_object_unref(decoder->pipeline);
     decoder->pipeline = NULL;
+
+    g_mutex_clear(&decoder->pipeline_mutex);
+    g_cond_clear(&decoder->pipeline_cond);
 }
 
 static gboolean construct_pipeline(GstDecoder *decoder)
 {
+    g_mutex_init(&decoder->pipeline_mutex);
+    g_cond_init(&decoder->pipeline_cond);
+    decoder->pipeline_wait = 1;
+    decoder->samples_count = 0;
+
     const gchar *src_caps, *gstdec_name;
     switch (decoder->base.codec_type) {
     case SPICE_VIDEO_CODEC_TYPE_MJPEG:
@@ -101,7 +141,12 @@  static gboolean construct_pipeline(GstDecoder *decoder)
     }
 
     decoder->appsrc = GST_APP_SRC(gst_bin_get_by_name(GST_BIN(decoder->pipeline), "src"));
+    GstAppSrcCallbacks appsrc_cbs = {&appsrc_need_data_cb, NULL, NULL};
+    gst_app_src_set_callbacks(decoder->appsrc, &appsrc_cbs, decoder, NULL);
+
     decoder->appsink = GST_APP_SINK(gst_bin_get_by_name(GST_BIN(decoder->pipeline), "sink"));
+    GstAppSinkCallbacks appsink_cbs = {NULL, NULL, &appsink_new_sample_cb};
+    gst_app_sink_set_callbacks(decoder->appsink, &appsink_cbs, decoder, NULL);
 
     if (gst_element_set_state(decoder->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
         SPICE_DEBUG("GStreamer error: Unable to set the pipeline to the playing state.");
@@ -112,6 +157,11 @@  static gboolean construct_pipeline(GstDecoder *decoder)
     return TRUE;
 }
 
+static void release_msg_in(gpointer data)
+{
+    spice_msg_in_unref((SpiceMsgIn*)data);
+}
+
 static gboolean push_compressed_buffer(GstDecoder *decoder,
                                        SpiceMsgIn *frame_msg)
 {
@@ -122,10 +172,11 @@  static gboolean push_compressed_buffer(GstDecoder *decoder,
         return FALSE;
     }
 
-    // TODO.  Grr.  Seems like a wasted alloc
-    gpointer d = g_malloc(size);
-    memcpy(d, data, size);
-    GstBuffer *buffer = gst_buffer_new_wrapped(d, size);
+    /* Reference frame_msg so it stays around until our 'deallocator' releases it */
+    spice_msg_in_ref(frame_msg);
+    GstBuffer *buffer = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY,
+                                                    data, size, 0, size,
+                                                    frame_msg, &release_msg_in);
 
     if (gst_app_src_push_buffer(decoder->appsrc, buffer) != GST_FLOW_OK) {
         SPICE_DEBUG("GStreamer error: unable to push frame of size %d", size);
@@ -195,8 +246,36 @@  static uint8_t* gst_decoder_decode_frame(GstDecoder *decoder,
      */
     release_last_frame(decoder);
 
+    /* The pipeline may have called appsrc_need_data_cb() after we got the last
+     * output frame. This would cause us to return prematurely so reset
+     * pipeline_wait so we do wait for it to process this buffer.
+     */
+    g_mutex_lock(&decoder->pipeline_mutex);
+    decoder->pipeline_wait = 1;
+    g_mutex_unlock(&decoder->pipeline_mutex);
+    /* Note that it's possible for appsrc_need_data_cb() to get called between
+     * now and the pipeline wait. But this will at most cause a one frame delay.
+     */
+
     if (push_compressed_buffer(decoder, frame_msg)) {
-        return pull_raw_frame(decoder);
+        /* Wait for the pipeline to either produce a decoded frame, or ask
+         * for more data which means an error happened.
+         */
+        g_mutex_lock(&decoder->pipeline_mutex);
+        while (decoder->pipeline_wait) {
+            g_cond_wait(&decoder->pipeline_cond, &decoder->pipeline_mutex);
+        }
+        decoder->pipeline_wait = 1;
+        uint32_t samples = decoder->samples_count;
+        if (samples) {
+            decoder->samples_count--;
+        }
+        g_mutex_unlock(&decoder->pipeline_mutex);
+
+        /* If a decoded frame waits for us, return it */
+        if (samples) {
+            return pull_raw_frame(decoder);
+        }
     }
     return NULL;
 }

Comments

On Thu, Aug 27, 2015 at 09:03:09PM +0200, Francois Gouget wrote:
> Signed-off-by: Francois Gouget <fgouget@codeweavers.com>
> ---
> 
> Changes since take 2:
>  - None.
> 
>  src/channel-display-gst.c | 89 ++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 84 insertions(+), 5 deletions(-)
> 
> diff --git a/src/channel-display-gst.c b/src/channel-display-gst.c
> index 3024356..59ce3e1 100644
> --- a/src/channel-display-gst.c
> +++ b/src/channel-display-gst.c
> @@ -48,11 +48,43 @@ struct GstDecoder {
>  
>      /* ---------- Output frame data ---------- */
>  
> +    GMutex pipeline_mutex;
> +    GCond pipeline_cond;
> +    int pipeline_wait;

Could be a gboolean from the look of it.

> +    uint32_t samples_count;
> +
>      GstSample *sample;
>      GstMapInfo mapinfo;
>  };
>  
>  
> +/* Signals that the pipeline is done processing the last buffer we gave it.
> + *
> + * @decoder:   The video decoder object.
> + * @samples:   How many samples to add to the available samples count.
> + */
> +static void signal_pipeline(GstDecoder *decoder, uint32_t samples)
> +{
> +    g_mutex_lock(&decoder->pipeline_mutex);
> +    decoder->pipeline_wait = 0;
> +    decoder->samples_count += samples;
> +    g_cond_signal(&decoder->pipeline_cond);
> +    g_mutex_unlock(&decoder->pipeline_mutex);
> +}
> +
> +static void appsrc_need_data_cb(GstAppSrc *src, guint length, gpointer user_data)
> +{
> +    GstDecoder *decoder = (GstDecoder*)user_data;
> +    signal_pipeline(decoder, 0);
> +}
> +
> +static GstFlowReturn appsink_new_sample_cb(GstAppSink *appsrc, gpointer user_data)
> +{
> +    GstDecoder *decoder = (GstDecoder*)user_data;
> +    signal_pipeline(decoder, 1);
> +    return GST_FLOW_OK;
> +}
> +
>  /* ---------- GStreamer pipeline ---------- */
>  
>  static void reset_pipeline(GstDecoder *decoder)
> @@ -66,10 +98,18 @@ static void reset_pipeline(GstDecoder *decoder)
>      gst_object_unref(decoder->appsink);
>      gst_object_unref(decoder->pipeline);
>      decoder->pipeline = NULL;
> +
> +    g_mutex_clear(&decoder->pipeline_mutex);
> +    g_cond_clear(&decoder->pipeline_cond);
>  }
>  
>  static gboolean construct_pipeline(GstDecoder *decoder)
>  {
> +    g_mutex_init(&decoder->pipeline_mutex);
> +    g_cond_init(&decoder->pipeline_cond);

These symbols are not available in el6 glib2, so we'll need to add a compat
implementation for them somehow, but we can fix it later.

> +    decoder->pipeline_wait = 1;
> +    decoder->samples_count = 0;
> +
>      const gchar *src_caps, *gstdec_name;
>      switch (decoder->base.codec_type) {
>      case SPICE_VIDEO_CODEC_TYPE_MJPEG:
> @@ -101,7 +141,12 @@ static gboolean construct_pipeline(GstDecoder *decoder)
>      }
>  
>      decoder->appsrc = GST_APP_SRC(gst_bin_get_by_name(GST_BIN(decoder->pipeline), "src"));
> +    GstAppSrcCallbacks appsrc_cbs = {&appsrc_need_data_cb, NULL, NULL};
> +    gst_app_src_set_callbacks(decoder->appsrc, &appsrc_cbs, decoder, NULL);
> +
>      decoder->appsink = GST_APP_SINK(gst_bin_get_by_name(GST_BIN(decoder->pipeline), "sink"));
> +    GstAppSinkCallbacks appsink_cbs = {NULL, NULL, &appsink_new_sample_cb};
> +    gst_app_sink_set_callbacks(decoder->appsink, &appsink_cbs, decoder, NULL);
>  
>      if (gst_element_set_state(decoder->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
>          SPICE_DEBUG("GStreamer error: Unable to set the pipeline to the playing state.");
> @@ -112,6 +157,11 @@ static gboolean construct_pipeline(GstDecoder *decoder)
>      return TRUE;
>  }
>  
> +static void release_msg_in(gpointer data)
> +{
> +    spice_msg_in_unref((SpiceMsgIn*)data);
> +}
> +
>  static gboolean push_compressed_buffer(GstDecoder *decoder,
>                                         SpiceMsgIn *frame_msg)
>  {
> @@ -122,10 +172,11 @@ static gboolean push_compressed_buffer(GstDecoder *decoder,
>          return FALSE;
>      }
>  
> -    // TODO.  Grr.  Seems like a wasted alloc
> -    gpointer d = g_malloc(size);
> -    memcpy(d, data, size);
> -    GstBuffer *buffer = gst_buffer_new_wrapped(d, size);
> +    /* Reference frame_msg so it stays around until our 'deallocator' releases it */
> +    spice_msg_in_ref(frame_msg);
> +    GstBuffer *buffer = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY,
> +                                                    data, size, 0, size,
> +                                                    frame_msg, &release_msg_in);
>  
>      if (gst_app_src_push_buffer(decoder->appsrc, buffer) != GST_FLOW_OK) {
>          SPICE_DEBUG("GStreamer error: unable to push frame of size %d", size);
> @@ -195,8 +246,36 @@ static uint8_t* gst_decoder_decode_frame(GstDecoder *decoder,
>       */
>      release_last_frame(decoder);
>  
> +    /* The pipeline may have called appsrc_need_data_cb() after we got the last
> +     * output frame. This would cause us to return prematurely so reset
> +     * pipeline_wait so we do wait for it to process this buffer.
> +     */
> +    g_mutex_lock(&decoder->pipeline_mutex);
> +    decoder->pipeline_wait = 1;
> +    g_mutex_unlock(&decoder->pipeline_mutex);
> +    /* Note that it's possible for appsrc_need_data_cb() to get called between
> +     * now and the pipeline wait. But this will at most cause a one frame delay.
> +     */
> +
>      if (push_compressed_buffer(decoder, frame_msg)) {
> -        return pull_raw_frame(decoder);
> +        /* Wait for the pipeline to either produce a decoded frame, or ask
> +         * for more data which means an error happened.
> +         */
> +        g_mutex_lock(&decoder->pipeline_mutex);
> +        while (decoder->pipeline_wait) {
> +            g_cond_wait(&decoder->pipeline_cond, &decoder->pipeline_mutex);
> +        }
> +        decoder->pipeline_wait = 1;
> +        uint32_t samples = decoder->samples_count;
> +        if (samples) {
> +            decoder->samples_count--;
> +        }
> +        g_mutex_unlock(&decoder->pipeline_mutex);
> +
> +        /* If a decoded frame waits for us, return it */
> +        if (samples) {
> +            return pull_raw_frame(decoder);
> +        }
>      }
>      return NULL;

Always worried to add some threading, but looks ok :)

Christophe
On Mon, 21 Sep 2015, Christophe Fergeau wrote:

> Do we want to add gst 0.10 code at this point? It has been 
> unmaintained upstream for 2.5 years
> http://lists.freedesktop.org/archives/gstreamer-announce/2013-March/000273.html
[...]

On Mon, 21 Sep 2015, Christophe Fergeau wrote:
[...]
> +    g_mutex_init(&decoder->pipeline_mutex);
> +    g_cond_init(&decoder->pipeline_cond);
>
> These symbols are not available in el6 glib2, so we'll need to add a compat
> implementation for them somehow, but we can fix it later.

As far as I can tell the latest RHEL 6 does not haev GStreamer 1.0. So 
either you don't care about el6 support and then this issue is moot, or 
you do care about it and then GStreamer 0.10 support will be needed.