[pulseaudio-discuss,8/8] rtp: Add a GStreamer-based RTP implementation

Submitted by Arun Raghavan on Feb. 29, 2016, 10:16 a.m.

Details

Message ID 1456740996-13908-9-git-send-email-arun@accosted.net
State New
Headers show
Series "Add GStreamer-based RTP support" ( rev: 1 ) in PulseAudio

Not browsing as part of any series.

Commit Message

Arun Raghavan Feb. 29, 2016, 10:16 a.m.
From: Arun Raghavan <git@arunraghavan.net>

This adds a GStreamer-based RTP implementation to replace our own. The
original implementation is retained for cases where it is not possible
to include GStreamer as a dependency.

The idea with this is to be able to start supporting more advanced RTP
features such as RTP, non-PCM audio, and potentially synchronised
playback.
---
 configure.ac                      |  17 ++
 src/Makefile.am                   |  16 +-
 src/modules/rtp/module-rtp-recv.c |   2 +-
 src/modules/rtp/module-rtp-send.c |   2 +-
 src/modules/rtp/rtp-common.c      |  97 ++++++++
 src/modules/rtp/rtp-gstreamer.c   | 475 ++++++++++++++++++++++++++++++++++++++
 src/modules/rtp/rtp-native.c      | 379 ++++++++++++++++++++++++++++++
 src/modules/rtp/rtp.c             | 451 ------------------------------------
 src/modules/rtp/rtp.h             |   4 +-
 9 files changed, 984 insertions(+), 459 deletions(-)
 create mode 100644 src/modules/rtp/rtp-common.c
 create mode 100644 src/modules/rtp/rtp-gstreamer.c
 create mode 100644 src/modules/rtp/rtp-native.c
 delete mode 100644 src/modules/rtp/rtp.c

Patch hide | download patch | download mbox

diff --git a/configure.ac b/configure.ac
index 8454e4c..6e440eb 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1301,6 +1301,22 @@  AC_SUBST(HAVE_SYSTEMD_JOURNAL)
 AM_CONDITIONAL([HAVE_SYSTEMD_JOURNAL], [test "x$HAVE_SYSTEMD_JOURNAL" = x1])
 AS_IF([test "x$HAVE_SYSTEMD_JOURNAL" = "x1"], AC_DEFINE([HAVE_SYSTEMD_JOURNAL], 1, [Have SYSTEMDJOURNAL?]))
 
+#### GStreamer-based RTP support (optional) ####
+
+AC_ARG_ENABLE([gstreamer],
+    AS_HELP_STRING([--disable-gstreamer],[Disable optional GStreamer-based RTP support]))
+
+AS_IF([test "x$enable_gstreamer" != "xno"],
+    [PKG_CHECK_MODULES(GSTREAMER, [ gstreamer-1.0 gstreamer-app-1.0 gstreamer-rtp-1.0 gio-2.0 ],
+                       HAVE_GSTREAMER=1, HAVE_GSTREAMER=0)],
+    HAVE_GSTREAMER=0)
+
+AS_IF([test "x$enable_gstreamer" = "xyes" && test "x$HAVE_GSTREAMER" = "x0"],
+    [AC_MSG_ERROR([*** GStreamer 1.0 support not found])])
+
+AM_CONDITIONAL([HAVE_GSTREAMER], [test "x$HAVE_GSTREAMER" = x1])
+AS_IF([test "x$HAVE_GSTREAMER" = "x1"], AC_DEFINE([HAVE_GSTREAMER], 1, [Have GStreamer?]))
+
 #### Build and Install man pages ####
 
 AC_ARG_ENABLE([manpages],
@@ -1647,6 +1663,7 @@  echo "
     Enable speex (resampler, AEC): ${ENABLE_SPEEX}
     Enable soxr (resampler):       ${ENABLE_SOXR}
     Enable WebRTC echo canceller:  ${ENABLE_WEBRTC}
+    Enable GStreamer-based RTP:    $}HAVE_GSTREAMER}
     Enable gcov coverage:          ${ENABLE_GCOV}
     Enable unit tests:             ${ENABLE_TESTS}
     Database
diff --git a/src/Makefile.am b/src/Makefile.am
index aa96999..85ac0da 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1124,13 +1124,21 @@  libprotocol_esound_la_LIBADD = $(AM_LIBADD) libpulsecore-@PA_MAJORMINOR@.la libp
 endif
 
 librtp_la_SOURCES = \
-		modules/rtp/rtp.c modules/rtp/rtp.h \
+		modules/rtp/rtp-common.c modules/rtp/rtp.h \
 		modules/rtp/sdp.c modules/rtp/sdp.h \
 		modules/rtp/sap.c modules/rtp/sap.h \
 		modules/rtp/rtsp_client.c modules/rtp/rtsp_client.h \
 		modules/rtp/headerlist.c modules/rtp/headerlist.h
+librtp_la_CFLAGS = $(AM_CFLAGS)
 librtp_la_LDFLAGS = $(AM_LDFLAGS) $(AM_LIBLDFLAGS) -avoid-version
 librtp_la_LIBADD = $(AM_LIBADD) libpulsecore-@PA_MAJORMINOR@.la libpulsecommon-@PA_MAJORMINOR@.la libpulse.la
+if HAVE_GSTREAMER
+librtp_la_SOURCES += modules/rtp/rtp-gstreamer.c
+librtp_la_CFLAGS += $(GSTREAMER_CFLAGS)
+librtp_la_LIBADD += $(GSTREAMER_LIBS)
+else
+librtp_la_SOURCES += modules/rtp/rtp-native.c
+endif
 
 libraop_la_SOURCES = \
         modules/raop/raop_client.c modules/raop/raop_client.h \
@@ -2067,12 +2075,12 @@  endif
 module_rtp_send_la_SOURCES = modules/rtp/module-rtp-send.c
 module_rtp_send_la_LDFLAGS = $(MODULE_LDFLAGS)
 module_rtp_send_la_LIBADD = $(MODULE_LIBADD) librtp.la
-module_rtp_send_la_CFLAGS = $(AM_CFLAGS)
+module_rtp_send_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
 
 module_rtp_recv_la_SOURCES = modules/rtp/module-rtp-recv.c
 module_rtp_recv_la_LDFLAGS = $(MODULE_LDFLAGS)
 module_rtp_recv_la_LIBADD = $(MODULE_LIBADD) librtp.la
-module_rtp_recv_la_CFLAGS = $(AM_CFLAGS)
+module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
 
 # JACK
 
@@ -2185,7 +2193,7 @@  module_bluez5_device_la_CFLAGS = $(AM_CFLAGS) $(SBC_CFLAGS)
 module_raop_sink_la_SOURCES = modules/raop/module-raop-sink.c
 module_raop_sink_la_LDFLAGS = $(MODULE_LDFLAGS)
 module_raop_sink_la_LIBADD = $(MODULE_LIBADD) librtp.la libraop.la
-module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp
+module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp $(GSTREAMER_CFLAGS)
 
 module_raop_discover_la_SOURCES = modules/raop/module-raop-discover.c
 module_raop_discover_la_LDFLAGS = $(MODULE_LDFLAGS)
diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c
index 1ee9c91..7a74aa4 100644
--- a/src/modules/rtp/module-rtp-recv.c
+++ b/src/modules/rtp/module-rtp-recv.c
@@ -570,7 +570,7 @@  static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
 
     pa_memblock_unref(silence.memblock);
 
-    if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, pa_frame_size(&s->sdp_info.sample_spec))))
+    if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec)))
         goto fail;
 
     pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
diff --git a/src/modules/rtp/module-rtp-send.c b/src/modules/rtp/module-rtp-send.c
index 6110455..6797e5a 100644
--- a/src/modules/rtp/module-rtp-send.c
+++ b/src/modules/rtp/module-rtp-send.c
@@ -486,7 +486,7 @@  int pa__init(pa_module*m) {
 
     pa_xfree(n);
 
-    if (!(u->rtp_context = pa_rtp_context_new_send(fd, payload, mtu, pa_frame_size(&ss))))
+    if (!(u->rtp_context = pa_rtp_context_new_send(fd, payload, mtu, &ss)))
         goto fail;
     pa_sap_context_init_send(&u->sap_context, sap_fd, p);
 
diff --git a/src/modules/rtp/rtp-common.c b/src/modules/rtp/rtp-common.c
new file mode 100644
index 0000000..65e2c7a
--- /dev/null
+++ b/src/modules/rtp/rtp-common.c
@@ -0,0 +1,97 @@ 
+/***
+  This file is part of PulseAudio.
+
+  Copyright 2006 Lennart Poettering
+
+  PulseAudio is free software; you can redistribute it and/or modify
+  it under the terms of the GNU Lesser General Public License as published
+  by the Free Software Foundation; either version 2.1 of the License,
+  or (at your option) any later version.
+
+  PulseAudio is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "rtp.h"
+
+#include <pulsecore/core-util.h>
+
+uint8_t pa_rtp_payload_from_sample_spec(const pa_sample_spec *ss) {
+    pa_assert(ss);
+
+    if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 2)
+        return 10;
+    if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 1)
+        return 11;
+
+    return 127;
+}
+
+pa_sample_spec *pa_rtp_sample_spec_from_payload(uint8_t payload, pa_sample_spec *ss) {
+    pa_assert(ss);
+
+    switch (payload) {
+        case 10:
+            ss->channels = 2;
+            ss->format = PA_SAMPLE_S16BE;
+            ss->rate = 44100;
+            break;
+
+        case 11:
+            ss->channels = 1;
+            ss->format = PA_SAMPLE_S16BE;
+            ss->rate = 44100;
+            break;
+
+        default:
+            return NULL;
+    }
+
+    return ss;
+}
+
+pa_sample_spec *pa_rtp_sample_spec_fixup(pa_sample_spec * ss) {
+    pa_assert(ss);
+
+    if (!pa_rtp_sample_spec_valid(ss))
+        ss->format = PA_SAMPLE_S16BE;
+
+    pa_assert(pa_rtp_sample_spec_valid(ss));
+    return ss;
+}
+
+int pa_rtp_sample_spec_valid(const pa_sample_spec *ss) {
+    pa_assert(ss);
+
+    if (!pa_sample_spec_valid(ss))
+        return 0;
+
+    return ss->format == PA_SAMPLE_S16BE;
+}
+
+const char* pa_rtp_format_to_string(pa_sample_format_t f) {
+    switch (f) {
+        case PA_SAMPLE_S16BE:
+            return "L16";
+        default:
+            return NULL;
+    }
+}
+
+pa_sample_format_t pa_rtp_string_to_format(const char *s) {
+    pa_assert(s);
+
+    if (pa_streq(s, "L16"))
+        return PA_SAMPLE_S16BE;
+    else
+        return PA_SAMPLE_INVALID;
+}
diff --git a/src/modules/rtp/rtp-gstreamer.c b/src/modules/rtp/rtp-gstreamer.c
new file mode 100644
index 0000000..413d0e4
--- /dev/null
+++ b/src/modules/rtp/rtp-gstreamer.c
@@ -0,0 +1,475 @@ 
+/***
+  This file is part of PulseAudio.
+
+  Copyright 2016 Arun Raghavan <mail@arunraghavan.net>
+
+  PulseAudio is free software; you can redistribute it and/or modify
+  it under the terms of the GNU Lesser General Public License as published
+  by the Free Software Foundation; either version 2.1 of the License,
+  or (at your option) any later version.
+
+  PulseAudio is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <pulse/timeval.h>
+#include <pulsecore/fdsem.h>
+#include <pulsecore/core-rtclock.h>
+
+#include "rtp.h"
+
+#include <gio/gio.h>
+
+#include <gst/gst.h>
+#include <gst/app/gstappsrc.h>
+#include <gst/app/gstappsink.h>
+#include <gst/rtp/gstrtpbuffer.h>
+
+#define MAKE_ELEMENT_NAMED(v, e, n)                     \
+    v = gst_element_factory_make(e, n);                 \
+    if (!v) {                                           \
+        pa_log("Could not create %s element", e);       \
+        goto fail;                                      \
+    }
+
+#define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL)
+
+typedef struct pa_rtp_context {
+    pa_fdsem *fdsem;
+    pa_sample_spec ss;
+
+    GstElement *pipeline;
+    GstElement *appsrc;
+    GstElement *appsink;
+
+    uint32_t last_timestamp;
+} pa_rtp_context;
+
+static GstCaps* caps_from_sample_spec(const pa_sample_spec *ss) {
+    if (ss->format != PA_SAMPLE_S16BE)
+        return NULL;
+
+    return gst_caps_new_simple("audio/x-raw",
+            "format", G_TYPE_STRING, "S16BE",
+            "rate", G_TYPE_INT, (int) ss->rate,
+            "channels", G_TYPE_INT, (int) ss->channels,
+            "layout", G_TYPE_STRING, "interleaved",
+            NULL);
+}
+static bool init_send_pipeline(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
+    GstElement *appsrc = NULL, *pay = NULL, *capsf = NULL, *rtpbin = NULL, *sink = NULL;
+    GstCaps *caps;
+
+    MAKE_ELEMENT(appsrc, "appsrc");
+    MAKE_ELEMENT(pay, "rtpL16pay");
+    MAKE_ELEMENT(capsf, "capsfilter");
+    MAKE_ELEMENT(rtpbin, "rtpbin");
+    MAKE_ELEMENT(sink, "fdsink");
+
+    c->pipeline = gst_pipeline_new(NULL);
+
+    gst_bin_add_many(GST_BIN(c->pipeline), appsrc, pay, capsf, rtpbin, sink, NULL);
+
+    caps = caps_from_sample_spec(ss);
+    if (!caps) {
+        pa_log("Unsupported format to payload");
+        goto fail;
+    }
+
+    g_object_set(appsrc, "caps", caps, "is-live", TRUE, "blocksize", mtu, "format", 3 /* time */, NULL);
+    g_object_set(pay, "mtu", mtu, NULL);
+    g_object_set(sink, "fd", fd, "enable-last-sample", FALSE, NULL);
+
+    gst_caps_unref(caps);
+
+    /* Force the payload type that we want */
+    caps = gst_caps_new_simple("application/x-rtp", "payload", G_TYPE_INT, (int) payload, NULL);
+    g_object_set(capsf, "caps", caps, NULL);
+    gst_caps_unref(caps);
+
+    if (!gst_element_link(appsrc, pay) ||
+        !gst_element_link(pay, capsf) ||
+        !gst_element_link_pads(capsf, "src", rtpbin, "send_rtp_sink_0") ||
+        !gst_element_link_pads(rtpbin, "send_rtp_src_0", sink, "sink")) {
+
+        pa_log("Could not set up send pipeline");
+        goto fail;
+    }
+
+    if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
+        pa_log("Could not start pipeline");
+        goto fail;
+    }
+
+    c->appsrc = gst_object_ref(appsrc);
+
+    return true;
+
+fail:
+    if (c->pipeline) {
+        gst_object_unref(c->pipeline);
+    } else {
+        /* These weren't yet added to pipeline, so we still have a ref */
+        if (appsrc)
+            gst_object_unref(appsrc);
+        if (pay)
+            gst_object_unref(pay);
+        if (capsf)
+            gst_object_unref(capsf);
+        if (rtpbin)
+            gst_object_unref(rtpbin);
+        if (sink)
+            gst_object_unref(sink);
+    }
+
+    return false;
+}
+
+pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
+    pa_rtp_context *c = NULL;
+    GError *error = NULL;
+
+    pa_assert(fd >= 0);
+
+    c = pa_xnew0(pa_rtp_context, 1);
+
+    c->fdsem = pa_fdsem_new();
+    c->ss = *ss;
+
+    if (!gst_init_check(NULL, NULL, &error)) {
+        pa_log_error("Could not initialise GStreamer: %s", error->message);
+        g_error_free(error);
+        goto fail;
+    }
+
+    if (!init_send_pipeline(c, fd, payload, mtu, ss))
+        goto fail;
+
+    return c;
+
+fail:
+    pa_xfree(c);
+    return NULL;
+}
+
+static bool process_bus_messages(pa_rtp_context *c) {
+    GstBus *bus;
+    GstMessage *message;
+    bool ret = true;
+
+    bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
+
+    while (ret && (message = gst_bus_pop(bus))) {
+        if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) {
+            GError *error = NULL;
+
+            ret = false;
+
+            gst_message_parse_error(message, &error, NULL);
+            pa_log("Got an error: %s", error->message);
+
+            g_error_free(error);
+
+            pa_fdsem_post(c->fdsem);
+        }
+
+        gst_message_unref(message);
+    }
+
+    gst_object_unref(bus);
+
+    return ret;
+}
+
+static void free_buffer(pa_memblock *memblock) {
+    pa_memblock_release(memblock);
+    pa_memblock_unref(memblock);
+}
+
+int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
+    pa_memchunk chunk = { 0, };
+    GstBuffer *buf;
+    void *data;
+    bool stop = false;
+    int ret = 0;
+
+    pa_assert(c);
+    pa_assert(q);
+
+    if (!process_bus_messages(c))
+        return -1;
+
+    while (!stop && pa_memblockq_peek(q, &chunk) == 0) {
+        pa_assert(chunk.memblock);
+
+        data = pa_memblock_acquire(chunk.memblock);
+
+        buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
+                                          data, chunk.length, chunk.index, chunk.length, chunk.memblock,
+                                          (GDestroyNotify) free_buffer);
+
+        if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
+            pa_log_error("Could not push buffer");
+            stop = true;
+            ret = -1;
+        }
+
+        pa_memblockq_drop(q, chunk.length);
+    }
+
+    return ret;
+}
+
+static GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss) {
+    if (ss->format != PA_SAMPLE_S16BE)
+        return NULL;
+
+    return gst_caps_new_simple("application/x-rtp",
+            "media", G_TYPE_STRING, "audio",
+            "encoding-name", G_TYPE_STRING, "L16",
+            "clock-rate", G_TYPE_INT, (int) ss->rate,
+            "payload", G_TYPE_INT, (int) pa_rtp_payload_from_sample_spec(ss),
+            "layout", G_TYPE_STRING, "interleaved",
+            NULL);
+}
+
+static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) {
+    pa_rtp_context *c = (pa_rtp_context *) userdata;
+    GstElement *depay;
+    GstPad *sinkpad;
+    GstPadLinkReturn ret;
+
+    depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay");
+    pa_assert(depay);
+
+    sinkpad = gst_element_get_static_pad(depay, "sink");
+
+    ret = gst_pad_link(pad, sinkpad);
+    if (ret != GST_PAD_LINK_OK) {
+        GstBus *bus;
+        GError *error;
+
+        bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
+        error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader");
+        gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL));
+
+        g_error_free(error);
+        gst_object_unref(bus);
+    }
+
+    gst_object_unref(sinkpad);
+    gst_object_unref(depay);
+}
+
+static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) {
+    GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL;
+    GstCaps *caps;
+    GSocket *socket;
+    GError *error = NULL;
+
+    MAKE_ELEMENT(udpsrc, "udpsrc");
+    MAKE_ELEMENT(rtpbin, "rtpbin");
+    MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay");
+    MAKE_ELEMENT(appsink, "appsink");
+
+    c->pipeline = gst_pipeline_new(NULL);
+
+    gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL);
+
+    socket = g_socket_new_from_fd(fd, &error);
+    if (error) {
+        pa_log("Could not create socket: %s", error->message);
+        g_error_free(error);
+        goto fail;
+    }
+
+    caps = rtp_caps_from_sample_spec(ss);
+    if (!caps) {
+        pa_log("Unsupported format to payload");
+        goto fail;
+    }
+
+    g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL);
+    g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL);
+    g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL);
+
+    gst_caps_unref(caps);
+    g_object_unref(socket);
+
+    if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") ||
+        !gst_element_link(depay, appsink)) {
+
+        pa_log("Could not set up send pipeline");
+        goto fail;
+    }
+
+    g_signal_connect(G_OBJECT(rtpbin), "pad-added", G_CALLBACK(on_pad_added), c);
+
+    if (gst_element_set_state(c->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
+        pa_log("Could not start pipeline");
+        goto fail;
+    }
+
+    c->appsink = gst_object_ref(appsink);
+
+    return true;
+
+fail:
+    if (c->pipeline) {
+        gst_object_unref(c->pipeline);
+    } else {
+        /* These weren't yet added to pipeline, so we still have a ref */
+        if (udpsrc)
+            gst_object_unref(udpsrc);
+        if (depay)
+            gst_object_unref(depay);
+        if (rtpbin)
+            gst_object_unref(rtpbin);
+        if (appsink)
+            gst_object_unref(appsink);
+    }
+
+    return false;
+}
+
+static void appsink_eos(GstAppSink *appsink, gpointer userdata) {
+    pa_rtp_context *c = (pa_rtp_context *) userdata;
+
+    pa_fdsem_post(c->fdsem);
+}
+
+static GstFlowReturn appsink_new_sample(GstAppSink *appsink, gpointer userdata) {
+    pa_rtp_context *c = (pa_rtp_context *) userdata;
+
+    pa_fdsem_post(c->fdsem);
+
+    return GST_FLOW_OK;
+}
+
+pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss) {
+    pa_rtp_context *c = NULL;
+    GstAppSinkCallbacks callbacks = { 0, };
+    GError *error = NULL;
+
+    pa_assert(fd >= 0);
+
+    c = pa_xnew0(pa_rtp_context, 1);
+
+    c->fdsem = pa_fdsem_new();
+    c->ss = *ss;
+
+    if (!gst_init_check(NULL, NULL, &error)) {
+        pa_log_error("Could not initialise GStreamer: %s", error->message);
+        g_error_free(error);
+        goto fail;
+    }
+
+    if (!init_receive_pipeline(c, fd, ss))
+        goto fail;
+
+    callbacks.eos = appsink_eos;
+    callbacks.new_sample = appsink_new_sample;
+    gst_app_sink_set_callbacks(GST_APP_SINK(c->appsink), &callbacks, c, NULL);
+
+    return c;
+
+fail:
+    pa_xfree(c);
+    return NULL;
+}
+
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
+    GstSample *sample = NULL;
+    GstBuffer *buf;
+    GstMapInfo info;
+    void *data;
+
+    if (!process_bus_messages(c))
+        goto fail;
+
+    sample = gst_app_sink_pull_sample(GST_APP_SINK(c->appsink));
+    if (!sample) {
+        pa_log_warn("Could not get any more data");
+        goto fail;
+    }
+
+    buf = gst_sample_get_buffer(sample);
+
+    if (GST_BUFFER_IS_DISCONT(buf))
+        pa_log_info("Discontinuity detected, possibly lost some packets");
+
+    if (!gst_buffer_map(buf, &info, GST_MAP_READ))
+        goto fail;
+
+    pa_assert(pa_mempool_block_size_max(pool) >= info.size);
+
+    chunk->memblock = pa_memblock_new(pool, info.size);
+    chunk->index = 0;
+    chunk->length = info.size;
+
+    data = pa_memblock_acquire_chunk(chunk);
+    /* TODO: we could probably just provide an allocator and avoid a memcpy */
+    memcpy(data, info.data, info.size);
+    pa_memblock_release(chunk->memblock);
+
+    /* When buffer-mode = none, the buffer PTS is the RTP timestamp, converted
+     * to time units (instead of clock-rate units as is in the header) and
+     * wraparound-corrected, and the DTS is the pipeline clock timestamp from
+     * when the buffer was acquired at the source (this is actually the running
+     * time which is why we need to add base time). */
+    *rtp_tstamp = gst_util_uint64_scale_int(GST_BUFFER_PTS(buf), c->ss.rate, GST_SECOND) & 0xFFFFFFFFU;
+    pa_timeval_rtstore(tstamp, (GST_BUFFER_DTS(buf) + gst_element_get_base_time(c->pipeline)) / GST_USECOND, false);
+
+    gst_buffer_unmap(buf, &info);
+    gst_sample_unref(sample);
+
+    return 0;
+
+fail:
+    if (sample)
+        gst_sample_unref(sample);
+
+    if (chunk->memblock)
+        pa_memblock_unref(chunk->memblock);
+
+    return -1;
+}
+
+void pa_rtp_context_destroy(pa_rtp_context *c) {
+    pa_assert(c);
+
+    if (c->appsrc) {
+        gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc));
+        gst_object_unref(c->appsrc);
+    }
+
+    if (c->appsink)
+        gst_object_unref(c->appsink);
+
+    if (c->pipeline) {
+        gst_element_set_state(c->pipeline, GST_STATE_NULL);
+        gst_object_unref(c->pipeline);
+    }
+
+    if (c->fdsem)
+        pa_fdsem_free(c->fdsem);
+
+    pa_xfree(c);
+}
+
+pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) {
+    return pa_rtpoll_item_new_fdsem(rtpoll, PA_RTPOLL_LATE, c->fdsem);
+}
+
+size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) {
+    return pa_frame_size(&c->ss);
+}
diff --git a/src/modules/rtp/rtp-native.c b/src/modules/rtp/rtp-native.c
new file mode 100644
index 0000000..5c1a8f2
--- /dev/null
+++ b/src/modules/rtp/rtp-native.c
@@ -0,0 +1,379 @@ 
+/***
+  This file is part of PulseAudio.
+
+  Copyright 2006 Lennart Poettering
+
+  PulseAudio is free software; you can redistribute it and/or modify
+  it under the terms of the GNU Lesser General Public License as published
+  by the Free Software Foundation; either version 2.1 of the License,
+  or (at your option) any later version.
+
+  PulseAudio is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/ioctl.h>
+
+#ifdef HAVE_SYS_FILIO_H
+#include <sys/filio.h>
+#endif
+
+#ifdef HAVE_SYS_UIO_H
+#include <sys/uio.h>
+#endif
+
+#include <pulsecore/core-error.h>
+#include <pulsecore/log.h>
+#include <pulsecore/macro.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/arpa-inet.h>
+#include <pulsecore/poll.h>
+
+#include "rtp.h"
+
+typedef struct pa_rtp_context {
+    int fd;
+    uint16_t sequence;
+    uint32_t timestamp;
+    uint32_t ssrc;
+    uint8_t payload;
+    size_t frame_size;
+    size_t mtu;
+
+    pa_memchunk memchunk;
+} pa_rtp_context;
+
+pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
+    pa_rtp_context *c;
+
+    pa_assert(fd >= 0);
+
+    c = pa_xnew0(pa_rtp_context, 1);
+
+    c->fd = fd;
+    c->sequence = (uint16_t) (rand()*rand());
+    c->timestamp = 0;
+    c->ssrc = (uint32_t) (rand()*rand());
+    c->payload = (uint8_t) (payload & 127U);
+    c->frame_size = pa_frame_size(ss);
+    c->mtu = mtu;
+
+    pa_memchunk_reset(&c->memchunk);
+
+    return c;
+}
+
+#define MAX_IOVECS 16
+
+int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
+    struct iovec iov[MAX_IOVECS];
+    pa_memblock* mb[MAX_IOVECS];
+    int iov_idx = 1;
+    size_t n = 0;
+
+    pa_assert(c);
+    pa_assert(q);
+
+    if (pa_memblockq_get_length(q) < c->mtu)
+        return 0;
+
+    for (;;) {
+        int r;
+        pa_memchunk chunk;
+
+        pa_memchunk_reset(&chunk);
+
+        if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {
+
+            size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length;
+
+            pa_assert(chunk.memblock);
+
+            iov[iov_idx].iov_base = pa_memblock_acquire_chunk(&chunk);
+            iov[iov_idx].iov_len = k;
+            mb[iov_idx] = chunk.memblock;
+            iov_idx ++;
+
+            n += k;
+            pa_memblockq_drop(q, k);
+        }
+
+        pa_assert(n % c->frame_size == 0);
+
+        if (r < 0 || n >= c->mtu || iov_idx >= MAX_IOVECS) {
+            uint32_t header[3];
+            struct msghdr m;
+            ssize_t k;
+            int i;
+
+            if (n > 0) {
+                header[0] = htonl(((uint32_t) 2 << 30) | ((uint32_t) c->payload << 16) | ((uint32_t) c->sequence));
+                header[1] = htonl(c->timestamp);
+                header[2] = htonl(c->ssrc);
+
+                iov[0].iov_base = (void*)header;
+                iov[0].iov_len = sizeof(header);
+
+                m.msg_name = NULL;
+                m.msg_namelen = 0;
+                m.msg_iov = iov;
+                m.msg_iovlen = (size_t) iov_idx;
+                m.msg_control = NULL;
+                m.msg_controllen = 0;
+                m.msg_flags = 0;
+
+                k = sendmsg(c->fd, &m, MSG_DONTWAIT);
+
+                for (i = 1; i < iov_idx; i++) {
+                    pa_memblock_release(mb[i]);
+                    pa_memblock_unref(mb[i]);
+                }
+
+                c->sequence++;
+            } else
+                k = 0;
+
+            c->timestamp += (unsigned) (n/c->frame_size);
+
+            if (k < 0) {
+                if (errno != EAGAIN && errno != EINTR) /* If the queue is full, just ignore it */
+                    pa_log("sendmsg() failed: %s", pa_cstrerror(errno));
+                return -1;
+            }
+
+            if (r < 0 || pa_memblockq_get_length(q) < c->mtu)
+                break;
+
+            n = 0;
+            iov_idx = 1;
+        }
+    }
+
+    return 0;
+}
+
+pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss) {
+    pa_rtp_context *c;
+
+    c = pa_xnew0(pa_rtp_context, 1);
+
+    c->fd = fd;
+    c->payload = payload;
+    c->frame_size = pa_frame_size(ss);
+
+    pa_memchunk_reset(&c->memchunk);
+
+    return c;
+}
+
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
+    int size;
+    struct msghdr m;
+    struct cmsghdr *cm;
+    struct iovec iov;
+    uint32_t header;
+    uint32_t ssrc;
+    uint8_t payload;
+    unsigned cc;
+    ssize_t r;
+    uint8_t aux[1024];
+    bool found_tstamp = false;
+
+    pa_assert(c);
+    pa_assert(chunk);
+
+    pa_memchunk_reset(chunk);
+
+    if (ioctl(c->fd, FIONREAD, &size) < 0) {
+        pa_log_warn("FIONREAD failed: %s", pa_cstrerror(errno));
+        goto fail;
+    }
+
+    if (size <= 0) {
+        /* size can be 0 due to any of the following reasons:
+         *
+         * 1. Somebody sent us a perfectly valid zero-length UDP packet.
+         * 2. Somebody sent us a UDP packet with a bad CRC.
+         *
+         * It is unknown whether size can actually be less than zero.
+         *
+         * In the first case, the packet has to be read out, otherwise the
+         * kernel will tell us again and again about it, thus preventing
+         * reception of any further packets. So let's just read it out
+         * now and discard it later, when comparing the number of bytes
+         * received (0) with the number of bytes wanted (1, see below).
+         *
+         * In the second case, recvmsg() will fail, thus allowing us to
+         * return the error.
+         *
+         * Just to avoid passing zero-sized memchunks and NULL pointers to
+         * recvmsg(), let's force allocation of at least one byte by setting
+         * size to 1.
+         */
+        size = 1;
+    }
+
+    if (c->memchunk.length < (unsigned) size) {
+        size_t l;
+
+        if (c->memchunk.memblock)
+            pa_memblock_unref(c->memchunk.memblock);
+
+        l = PA_MAX((size_t) size, pa_mempool_block_size_max(pool));
+
+        c->memchunk.memblock = pa_memblock_new(pool, l);
+        c->memchunk.index = 0;
+        c->memchunk.length = pa_memblock_get_length(c->memchunk.memblock);
+    }
+
+    pa_assert(c->memchunk.length >= (size_t) size);
+
+    chunk->memblock = pa_memblock_ref(c->memchunk.memblock);
+    chunk->index = c->memchunk.index;
+
+    iov.iov_base = pa_memblock_acquire_chunk(chunk);
+    iov.iov_len = (size_t) size;
+
+    m.msg_name = NULL;
+    m.msg_namelen = 0;
+    m.msg_iov = &iov;
+    m.msg_iovlen = 1;
+    m.msg_control = aux;
+    m.msg_controllen = sizeof(aux);
+    m.msg_flags = 0;
+
+    r = recvmsg(c->fd, &m, 0);
+
+    if (r != size) {
+        if (r < 0 && errno != EAGAIN && errno != EINTR)
+            pa_log_warn("recvmsg() failed: %s", r < 0 ? pa_cstrerror(errno) : "size mismatch");
+
+        goto fail;
+    }
+
+    if (size < 12) {
+        pa_log_warn("RTP packet too short.");
+        goto fail;
+    }
+
+    memcpy(&header, iov.iov_base, sizeof(uint32_t));
+    memcpy(rtp_tstamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t));
+    memcpy(&ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t));
+
+    pa_memblock_release(chunk->memblock);
+
+    header = ntohl(header);
+    *rtp_tstamp = ntohl(*rtp_tstamp);
+    ssrc = ntohl(c->ssrc);
+
+    if ((header >> 30) != 2) {
+        pa_log_warn("Unsupported RTP version.");
+        goto fail;
+    }
+
+    if ((header >> 29) & 1) {
+        pa_log_warn("RTP padding not supported.");
+        goto fail;
+    }
+
+    if ((header >> 28) & 1) {
+        pa_log_warn("RTP header extensions not supported.");
+        goto fail;
+    }
+
+    if (ssrc != c->ssrc) {
+        pa_log_debug("Got unexpected SSRC");
+        goto fail;
+    }
+
+    cc = (header >> 24) & 0xF;
+    payload = (uint8_t) ((header >> 16) & 127U);
+    c->sequence = (uint16_t) (header & 0xFFFFU);
+
+    if (payload != c->payload) {
+        pa_log_debug("Got unexpected payload: %u", payload);
+        goto fail;
+    }
+
+    if (12 + cc*4 > (unsigned) size) {
+        pa_log_warn("RTP packet too short. (CSRC)");
+        goto fail;
+    }
+
+    chunk->index += 12 + cc*4;
+    chunk->length = (size_t) size - 12 + cc*4;
+
+    if (chunk->length % c->frame_size != 0) {
+        pa_log_warn("Bad RTP packet size.");
+        goto fail;
+    }
+
+    c->memchunk.index = chunk->index + chunk->length;
+    c->memchunk.length = pa_memblock_get_length(c->memchunk.memblock) - c->memchunk.index;
+
+    if (c->memchunk.length <= 0) {
+        pa_memblock_unref(c->memchunk.memblock);
+        pa_memchunk_reset(&c->memchunk);
+    }
+
+    for (cm = CMSG_FIRSTHDR(&m); cm; cm = CMSG_NXTHDR(&m, cm))
+        if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_TIMESTAMP) {
+            memcpy(tstamp, CMSG_DATA(cm), sizeof(struct timeval));
+            found_tstamp = true;
+            break;
+        }
+
+    if (!found_tstamp) {
+        pa_log_warn("Couldn't find SCM_TIMESTAMP data in auxiliary recvmsg() data!");
+        pa_zero(*tstamp);
+    }
+
+    return 0;
+
+fail:
+    if (chunk->memblock)
+        pa_memblock_unref(chunk->memblock);
+
+    return -1;
+}
+void pa_rtp_context_destroy(pa_rtp_context *c) {
+    pa_assert(c);
+
+    pa_assert_se(pa_close(c->fd) == 0);
+
+    if (c->memchunk.memblock)
+        pa_memblock_unref(c->memchunk.memblock);
+
+    pa_xfree(c);
+}
+
+size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) {
+    return c->frame_size;
+}
+
+pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) {
+    pa_rtpoll_item *item;
+    struct pollfd *p;
+
+    item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_LATE, 1);
+
+    p = pa_rtpoll_item_get_pollfd(item, NULL);
+    p->fd = c->fd;
+    p->events = POLLIN;
+    p->revents = 0;
+
+    return item;
+}
diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c
deleted file mode 100644
index 5a24a03..0000000
--- a/src/modules/rtp/rtp.c
+++ /dev/null
@@ -1,451 +0,0 @@ 
-/***
-  This file is part of PulseAudio.
-
-  Copyright 2006 Lennart Poettering
-
-  PulseAudio is free software; you can redistribute it and/or modify
-  it under the terms of the GNU Lesser General Public License as published
-  by the Free Software Foundation; either version 2.1 of the License,
-  or (at your option) any later version.
-
-  PulseAudio is distributed in the hope that it will be useful, but
-  WITHOUT ANY WARRANTY; without even the implied warranty of
-  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  General Public License for more details.
-
-  You should have received a copy of the GNU Lesser General Public License
-  along with PulseAudio; if not, see <http://www.gnu.org/licenses/>.
-***/
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <stdlib.h>
-#include <string.h>
-#include <errno.h>
-#include <unistd.h>
-#include <sys/ioctl.h>
-
-#ifdef HAVE_SYS_FILIO_H
-#include <sys/filio.h>
-#endif
-
-#ifdef HAVE_SYS_UIO_H
-#include <sys/uio.h>
-#endif
-
-#include <pulsecore/core-error.h>
-#include <pulsecore/log.h>
-#include <pulsecore/macro.h>
-#include <pulsecore/core-util.h>
-#include <pulsecore/arpa-inet.h>
-#include <pulsecore/poll.h>
-
-#include "rtp.h"
-
-typedef struct pa_rtp_context {
-    int fd;
-    uint16_t sequence;
-    uint32_t timestamp;
-    uint32_t ssrc;
-    uint8_t payload;
-    size_t frame_size;
-    size_t mtu;
-
-    pa_memchunk memchunk;
-} pa_rtp_context;
-
-pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, size_t frame_size) {
-    pa_rtp_context *c;
-
-    pa_assert(fd >= 0);
-
-    c = pa_xnew0(pa_rtp_context, 1);
-
-    c->fd = fd;
-    c->sequence = (uint16_t) (rand()*rand());
-    c->timestamp = 0;
-    c->ssrc = (uint32_t) (rand()*rand());
-    c->payload = (uint8_t) (payload & 127U);
-    c->frame_size = frame_size;
-    c->mtu = mtu;
-
-    pa_memchunk_reset(&c->memchunk);
-
-    return c;
-}
-
-#define MAX_IOVECS 16
-
-int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
-    struct iovec iov[MAX_IOVECS];
-    pa_memblock* mb[MAX_IOVECS];
-    int iov_idx = 1;
-    size_t n = 0;
-
-    pa_assert(c);
-    pa_assert(q);
-
-    if (pa_memblockq_get_length(q) < c->mtu)
-        return 0;
-
-    for (;;) {
-        int r;
-        pa_memchunk chunk;
-
-        pa_memchunk_reset(&chunk);
-
-        if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {
-
-            size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length;
-
-            pa_assert(chunk.memblock);
-
-            iov[iov_idx].iov_base = pa_memblock_acquire_chunk(&chunk);
-            iov[iov_idx].iov_len = k;
-            mb[iov_idx] = chunk.memblock;
-            iov_idx ++;
-
-            n += k;
-            pa_memblockq_drop(q, k);
-        }
-
-        pa_assert(n % c->frame_size == 0);
-
-        if (r < 0 || n >= c->mtu || iov_idx >= MAX_IOVECS) {
-            uint32_t header[3];
-            struct msghdr m;
-            ssize_t k;
-            int i;
-
-            if (n > 0) {
-                header[0] = htonl(((uint32_t) 2 << 30) | ((uint32_t) c->payload << 16) | ((uint32_t) c->sequence));
-                header[1] = htonl(c->timestamp);
-                header[2] = htonl(c->ssrc);
-
-                iov[0].iov_base = (void*)header;
-                iov[0].iov_len = sizeof(header);
-
-                m.msg_name = NULL;
-                m.msg_namelen = 0;
-                m.msg_iov = iov;
-                m.msg_iovlen = (size_t) iov_idx;
-                m.msg_control = NULL;
-                m.msg_controllen = 0;
-                m.msg_flags = 0;
-
-                k = sendmsg(c->fd, &m, MSG_DONTWAIT);
-
-                for (i = 1; i < iov_idx; i++) {
-                    pa_memblock_release(mb[i]);
-                    pa_memblock_unref(mb[i]);
-                }
-
-                c->sequence++;
-            } else
-                k = 0;
-
-            c->timestamp += (unsigned) (n/c->frame_size);
-
-            if (k < 0) {
-                if (errno != EAGAIN && errno != EINTR) /* If the queue is full, just ignore it */
-                    pa_log("sendmsg() failed: %s", pa_cstrerror(errno));
-                return -1;
-            }
-
-            if (r < 0 || pa_memblockq_get_length(q) < c->mtu)
-                break;
-
-            n = 0;
-            iov_idx = 1;
-        }
-    }
-
-    return 0;
-}
-
-pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, size_t frame_size) {
-    pa_rtp_context *c;
-
-    c = pa_xnew0(pa_rtp_context, 1);
-
-    c->fd = fd;
-    c->payload = payload;
-    c->frame_size = frame_size;
-
-    pa_memchunk_reset(&c->memchunk);
-
-    return c;
-}
-
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
-    int size;
-    struct msghdr m;
-    struct cmsghdr *cm;
-    struct iovec iov;
-    uint32_t header;
-    uint32_t ssrc;
-    uint8_t payload;
-    unsigned cc;
-    ssize_t r;
-    uint8_t aux[1024];
-    bool found_tstamp = false;
-
-    pa_assert(c);
-    pa_assert(chunk);
-
-    pa_memchunk_reset(chunk);
-
-    if (ioctl(c->fd, FIONREAD, &size) < 0) {
-        pa_log_warn("FIONREAD failed: %s", pa_cstrerror(errno));
-        goto fail;
-    }
-
-    if (size <= 0) {
-        /* size can be 0 due to any of the following reasons:
-         *
-         * 1. Somebody sent us a perfectly valid zero-length UDP packet.
-         * 2. Somebody sent us a UDP packet with a bad CRC.
-         *
-         * It is unknown whether size can actually be less than zero.
-         *
-         * In the first case, the packet has to be read out, otherwise the
-         * kernel will tell us again and again about it, thus preventing
-         * reception of any further packets. So let's just read it out
-         * now and discard it later, when comparing the number of bytes
-         * received (0) with the number of bytes wanted (1, see below).
-         *
-         * In the second case, recvmsg() will fail, thus allowing us to
-         * return the error.
-         *
-         * Just to avoid passing zero-sized memchunks and NULL pointers to
-         * recvmsg(), let's force allocation of at least one byte by setting
-         * size to 1.
-         */
-        size = 1;
-    }
-
-    if (c->memchunk.length < (unsigned) size) {
-        size_t l;
-
-        if (c->memchunk.memblock)
-            pa_memblock_unref(c->memchunk.memblock);
-
-        l = PA_MAX((size_t) size, pa_mempool_block_size_max(pool));
-
-        c->memchunk.memblock = pa_memblock_new(pool, l);
-        c->memchunk.index = 0;
-        c->memchunk.length = pa_memblock_get_length(c->memchunk.memblock);
-    }
-
-    pa_assert(c->memchunk.length >= (size_t) size);
-
-    chunk->memblock = pa_memblock_ref(c->memchunk.memblock);
-    chunk->index = c->memchunk.index;
-
-    iov.iov_base = pa_memblock_acquire_chunk(chunk);
-    iov.iov_len = (size_t) size;
-
-    m.msg_name = NULL;
-    m.msg_namelen = 0;
-    m.msg_iov = &iov;
-    m.msg_iovlen = 1;
-    m.msg_control = aux;
-    m.msg_controllen = sizeof(aux);
-    m.msg_flags = 0;
-
-    r = recvmsg(c->fd, &m, 0);
-
-    if (r != size) {
-        if (r < 0 && errno != EAGAIN && errno != EINTR)
-            pa_log_warn("recvmsg() failed: %s", r < 0 ? pa_cstrerror(errno) : "size mismatch");
-
-        goto fail;
-    }
-
-    if (size < 12) {
-        pa_log_warn("RTP packet too short.");
-        goto fail;
-    }
-
-    memcpy(&header, iov.iov_base, sizeof(uint32_t));
-    memcpy(rtp_tstamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t));
-    memcpy(&ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t));
-
-    pa_memblock_release(chunk->memblock);
-
-    header = ntohl(header);
-    *rtp_tstamp = ntohl(*rtp_tstamp);
-    ssrc = ntohl(c->ssrc);
-
-    if ((header >> 30) != 2) {
-        pa_log_warn("Unsupported RTP version.");
-        goto fail;
-    }
-
-    if ((header >> 29) & 1) {
-        pa_log_warn("RTP padding not supported.");
-        goto fail;
-    }
-
-    if ((header >> 28) & 1) {
-        pa_log_warn("RTP header extensions not supported.");
-        goto fail;
-    }
-
-    if (ssrc != c->ssrc) {
-        pa_log_debug("Got unexpected SSRC");
-        goto fail;
-    }
-
-    cc = (header >> 24) & 0xF;
-    payload = (uint8_t) ((header >> 16) & 127U);
-    c->sequence = (uint16_t) (header & 0xFFFFU);
-
-    if (payload != c->payload) {
-        pa_log_debug("Got unexpected payload: %u", payload);
-        goto fail;
-    }
-
-    if (12 + cc*4 > (unsigned) size) {
-        pa_log_warn("RTP packet too short. (CSRC)");
-        goto fail;
-    }
-
-    chunk->index += 12 + cc*4;
-    chunk->length = (size_t) size - 12 + cc*4;
-
-    if (chunk->length % c->frame_size != 0) {
-        pa_log_warn("Bad RTP packet size.");
-        goto fail;
-    }
-
-    c->memchunk.index = chunk->index + chunk->length;
-    c->memchunk.length = pa_memblock_get_length(c->memchunk.memblock) - c->memchunk.index;
-
-    if (c->memchunk.length <= 0) {
-        pa_memblock_unref(c->memchunk.memblock);
-        pa_memchunk_reset(&c->memchunk);
-    }
-
-    for (cm = CMSG_FIRSTHDR(&m); cm; cm = CMSG_NXTHDR(&m, cm))
-        if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_TIMESTAMP) {
-            memcpy(tstamp, CMSG_DATA(cm), sizeof(struct timeval));
-            found_tstamp = true;
-            break;
-        }
-
-    if (!found_tstamp) {
-        pa_log_warn("Couldn't find SCM_TIMESTAMP data in auxiliary recvmsg() data!");
-        pa_zero(*tstamp);
-    }
-
-    return 0;
-
-fail:
-    if (chunk->memblock)
-        pa_memblock_unref(chunk->memblock);
-
-    return -1;
-}
-
-uint8_t pa_rtp_payload_from_sample_spec(const pa_sample_spec *ss) {
-    pa_assert(ss);
-
-    if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 2)
-        return 10;
-    if (ss->format == PA_SAMPLE_S16BE && ss->rate == 44100 && ss->channels == 1)
-        return 11;
-
-    return 127;
-}
-
-pa_sample_spec *pa_rtp_sample_spec_from_payload(uint8_t payload, pa_sample_spec *ss) {
-    pa_assert(ss);
-
-    switch (payload) {
-        case 10:
-            ss->channels = 2;
-            ss->format = PA_SAMPLE_S16BE;
-            ss->rate = 44100;
-            break;
-
-        case 11:
-            ss->channels = 1;
-            ss->format = PA_SAMPLE_S16BE;
-            ss->rate = 44100;
-            break;
-
-        default:
-            return NULL;
-    }
-
-    return ss;
-}
-
-pa_sample_spec *pa_rtp_sample_spec_fixup(pa_sample_spec * ss) {
-    pa_assert(ss);
-
-    if (!pa_rtp_sample_spec_valid(ss))
-        ss->format = PA_SAMPLE_S16BE;
-
-    pa_assert(pa_rtp_sample_spec_valid(ss));
-    return ss;
-}
-
-int pa_rtp_sample_spec_valid(const pa_sample_spec *ss) {
-    pa_assert(ss);
-
-    if (!pa_sample_spec_valid(ss))
-        return 0;
-
-    return ss->format == PA_SAMPLE_S16BE;
-}
-
-void pa_rtp_context_destroy(pa_rtp_context *c) {
-    pa_assert(c);
-
-    pa_assert_se(pa_close(c->fd) == 0);
-
-    if (c->memchunk.memblock)
-        pa_memblock_unref(c->memchunk.memblock);
-
-    pa_xfree(c);
-}
-
-const char* pa_rtp_format_to_string(pa_sample_format_t f) {
-    switch (f) {
-        case PA_SAMPLE_S16BE:
-            return "L16";
-        default:
-            return NULL;
-    }
-}
-
-pa_sample_format_t pa_rtp_string_to_format(const char *s) {
-    pa_assert(s);
-
-    if (pa_streq(s, "L16"))
-        return PA_SAMPLE_S16BE;
-    else
-        return PA_SAMPLE_INVALID;
-}
-
-size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) {
-    return c->frame_size;
-}
-
-pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) {
-    pa_rtpoll_item *item;
-    struct pollfd *p;
-
-    item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_LATE, 1);
-
-    p = pa_rtpoll_item_get_pollfd(item, NULL);
-    p->fd = c->fd;
-    p->events = POLLIN;
-    p->revents = 0;
-
-    return item;
-}
diff --git a/src/modules/rtp/rtp.h b/src/modules/rtp/rtp.h
index 1ddc794..c8ded8d 100644
--- a/src/modules/rtp/rtp.h
+++ b/src/modules/rtp/rtp.h
@@ -29,13 +29,13 @@ 
 
 typedef struct pa_rtp_context pa_rtp_context;
 
-pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, size_t frame_size);
+pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss);
 
 /* If the memblockq doesn't have a silence memchunk set, then the caller must
  * guarantee that the current read index doesn't point to a hole. */
 int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q);
 
-pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, size_t frame_size);
+pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample_spec *ss);
 int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp);
 
 void pa_rtp_context_destroy(pa_rtp_context *c);

Comments

On Monday 29 February 2016 15:46:36 arun@accosted.net wrote:
> @@ -1647,6 +1663,7 @@ echo "
>      Enable speex (resampler, AEC): ${ENABLE_SPEEX}
>      Enable soxr (resampler):       ${ENABLE_SOXR}
>      Enable WebRTC echo canceller:  ${ENABLE_WEBRTC}
> +    Enable GStreamer-based RTP:    $}HAVE_GSTREAMER}
                                       ^
                                       {
On 29 February 2016 at 17:07, Pali Rohár <pali.rohar@gmail.com> wrote:

> On Monday 29 February 2016 15:46:36 arun@accosted.net wrote:
> > @@ -1647,6 +1663,7 @@ echo "
> >      Enable speex (resampler, AEC): ${ENABLE_SPEEX}
> >      Enable soxr (resampler):       ${ENABLE_SOXR}
> >      Enable WebRTC echo canceller:  ${ENABLE_WEBRTC}
> > +    Enable GStreamer-based RTP:    $}HAVE_GSTREAMER}
>                                        ^
>                                        {
>
>
Fixed, thanks.

-- Arun
On Mon, 2016-02-29 at 15:46 +0530, arun@accosted.net wrote:
>  module_rtp_send_la_SOURCES = modules/rtp/module-rtp-send.c
>  module_rtp_send_la_LDFLAGS = $(MODULE_LDFLAGS)
>  module_rtp_send_la_LIBADD = $(MODULE_LIBADD) librtp.la
> -module_rtp_send_la_CFLAGS = $(AM_CFLAGS)
> +module_rtp_send_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
>  
>  module_rtp_recv_la_SOURCES = modules/rtp/module-rtp-recv.c
>  module_rtp_recv_la_LDFLAGS = $(MODULE_LDFLAGS)
>  module_rtp_recv_la_LIBADD = $(MODULE_LIBADD) librtp.la
> -module_rtp_recv_la_CFLAGS = $(AM_CFLAGS)
> +module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
>  
>  # JACK
>  
> @@ -2185,7 +2193,7 @@ module_bluez5_device_la_CFLAGS = $(AM_CFLAGS) $(SBC_CFLAGS)
>  module_raop_sink_la_SOURCES = modules/raop/module-raop-sink.c
>  module_raop_sink_la_LDFLAGS = $(MODULE_LDFLAGS)
>  module_raop_sink_la_LIBADD = $(MODULE_LIBADD) librtp.la libraop.la
> -module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp
> +module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp $(GSTREAMER_CFLAGS)

Adding GSTREAMER_CFLAGS to the module CFLAGS seems unnecessary. Only
librtp contains GStreamer code.

> +typedef struct pa_rtp_context {

The typedef is already done in rtp.h, no need to do it again.

> +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
> +    pa_rtp_context *c = NULL;
> +    GError *error = NULL;
> +
> +    pa_assert(fd >= 0);
> +
> +    c = pa_xnew0(pa_rtp_context, 1);
> +
> +    c->fdsem = pa_fdsem_new();

As far as I can tell, the fdsem is only needed when receiving data, not
when sending.

> +    c->ss = *ss;
> +
> +    if (!gst_init_check(NULL, NULL, &error)) {
> +        pa_log_error("Could not initialise GStreamer: %s", error->message);
> +        g_error_free(error);
> +        goto fail;
> +    }
> +
> +    if (!init_send_pipeline(c, fd, payload, mtu, ss))
> +        goto fail;
> +
> +    return c;
> +
> +fail:
> +    pa_xfree(c);

You should call pa_rtp_context_free() to be sure that everything gets
properly deinitialized. Now you're leaking the fdsem. The same comment
applies to pa_rtp_context_new_recv() too.

> +static bool process_bus_messages(pa_rtp_context *c) {
> +    GstBus *bus;
> +    GstMessage *message;
> +    bool ret = true;
> +
> +    bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
> +
> +    while (ret && (message = gst_bus_pop(bus))) {
> +        if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) {
> +            GError *error = NULL;
> +
> +            ret = false;
> +
> +            gst_message_parse_error(message, &error, NULL);
> +            pa_log("Got an error: %s", error->message);
> +
> +            g_error_free(error);
> +
> +            pa_fdsem_post(c->fdsem);

What's the purpose of this? If I understand correctly, the fdsem is
used to wake up the pulseaudio source IO thread when we receive data in
a gstreamer thread, but this code runs in the IO thread, so this seems
pointless.

(By the way, it would be good to have comments in each function about
which thread they run in.)

> +int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
> +    pa_memchunk chunk = { 0, };
> +    GstBuffer *buf;
> +    void *data;
> +    bool stop = false;
> +    int ret = 0;
> +
> +    pa_assert(c);
> +    pa_assert(q);
> +
> +    if (!process_bus_messages(c))
> +        return -1;
> +
> +    while (!stop && pa_memblockq_peek(q, &chunk) == 0) {
> +        pa_assert(chunk.memblock);
> +
> +        data = pa_memblock_acquire(chunk.memblock);
> +
> +        buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
> +                                          data, chunk.length, chunk.index, chunk.length, chunk.memblock,
> +                                          (GDestroyNotify) free_buffer);
> +
> +        if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
> +            pa_log_error("Could not push buffer");
> +            stop = true;
> +            ret = -1;
> +        }
> +
> +        pa_memblockq_drop(q, chunk.length);
> +    }
> +
> +    return ret;
> +}

I wonder about the error handling in this function. module-rtp-send.c,
which calls this function, doesn't care about the return value.
Unloading the module would make sense to me, but if you don't do that,
how do you ensure that we don't end up spamming errors to the log
infinitely if the pipeline stops working?

> +static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) {
> +    pa_rtp_context *c = (pa_rtp_context *) userdata;
> +    GstElement *depay;
> +    GstPad *sinkpad;
> +    GstPadLinkReturn ret;
> +
> +    depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay");
> +    pa_assert(depay);
> +
> +    sinkpad = gst_element_get_static_pad(depay, "sink");
> +
> +    ret = gst_pad_link(pad, sinkpad);
> +    if (ret != GST_PAD_LINK_OK) {
> +        GstBus *bus;
> +        GError *error;
> +
> +        bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
> +        error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader");
> +        gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL));

It's not clear to me how the messages are dispatched. How does the sink
IO thread get notified of this message? Messages are processed in
pa_rtp_recv(), which is I think is called when we post to the fdsem,
but we don't call pa_fdsem_post() at least in this function. Does the
error produce an EOS event, which then will trigger a pa_fdsem_post()
call?

> +static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) {
> +    GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL;
> +    GstCaps *caps;
> +    GSocket *socket;
> +    GError *error = NULL;
> +
> +    MAKE_ELEMENT(udpsrc, "udpsrc");
> +    MAKE_ELEMENT(rtpbin, "rtpbin");
> +    MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay");
> +    MAKE_ELEMENT(appsink, "appsink");
> +
> +    c->pipeline = gst_pipeline_new(NULL);
> +
> +    gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL);
> +
> +    socket = g_socket_new_from_fd(fd, &error);
> +    if (error) {
> +        pa_log("Could not create socket: %s", error->message);
> +        g_error_free(error);
> +        goto fail;
> +    }
> +
> +    caps = rtp_caps_from_sample_spec(ss);
> +    if (!caps) {
> +        pa_log("Unsupported format to payload");
> +        goto fail;
> +    }
> +
> +    g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL);
> +    g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL);
> +    g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL);
> +
> +    gst_caps_unref(caps);
> +    g_object_unref(socket);
> +
> +    if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") ||
> +        !gst_element_link(depay, appsink)) {
> +
> +        pa_log("Could not set up send pipeline");

s/send/receive/

> +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
> +    GstSample *sample = NULL;
> +    GstBuffer *buf;
> +    GstMapInfo info;
> +    void *data;
> +
> +    if (!process_bus_messages(c))
> +        goto fail;
> +
> +    sample = gst_app_sink_pull_sample(GST_APP_SINK(c->appsink));
> +    if (!sample) {
> +        pa_log_warn("Could not get any more data");
> +        goto fail;
> +    }
> +
> +    buf = gst_sample_get_buffer(sample);
> +
> +    if (GST_BUFFER_IS_DISCONT(buf))
> +        pa_log_info("Discontinuity detected, possibly lost some packets");
> +
> +    if (!gst_buffer_map(buf, &info, GST_MAP_READ))
> +        goto fail;

It would be good to log something here.
On 21 April 2016 at 21:33, Tanu Kaskinen <tanuk@iki.fi> wrote:
> On Mon, 2016-02-29 at 15:46 +0530, arun@accosted.net wrote:
>>  module_rtp_send_la_SOURCES = modules/rtp/module-rtp-send.c
>>  module_rtp_send_la_LDFLAGS = $(MODULE_LDFLAGS)
>>  module_rtp_send_la_LIBADD = $(MODULE_LIBADD) librtp.la
>> -module_rtp_send_la_CFLAGS = $(AM_CFLAGS)
>> +module_rtp_send_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
>>
>>  module_rtp_recv_la_SOURCES = modules/rtp/module-rtp-recv.c
>>  module_rtp_recv_la_LDFLAGS = $(MODULE_LDFLAGS)
>>  module_rtp_recv_la_LIBADD = $(MODULE_LIBADD) librtp.la
>> -module_rtp_recv_la_CFLAGS = $(AM_CFLAGS)
>> +module_rtp_recv_la_CFLAGS = $(AM_CFLAGS) $(GSTREAMER_CFLAGS)
>>
>>  # JACK
>>
>> @@ -2185,7 +2193,7 @@ module_bluez5_device_la_CFLAGS = $(AM_CFLAGS) $(SBC_CFLAGS)
>>  module_raop_sink_la_SOURCES = modules/raop/module-raop-sink.c
>>  module_raop_sink_la_LDFLAGS = $(MODULE_LDFLAGS)
>>  module_raop_sink_la_LIBADD = $(MODULE_LIBADD) librtp.la libraop.la
>> -module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp
>> +module_raop_sink_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/src/modules/rtp $(GSTREAMER_CFLAGS)
>
> Adding GSTREAMER_CFLAGS to the module CFLAGS seems unnecessary. Only
> librtp contains GStreamer code.
>
>> +typedef struct pa_rtp_context {
>
> The typedef is already done in rtp.h, no need to do it again.
>
>> +pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, const pa_sample_spec *ss) {
>> +    pa_rtp_context *c = NULL;
>> +    GError *error = NULL;
>> +
>> +    pa_assert(fd >= 0);
>> +
>> +    c = pa_xnew0(pa_rtp_context, 1);
>> +
>> +    c->fdsem = pa_fdsem_new();
>
> As far as I can tell, the fdsem is only needed when receiving data, not
> when sending.

Right, fixed.

>> +    c->ss = *ss;
>> +
>> +    if (!gst_init_check(NULL, NULL, &error)) {
>> +        pa_log_error("Could not initialise GStreamer: %s", error->message);
>> +        g_error_free(error);
>> +        goto fail;
>> +    }
>> +
>> +    if (!init_send_pipeline(c, fd, payload, mtu, ss))
>> +        goto fail;
>> +
>> +    return c;
>> +
>> +fail:
>> +    pa_xfree(c);
>
> You should call pa_rtp_context_free() to be sure that everything gets
> properly deinitialized. Now you're leaking the fdsem. The same comment
> applies to pa_rtp_context_new_recv() too.

Done.

>> +static bool process_bus_messages(pa_rtp_context *c) {
>> +    GstBus *bus;
>> +    GstMessage *message;
>> +    bool ret = true;
>> +
>> +    bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
>> +
>> +    while (ret && (message = gst_bus_pop(bus))) {
>> +        if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) {
>> +            GError *error = NULL;
>> +
>> +            ret = false;
>> +
>> +            gst_message_parse_error(message, &error, NULL);
>> +            pa_log("Got an error: %s", error->message);
>> +
>> +            g_error_free(error);
>> +
>> +            pa_fdsem_post(c->fdsem);
>
> What's the purpose of this? If I understand correctly, the fdsem is
> used to wake up the pulseaudio source IO thread when we receive data in
> a gstreamer thread, but this code runs in the IO thread, so this seems
> pointless.
>
> (By the way, it would be good to have comments in each function about
> which thread they run in.)

Added comments about this. And you're right, this one is not needed.

>
>> +int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
>> +    pa_memchunk chunk = { 0, };
>> +    GstBuffer *buf;
>> +    void *data;
>> +    bool stop = false;
>> +    int ret = 0;
>> +
>> +    pa_assert(c);
>> +    pa_assert(q);
>> +
>> +    if (!process_bus_messages(c))
>> +        return -1;
>> +
>> +    while (!stop && pa_memblockq_peek(q, &chunk) == 0) {
>> +        pa_assert(chunk.memblock);
>> +
>> +        data = pa_memblock_acquire(chunk.memblock);
>> +
>> +        buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
>> +                                          data, chunk.length, chunk.index, chunk.length, chunk.memblock,
>> +                                          (GDestroyNotify) free_buffer);
>> +
>> +        if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
>> +            pa_log_error("Could not push buffer");
>> +            stop = true;
>> +            ret = -1;
>> +        }
>> +
>> +        pa_memblockq_drop(q, chunk.length);
>> +    }
>> +
>> +    return ret;
>> +}
>
> I wonder about the error handling in this function. module-rtp-send.c,
> which calls this function, doesn't care about the return value.
> Unloading the module would make sense to me, but if you don't do that,
> how do you ensure that we don't end up spamming errors to the log
> infinitely if the pipeline stops working?

I can add another patch on top of this to unload the module of send fails.

>> +static void on_pad_added(GstElement *element, GstPad *pad, gpointer userdata) {
>> +    pa_rtp_context *c = (pa_rtp_context *) userdata;
>> +    GstElement *depay;
>> +    GstPad *sinkpad;
>> +    GstPadLinkReturn ret;
>> +
>> +    depay = gst_bin_get_by_name(GST_BIN(c->pipeline), "depay");
>> +    pa_assert(depay);
>> +
>> +    sinkpad = gst_element_get_static_pad(depay, "sink");
>> +
>> +    ret = gst_pad_link(pad, sinkpad);
>> +    if (ret != GST_PAD_LINK_OK) {
>> +        GstBus *bus;
>> +        GError *error;
>> +
>> +        bus = gst_pipeline_get_bus(GST_PIPELINE(c->pipeline));
>> +        error = g_error_new(GST_CORE_ERROR, GST_CORE_ERROR_PAD, "Could not link rtpbin to depayloader");
>> +        gst_bus_post(bus, gst_message_new_error(GST_OBJECT(c->pipeline), error, NULL));
>
> It's not clear to me how the messages are dispatched. How does the sink
> IO thread get notified of this message? Messages are processed in
> pa_rtp_recv(), which is I think is called when we post to the fdsem,
> but we don't call pa_fdsem_post() at least in this function. Does the
> error produce an EOS event, which then will trigger a pa_fdsem_post()
> call?

No, we need to call pa_fdsem_post(). Will fix that.

>> +static bool init_receive_pipeline(pa_rtp_context *c, int fd, const pa_sample_spec *ss) {
>> +    GstElement *udpsrc = NULL, *rtpbin = NULL, *depay = NULL, *appsink = NULL;
>> +    GstCaps *caps;
>> +    GSocket *socket;
>> +    GError *error = NULL;
>> +
>> +    MAKE_ELEMENT(udpsrc, "udpsrc");
>> +    MAKE_ELEMENT(rtpbin, "rtpbin");
>> +    MAKE_ELEMENT_NAMED(depay, "rtpL16depay", "depay");
>> +    MAKE_ELEMENT(appsink, "appsink");
>> +
>> +    c->pipeline = gst_pipeline_new(NULL);
>> +
>> +    gst_bin_add_many(GST_BIN(c->pipeline), udpsrc, rtpbin, depay, appsink, NULL);
>> +
>> +    socket = g_socket_new_from_fd(fd, &error);
>> +    if (error) {
>> +        pa_log("Could not create socket: %s", error->message);
>> +        g_error_free(error);
>> +        goto fail;
>> +    }
>> +
>> +    caps = rtp_caps_from_sample_spec(ss);
>> +    if (!caps) {
>> +        pa_log("Unsupported format to payload");
>> +        goto fail;
>> +    }
>> +
>> +    g_object_set(udpsrc, "socket", socket, "caps", caps, "auto-multicast" /* caller handles this */, FALSE, NULL);
>> +    g_object_set(rtpbin, "latency", 0, "buffer-mode", 0 /* none */, NULL);
>> +    g_object_set(appsink, "sync", FALSE, "enable-last-sample", FALSE, NULL);
>> +
>> +    gst_caps_unref(caps);
>> +    g_object_unref(socket);
>> +
>> +    if (!gst_element_link_pads(udpsrc, "src", rtpbin, "recv_rtp_sink_0") ||
>> +        !gst_element_link(depay, appsink)) {
>> +
>> +        pa_log("Could not set up send pipeline");
>
> s/send/receive/
>
>> +int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
>> +    GstSample *sample = NULL;
>> +    GstBuffer *buf;
>> +    GstMapInfo info;
>> +    void *data;
>> +
>> +    if (!process_bus_messages(c))
>> +        goto fail;
>> +
>> +    sample = gst_app_sink_pull_sample(GST_APP_SINK(c->appsink));
>> +    if (!sample) {
>> +        pa_log_warn("Could not get any more data");
>> +        goto fail;
>> +    }
>> +
>> +    buf = gst_sample_get_buffer(sample);
>> +
>> +    if (GST_BUFFER_IS_DISCONT(buf))
>> +        pa_log_info("Discontinuity detected, possibly lost some packets");
>> +
>> +    if (!gst_buffer_map(buf, &info, GST_MAP_READ))
>> +        goto fail;
>
> It would be good to log something here.

Sure. This is just a sanity check though, and should never happen.

-- Arun