[pulseaudio-discuss,5/8] rtp: Hide RTP implementation details from module-rtp-*

Submitted by Arun Raghavan on Feb. 29, 2016, 10:16 a.m.

Details

Message ID 1456740996-13908-6-git-send-email-arun@accosted.net
State New
Headers show
Series "Add GStreamer-based RTP support" ( rev: 1 ) in PulseAudio

Not browsing as part of any series.

Commit Message

Arun Raghavan Feb. 29, 2016, 10:16 a.m.
From: Arun Raghavan <git@arunraghavan.net>

This moves RTP implementation-specific information out of
module-rtp-send/recv. This is basically done by making the
pa_rtp_context structure opaque from the perspective of these modules.
We can then potentially replace the underlying RTP implementation with
something else transparently.

One RTP detail that does "leak" is the RTP timestamp. We provide this to
module-rtp-recv so that it can perform rate adjustments to match the
sender rate.
---
 src/modules/rtp/module-rtp-recv.c | 40 ++++++++-------------
 src/modules/rtp/module-rtp-send.c | 11 +++---
 src/modules/rtp/rtp.c             | 74 +++++++++++++++++++++++++++++++++------
 src/modules/rtp/rtp.h             | 22 +++++-------
 4 files changed, 91 insertions(+), 56 deletions(-)

Patch hide | download patch | download mbox

diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c
index 640f672..fd25e08 100644
--- a/src/modules/rtp/module-rtp-recv.c
+++ b/src/modules/rtp/module-rtp-recv.c
@@ -92,12 +92,11 @@  struct session {
     pa_memblockq *memblockq;
 
     bool first_packet;
-    uint32_t ssrc;
     uint32_t offset;
 
     struct pa_sdp_info sdp_info;
 
-    pa_rtp_context rtp_context;
+    pa_rtp_context *rtp_context;
 
     pa_rtpoll_item *rtpoll_item;
 
@@ -207,6 +206,7 @@  static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
 /* Called from I/O thread context */
 static int rtpoll_work_cb(pa_rtpoll_item *i) {
     pa_memchunk chunk;
+    uint32_t timestamp;
     int64_t k, j, delta;
     struct timeval now = { 0, 0 };
     struct session *s;
@@ -226,37 +226,30 @@  static int rtpoll_work_cb(pa_rtpoll_item *i) {
 
     p->revents = 0;
 
-    if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
+    if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, &timestamp, &now) < 0)
         return 0;
 
-    if (s->sdp_info.payload != s->rtp_context.payload ||
-        !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
+    if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
         pa_memblock_unref(chunk.memblock);
         return 0;
     }
 
     if (!s->first_packet) {
         s->first_packet = true;
-
-        s->ssrc = s->rtp_context.ssrc;
-        s->offset = s->rtp_context.timestamp;
-    } else {
-        if (s->ssrc != s->rtp_context.ssrc) {
-            pa_memblock_unref(chunk.memblock);
-            return 0;
-        }
+        s->offset = timestamp;
     }
 
     /* Check whether there was a timestamp overflow */
-    k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
-    j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
+    k = (int64_t) timestamp - (int64_t) s->offset;
+    j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp;
 
     if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
         delta = k;
     else
         delta = j;
 
-    pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, true);
+    pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
+            true);
 
     if (now.tv_sec == 0) {
         PA_ONCE_BEGIN {
@@ -276,7 +269,7 @@  static int rtpoll_work_cb(pa_rtpoll_item *i) {
     pa_memblock_unref(chunk.memblock);
 
     /* The next timestamp we expect */
-    s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
+    s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context));
 
     pa_atomic_store(&s->timestamp, (int) now.tv_sec);
 
@@ -385,18 +378,12 @@  static int rtpoll_work_cb(pa_rtpoll_item *i) {
 /* Called from I/O thread context */
 static void sink_input_attach(pa_sink_input *i) {
     struct session *s;
-    struct pollfd *p;
 
     pa_sink_input_assert_ref(i);
     pa_assert_se(s = i->userdata);
 
     pa_assert(!s->rtpoll_item);
-    s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
-
-    p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
-    p->fd = s->rtp_context.fd;
-    p->events = POLLIN;
-    p->revents = 0;
+    s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll);
 
     pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
     pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
@@ -584,7 +571,8 @@  static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
 
     pa_memblock_unref(silence.memblock);
 
-    pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
+    if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, pa_frame_size(&s->sdp_info.sample_spec))))
+        goto fail;
 
     pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
     u->n_sessions++;
@@ -619,7 +607,7 @@  static void session_free(struct session *s) {
 
     pa_memblockq_free(s->memblockq);
     pa_sdp_info_destroy(&s->sdp_info);
-    pa_rtp_context_destroy(&s->rtp_context);
+    pa_rtp_context_destroy(s->rtp_context);
 
     pa_xfree(s);
 }
diff --git a/src/modules/rtp/module-rtp-send.c b/src/modules/rtp/module-rtp-send.c
index c82adc5..6110455 100644
--- a/src/modules/rtp/module-rtp-send.c
+++ b/src/modules/rtp/module-rtp-send.c
@@ -107,7 +107,7 @@  struct userdata {
     pa_source_output *source_output;
     pa_memblockq *memblockq;
 
-    pa_rtp_context rtp_context;
+    pa_rtp_context *rtp_context;
     pa_sap_context sap_context;
 
     pa_time_event *sap_event;
@@ -143,7 +143,7 @@  static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
         return;
     }
 
-    pa_rtp_send(&u->rtp_context, u->memblockq);
+    pa_rtp_send(u->rtp_context, u->memblockq);
 }
 
 static pa_source_output_flags_t get_dont_inhibit_auto_suspend_flag(pa_source *source,
@@ -486,11 +486,12 @@  int pa__init(pa_module*m) {
 
     pa_xfree(n);
 
-    if (!pa_rtp_context_init_send(&u->rtp_context, fd, payload, mtu, pa_frame_size(&ss)))
+    if (!(u->rtp_context = pa_rtp_context_new_send(fd, payload, mtu, pa_frame_size(&ss))))
         goto fail;
     pa_sap_context_init_send(&u->sap_context, sap_fd, p);
 
-    pa_log_info("RTP stream initialized with mtu %u on %s:%u from %s ttl=%u, SSRC=0x%08x, payload=%u, initial sequence #%u", mtu, dst_addr, port, src_addr, ttl, u->rtp_context.ssrc, payload, u->rtp_context.sequence);
+    pa_log_info("RTP stream initialized with mtu %u on %s:%u from %s ttl=%u, payload=%u",
+            mtu, dst_addr, port, src_addr, ttl, payload);
     pa_log_info("SDP-Data:\n%s\nEOF", p);
 
     pa_sap_send(&u->sap_context, 0);
@@ -537,7 +538,7 @@  void pa__done(pa_module*m) {
         pa_source_output_unref(u->source_output);
     }
 
-    pa_rtp_context_destroy(&u->rtp_context);
+    pa_rtp_context_destroy(u->rtp_context);
 
     pa_sap_send(&u->sap_context, 1);
     pa_sap_context_destroy(&u->sap_context);
diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c
index 8da658a..e372cbd 100644
--- a/src/modules/rtp/rtp.c
+++ b/src/modules/rtp/rtp.c
@@ -40,13 +40,29 @@ 
 #include <pulsecore/macro.h>
 #include <pulsecore/core-util.h>
 #include <pulsecore/arpa-inet.h>
+#include <pulsecore/poll.h>
 
 #include "rtp.h"
 
-bool pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, size_t frame_size) {
-    pa_assert(c);
+typedef struct pa_rtp_context {
+    int fd;
+    uint16_t sequence;
+    uint32_t timestamp;
+    uint32_t ssrc;
+    uint8_t payload;
+    size_t frame_size;
+    size_t mtu;
+
+    pa_memchunk memchunk;
+} pa_rtp_context;
+
+pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, size_t frame_size) {
+    pa_rtp_context *c;
+
     pa_assert(fd >= 0);
 
+    c = pa_xnew0(pa_rtp_context, 1);
+
     c->fd = fd;
     c->sequence = (uint16_t) (rand()*rand());
     c->timestamp = 0;
@@ -57,7 +73,7 @@  bool pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint8_t payload, size_t
 
     pa_memchunk_reset(&c->memchunk);
 
-    return true;
+    return c;
 }
 
 #define MAX_IOVECS 16
@@ -149,22 +165,28 @@  int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
     return 0;
 }
 
-pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size) {
-    pa_assert(c);
+pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, size_t frame_size) {
+    pa_rtp_context *c;
+
+    c = pa_xnew0(pa_rtp_context, 1);
 
     c->fd = fd;
+    c->payload = payload;
     c->frame_size = frame_size;
 
     pa_memchunk_reset(&c->memchunk);
+
     return c;
 }
 
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct timeval *tstamp) {
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp) {
     int size;
     struct msghdr m;
     struct cmsghdr *cm;
     struct iovec iov;
     uint32_t header;
+    uint32_t ssrc;
+    uint8_t payload;
     unsigned cc;
     ssize_t r;
     uint8_t aux[1024];
@@ -249,12 +271,12 @@  int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct
     }
 
     memcpy(&header, iov.iov_base, sizeof(uint32_t));
-    memcpy(&c->timestamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t));
-    memcpy(&c->ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t));
+    memcpy(rtp_tstamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t));
+    memcpy(&ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t));
 
     header = ntohl(header);
-    c->timestamp = ntohl(c->timestamp);
-    c->ssrc = ntohl(c->ssrc);
+    *rtp_tstamp = ntohl(*rtp_tstamp);
+    ssrc = ntohl(c->ssrc);
 
     if ((header >> 30) != 2) {
         pa_log_warn("Unsupported RTP version.");
@@ -271,10 +293,20 @@  int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct
         goto fail;
     }
 
+    if (ssrc != c->ssrc) {
+        pa_log_debug("Got unexpected SSRC");
+        goto fail;
+    }
+
     cc = (header >> 24) & 0xF;
-    c->payload = (uint8_t) ((header >> 16) & 127U);
+    payload = (uint8_t) ((header >> 16) & 127U);
     c->sequence = (uint16_t) (header & 0xFFFFU);
 
+    if (payload != c->payload) {
+        pa_log_debug("Got unexpected payload: %u", payload);
+        goto fail;
+    }
+
     if (12 + cc*4 > (unsigned) size) {
         pa_log_warn("RTP packet too short. (CSRC)");
         goto fail;
@@ -377,6 +409,8 @@  void pa_rtp_context_destroy(pa_rtp_context *c) {
 
     if (c->memchunk.memblock)
         pa_memblock_unref(c->memchunk.memblock);
+
+    pa_xfree(c);
 }
 
 const char* pa_rtp_format_to_string(pa_sample_format_t f) {
@@ -396,3 +430,21 @@  pa_sample_format_t pa_rtp_string_to_format(const char *s) {
     else
         return PA_SAMPLE_INVALID;
 }
+
+size_t pa_rtp_context_get_frame_size(pa_rtp_context *c) {
+    return c->frame_size;
+}
+
+pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll) {
+    pa_rtpoll_item *item;
+    struct pollfd *p;
+
+    item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_LATE, 1);
+
+    p = pa_rtpoll_item_get_pollfd(item, NULL);
+    p->fd = c->fd;
+    p->events = POLLIN;
+    p->revents = 0;
+
+    return item;
+}
diff --git a/src/modules/rtp/rtp.h b/src/modules/rtp/rtp.h
index 350f968..1ddc794 100644
--- a/src/modules/rtp/rtp.h
+++ b/src/modules/rtp/rtp.h
@@ -25,30 +25,24 @@ 
 #include <sys/types.h>
 #include <pulsecore/memblockq.h>
 #include <pulsecore/memchunk.h>
+#include <pulsecore/rtpoll.h>
 
-typedef struct pa_rtp_context {
-    int fd;
-    uint16_t sequence;
-    uint32_t timestamp;
-    uint32_t ssrc;
-    uint8_t payload;
-    size_t frame_size;
-    size_t mtu;
+typedef struct pa_rtp_context pa_rtp_context;
 
-    pa_memchunk memchunk;
-} pa_rtp_context;
-
-bool pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint8_t payload, size_t mtu, size_t frame_size);
+pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, size_t frame_size);
 
 /* If the memblockq doesn't have a silence memchunk set, then the caller must
  * guarantee that the current read index doesn't point to a hole. */
 int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q);
 
-pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size);
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, struct timeval *tstamp);
+pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, size_t frame_size);
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool, uint32_t *rtp_tstamp, struct timeval *tstamp);
 
 void pa_rtp_context_destroy(pa_rtp_context *c);
 
+size_t pa_rtp_context_get_frame_size(pa_rtp_context *c);
+pa_rtpoll_item* pa_rtp_context_get_rtpoll_item(pa_rtp_context *c, pa_rtpoll *rtpoll);
+
 pa_sample_spec* pa_rtp_sample_spec_fixup(pa_sample_spec *ss);
 int pa_rtp_sample_spec_valid(const pa_sample_spec *ss);
 

Comments

On Mon, 2016-02-29 at 15:46 +0530, arun@accosted.net wrote:
> -    pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, true);
> +    pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
> +            true);

The context's frame size is the same as
pa_frame_size(&s->sdp_info.sample_spec), so it's not really necessary
to query it from the context. To avoid the pa_frame_size() call too,
I'd add a frame_size variable to the session struct. (This is not a big
deal, though; I'm ok with the code as it is.)

> -    pa_rtp_context_destroy(&s->rtp_context);
> +    pa_rtp_context_destroy(s->rtp_context);

You renamed pa_rtp_context_init() to pa_rtp_context_new(). I think it
would be good to also rename pa_rtp_context_destroy() to
pa_rtp_context_free().