[v2] pipe-sink: auto-drain the pipe on sink resume

Submitted by Samo Poga─Źnik on Feb. 28, 2018, 5:57 p.m.

Details

Message ID 1519840678-13262-1-git-send-email-samo_pogacnik@t-2.net
State New
Headers show
Series "pipe-sink: auto-drain the pipe on sink resume" ( rev: 2 ) in PulseAudio

Not browsing as part of any series.

Commit Message

Samo Poga─Źnik Feb. 28, 2018, 5:57 p.m.
Added option auto_drain_pipe_on_resume to enable draining any remaining
data from the pipe upon every pipe-sink resume out of suspend.

When a pipe reader fails, the pipe sink fills up the pipe and starts
dropping instead of writing new data. Old data remains in the pipe to
be consumed by the eventually recovered or replaced reader. By each new
drop a gap between the pipe content and new data to be written grows.
If the sink suspends while dropping, resuming from suspend is going to
clear the pipe and start writing new data into an empty pipe, thus
removing the gap (old, potentially irrelevant data).

The change asures that every pipe-sink resume from suspend makes its
first write into an empty pipe.
---
 src/modules/module-pipe-sink.c | 37 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

Patch hide | download patch | download mbox

diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c
index 995785e..10ce882 100644
--- a/src/modules/module-pipe-sink.c
+++ b/src/modules/module-pipe-sink.c
@@ -62,6 +62,7 @@  PA_MODULE_USAGE(
         "channels=<number of channels> "
         "channel_map=<channel map> "
         "use_system_clock_for_timing=<yes or no> "
+        "auto_drain_pipe_on_resume=<yes or no> "
 );
 
 #define DEFAULT_FILE_NAME "fifo_output"
@@ -82,16 +83,19 @@  struct userdata {
     size_t buffer_size;
     size_t bytes_dropped;
     bool fifo_error;
+    uint8_t *drain_buffer;
 
     pa_memchunk memchunk;
 
     pa_rtpoll_item *rtpoll_item;
 
+    int read_type;
     int write_type;
     pa_usec_t block_usec;
     pa_usec_t timestamp;
 
     bool use_system_clock_for_timing;
+    bool auto_drain_pipe_on_resume;
 };
 
 static const char* const valid_modargs[] = {
@@ -103,17 +107,39 @@  static const char* const valid_modargs[] = {
     "channels",
     "channel_map",
     "use_system_clock_for_timing",
+    "auto_drain_pipe_on_resume",
     NULL
 };
 
+static ssize_t drain_pipe_sink(struct userdata *u) {
+    ssize_t l, drained = 0;
+
+    pa_assert(u);
+
+    do {
+        l = pa_read(u->fd, u->drain_buffer, u->buffer_size, &u->read_type);
+
+        if (l > 0)
+            drained += l;
+
+    } while ((l > 0) || (errno == EINTR));
+
+    return drained;
+}
+
 static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
     struct userdata *u = PA_SINK(o)->userdata;
 
     switch (code) {
         case PA_SINK_MESSAGE_SET_STATE:
             if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) {
-                if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data)))
+                if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data))) {
                     u->timestamp = pa_rtclock_now();
+                    if (u->auto_drain_pipe_on_resume) {
+                        ssize_t d = drain_pipe_sink(u);
+                        pa_log_debug("Pipe-sink resume from suspend: auto-drained %zd bytes from the pipe", d);
+                    }
+                }
             } else if (u->sink->thread_info.state == PA_SINK_RUNNING || u->sink->thread_info.state == PA_SINK_IDLE) {
                 if (PA_PTR_TO_UINT(data) == PA_SINK_SUSPENDED) {
                     /* Clear potential FIFO error flag */
@@ -443,11 +469,17 @@  int pa__init(pa_module *m) {
         goto fail;
     }
 
+    if (pa_modargs_get_value_boolean(ma, "auto_drain_pipe_on_resume", &u->auto_drain_pipe_on_resume) < 0) {
+        pa_log("Failed to parse auto_drain_pipe_on_resume argument.");
+        goto fail;
+    }
+
     if (pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll) < 0) {
         pa_log("pa_thread_mq_init() failed.");
         goto fail;
     }
 
+    u->read_type = 0;
     u->write_type = 0;
 
     u->filename = pa_runtime_path(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME));
@@ -525,6 +557,9 @@  int pa__init(pa_module *m) {
     }
     pa_sink_set_max_request(u->sink, u->buffer_size);
 
+    if (u->auto_drain_pipe_on_resume)
+        u->drain_buffer = (uint8_t *)pa_xmalloc(u->buffer_size);
+
     u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
     pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
     pollfd->fd = u->fd;