[6/8] tunnel*: redo 'put sink/source after auth..'

Submitted by Yclept Nemo on July 17, 2018, 12:25 a.m.

Details

Message ID 20180717002544.5183-7-orbisvicis@gmail.com
State New
Headers show
Series "*** Overview ***" ( rev: 1 ) in PulseAudio

Not browsing as part of any series.

Commit Message

Yclept Nemo July 17, 2018, 12:25 a.m.
Turns out sinks and sources must be 'put' right after they've been
created otherwise they'll receive message whose handlers assert 'put'.
---
 src/modules/module-tunnel-sink-new.c   | 283 ++++++++++++++++----------
 src/modules/module-tunnel-source-new.c | 355 +++++++++++++++++++--------------
 src/modules/module-tunnel.c            | 190 +++++++++++-------
 3 files changed, 499 insertions(+), 329 deletions(-)

Patch hide | download patch | download mbox

diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c
index fd24e690..e25e3931 100644
--- a/src/modules/module-tunnel-sink-new.c
+++ b/src/modules/module-tunnel-sink-new.c
@@ -60,16 +60,23 @@  PA_MODULE_USAGE(
 #define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC)
 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
 
-enum {
-    SINK_MESSAGE_PUT = PA_SINK_MESSAGE_MAX,
-};
-
 static void stream_state_cb(pa_stream *stream, void *userdata);
 static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata);
 static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata);
 static void context_state_cb(pa_context *c, void *userdata);
 static void sink_update_requested_latency_cb(pa_sink *s);
 
+enum {
+    TUNNEL_MESSAGE_CREATE_SINK,
+    TUNNEL_MESSAGE_CREATE_STREAM,
+};
+
+typedef struct {
+    pa_msgobject parent;
+} tunnel_msg;
+
+PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
+
 struct userdata {
     pa_module *module;
     pa_sink *sink;
@@ -85,12 +92,20 @@  struct userdata {
     bool update_stream_bufferattr_after_connect;
 
     bool connected;
+    bool created;
 
     char *cookie_file;
     char *remote_server;
     char *remote_sink_name;
+
+    tunnel_msg *msg;
+
+    pa_sink_new_data *sink_data;
 };
 
+static int stream_create(struct userdata *u);
+static int sink_create(struct userdata *u);
+
 static const char* const valid_modargs[] = {
     "sink_name",
     "sink_properties",
@@ -144,6 +159,42 @@  static pa_proplist* tunnel_new_proplist(struct userdata *u) {
     return proplist;
 }
 
+static int tunnel_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+    struct userdata *u = data;
+
+    pa_assert(u);
+
+    switch (code) {
+
+        /* Delivered from the IO thread, handled in the main thread. */
+        case TUNNEL_MESSAGE_CREATE_SINK:
+            pa_log_debug("Creating sink.");
+            if (sink_create(u) < 0) {
+                pa_module_unload_request(u->module, true);
+                return -1;
+            }
+            pa_sink_new_data_done(u->sink_data);
+            pa_xfree(u->sink_data);
+            u->sink_data = NULL;
+            pa_asyncmsgq_post(u->thread_mq->inq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_STREAM, u, 0, NULL, NULL);
+            break;
+
+        /* Delivered from the main thread, handled in the IO thread. */
+        case TUNNEL_MESSAGE_CREATE_STREAM:
+            u->created = true;
+            pa_log_debug("Creating stream.");
+            if (stream_create(u) < 0) {
+                u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+                return -1;
+            }
+            u->connected = true;
+            break;
+
+    }
+
+    return 0;
+}
+
 static void thread_func(void *userdata) {
     struct userdata *u = userdata;
     pa_proplist *proplist;
@@ -173,7 +224,7 @@  static void thread_func(void *userdata) {
                            u->remote_server,
                            PA_CONTEXT_NOAUTOSPAWN,
                            NULL) < 0) {
-        pa_log("Failed to connect libpulse context");
+        pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
         goto fail;
     }
 
@@ -187,7 +238,7 @@  static void thread_func(void *userdata) {
                 goto fail;
         }
 
-        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+        if (PA_UNLIKELY(u->created && u->sink->thread_info.rewind_requested))
             pa_sink_process_rewind(u->sink, 0);
 
         if (u->connected &&
@@ -244,6 +295,34 @@  finish:
     pa_log_debug("Thread shutting down");
 }
 
+static void context_state_cb(pa_context *c, void *userdata) {
+    struct userdata *u = userdata;
+    pa_assert(u);
+
+    switch (pa_context_get_state(c)) {
+        case PA_CONTEXT_UNCONNECTED:
+        case PA_CONTEXT_CONNECTING:
+        case PA_CONTEXT_AUTHORIZING:
+        case PA_CONTEXT_SETTING_NAME:
+            break;
+        case PA_CONTEXT_READY: {
+            pa_log_debug("Context successfully connected.");
+            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SINK, u, 0, NULL, NULL);
+            break;
+        }
+        case PA_CONTEXT_FAILED:
+            pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
+            u->connected = false;
+            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+            break;
+        case PA_CONTEXT_TERMINATED:
+            pa_log_debug("Context terminated.");
+            u->connected = false;
+            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+            break;
+    }
+}
+
 static void stream_state_cb(pa_stream *stream, void *userdata) {
     struct userdata *u = userdata;
 
@@ -251,7 +330,7 @@  static void stream_state_cb(pa_stream *stream, void *userdata) {
 
     switch (pa_stream_get_state(stream)) {
         case PA_STREAM_FAILED:
-            pa_log_error("Stream failed.");
+            pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context)));
             u->connected = false;
             u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
             break;
@@ -291,78 +370,53 @@  static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *user
     stream_changed_buffer_attr_cb(stream, userdata);
 }
 
-static void context_state_cb(pa_context *c, void *userdata) {
-    struct userdata *u = userdata;
-    pa_assert(u);
+/* Handled in the IO thread. */
+static int stream_create(struct userdata *u) {
+    pa_proplist *proplist;
+    pa_buffer_attr bufferattr;
+    pa_usec_t requested_latency;
+    char *username = pa_get_user_name_malloc();
+    char *hostname = pa_get_host_name_malloc();
+    /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis@lazus' */
+    char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
+    pa_xfree(hostname);
+    pa_xfree(username);
 
-    switch (pa_context_get_state(c)) {
-        case PA_CONTEXT_UNCONNECTED:
-        case PA_CONTEXT_CONNECTING:
-        case PA_CONTEXT_AUTHORIZING:
-        case PA_CONTEXT_SETTING_NAME:
-            break;
-        case PA_CONTEXT_READY: {
-            pa_proplist *proplist;
-            pa_buffer_attr bufferattr;
-            pa_usec_t requested_latency;
-            char *username = pa_get_user_name_malloc();
-            char *hostname = pa_get_host_name_malloc();
-            /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis@lazus' */
-            char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
-            pa_xfree(hostname);
-            pa_xfree(username);
-
-            pa_log_debug("Connection successful. Creating stream.");
-            pa_assert(!u->stream);
-
-            proplist = tunnel_new_proplist(u);
-            u->stream = pa_stream_new_with_proplist(u->context,
-                                                    stream_name,
-                                                    &u->sink->sample_spec,
-                                                    &u->sink->channel_map,
-                                                    proplist);
-            pa_proplist_free(proplist);
-            pa_xfree(stream_name);
+    pa_assert(!u->stream);
 
-            if (!u->stream) {
-                pa_log_error("Could not create a stream.");
-                u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
-                return;
-            }
+    proplist = tunnel_new_proplist(u);
+    u->stream = pa_stream_new_with_proplist(u->context,
+                                            stream_name,
+                                            &u->sink->sample_spec,
+                                            &u->sink->channel_map,
+                                            proplist);
+    pa_proplist_free(proplist);
+    pa_xfree(stream_name);
 
-            requested_latency = pa_sink_get_requested_latency_within_thread(u->sink);
-            if (requested_latency == (pa_usec_t) -1)
-                requested_latency = u->sink->thread_info.max_latency;
-
-            reset_bufferattr(&bufferattr);
-            bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec);
-
-            pa_stream_set_state_callback(u->stream, stream_state_cb, userdata);
-            pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, userdata);
-            if (pa_stream_connect_playback(u->stream,
-                                           u->remote_sink_name,
-                                           &bufferattr,
-                                           PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE,
-                                           NULL,
-                                           NULL) < 0) {
-                pa_log_error("Could not connect stream.");
-                u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
-            }
-            u->connected = true;
-            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PUT, NULL, 0, NULL, NULL);
-            break;
-        }
-        case PA_CONTEXT_FAILED:
-            pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
-            u->connected = false;
-            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
-            break;
-        case PA_CONTEXT_TERMINATED:
-            pa_log_debug("Context terminated.");
-            u->connected = false;
-            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
-            break;
+    if (!u->stream) {
+        pa_log_error("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
+        return -1;
+    }
+
+    requested_latency = pa_sink_get_requested_latency_within_thread(u->sink);
+    if (requested_latency == (pa_usec_t) -1)
+        requested_latency = u->sink->thread_info.max_latency;
+
+    reset_bufferattr(&bufferattr);
+    bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec);
+
+    pa_stream_set_state_callback(u->stream, stream_state_cb, u);
+    pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, u);
+    if (pa_stream_connect_playback(u->stream,
+                                   u->remote_sink_name,
+                                   &bufferattr,
+                                   PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE,
+                                   NULL,
+                                   NULL) < 0) {
+        pa_log_error("Could not connect stream: %s", pa_strerror(pa_context_errno(u->context)));
+        return -1;
     }
+    return 0;
 }
 
 static void sink_update_requested_latency_cb(pa_sink *s) {
@@ -435,13 +489,6 @@  static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
             *((int64_t*) data) = remote_latency;
             return 0;
         }
-
-        /* Delivered from the IO thread, handled in the main thread. */
-        case SINK_MESSAGE_PUT: {
-            if (u->connected)
-                pa_sink_put(u->sink);
-            return 0;
-        }
     }
     return pa_sink_process_msg(o, code, data, offset, chunk);
 }
@@ -480,10 +527,31 @@  static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state,
     return 0;
 }
 
+/* Handled in the main thread. */
+static int sink_create(struct userdata *u) {
+    if (!(u->sink = pa_sink_new(u->module->core, u->sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
+        pa_log("Failed to create sink.");
+        return -1;
+    }
+
+    u->sink->userdata = u;
+    u->sink->parent.process_msg = sink_process_msg_cb;
+    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
+    u->sink->update_requested_latency = sink_update_requested_latency_cb;
+    pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
+
+    /* set thread message queue */
+    pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
+    pa_sink_set_rtpoll(u->sink, u->rtpoll);
+
+    pa_sink_put(u->sink);
+
+    return 0;
+}
+
 int pa__init(pa_module *m) {
     struct userdata *u = NULL;
     pa_modargs *ma = NULL;
-    pa_sink_new_data sink_data;
     pa_sample_spec ss;
     pa_channel_map map;
     const char *remote_server = NULL;
@@ -539,47 +607,36 @@  int pa__init(pa_module *m) {
      * with module-tunnel-sink-new. */
     u->rtpoll = pa_rtpoll_new();
 
-    /* Create sink */
-    pa_sink_new_data_init(&sink_data);
-    sink_data.driver = __FILE__;
-    sink_data.module = m;
+    /* Create sink data */
+    u->sink_data = pa_xnew(pa_sink_new_data, 1);
+    pa_sink_new_data_init(u->sink_data);
+    u->sink_data->driver = __FILE__;
+    u->sink_data->module = m;
 
     default_sink_name = pa_sprintf_malloc("tunnel-sink-new.%s", remote_server);
     sink_name = pa_modargs_get_value(ma, "sink_name", default_sink_name);
 
-    pa_sink_new_data_set_name(&sink_data, sink_name);
-    pa_sink_new_data_set_sample_spec(&sink_data, &ss);
-    pa_sink_new_data_set_channel_map(&sink_data, &map);
+    pa_sink_new_data_set_name(u->sink_data, sink_name);
+    pa_sink_new_data_set_sample_spec(u->sink_data, &ss);
+    pa_sink_new_data_set_channel_map(u->sink_data, &map);
 
-    pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "sound");
-    pa_proplist_setf(sink_data.proplist,
+    pa_proplist_sets(u->sink_data->proplist, PA_PROP_DEVICE_CLASS, "sound");
+    pa_proplist_setf(u->sink_data->proplist,
                      PA_PROP_DEVICE_DESCRIPTION,
                      _("Tunnel to %s/%s"),
                      remote_server,
                      pa_strempty(u->remote_sink_name));
 
-    if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
+    if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_data->proplist, PA_UPDATE_REPLACE) < 0) {
         pa_log("Invalid properties");
-        pa_sink_new_data_done(&sink_data);
-        goto fail;
-    }
-    if (!(u->sink = pa_sink_new(m->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
-        pa_log("Failed to create sink.");
-        pa_sink_new_data_done(&sink_data);
         goto fail;
     }
 
-    pa_sink_new_data_done(&sink_data);
-    u->sink->userdata = u;
-    u->sink->parent.process_msg = sink_process_msg_cb;
-    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
-    u->sink->update_requested_latency = sink_update_requested_latency_cb;
-    pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
-
-    /* set thread message queue */
-    pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
-    pa_sink_set_rtpoll(u->sink, u->rtpoll);
+    /* Setup initial message handler */
+    u->msg = pa_msgobject_new(tunnel_msg);
+    u->msg->parent.process_msg = tunnel_process_msg_cb;
 
+    /* start IO thread */
     if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) {
         pa_log("Failed to create thread.");
         goto fail;
@@ -635,6 +692,14 @@  void pa__done(pa_module *m) {
     if (u->remote_server)
         pa_xfree(u->remote_server);
 
+    if (u->msg)
+        pa_xfree(u->msg);
+
+    if (u->sink_data) {
+        pa_sink_new_data_done(u->sink_data);
+        pa_xfree(u->sink_data);
+    }
+
     if (u->sink)
         pa_sink_unref(u->sink);
 
diff --git a/src/modules/module-tunnel-source-new.c b/src/modules/module-tunnel-source-new.c
index bff2f6a1..df9c1757 100644
--- a/src/modules/module-tunnel-source-new.c
+++ b/src/modules/module-tunnel-source-new.c
@@ -59,15 +59,22 @@  PA_MODULE_USAGE(
 
 #define TUNNEL_THREAD_FAILED_MAINLOOP 1
 
-enum {
-    SOURCE_MESSAGE_PUT = PA_SOURCE_MESSAGE_MAX,
-};
-
 static void stream_state_cb(pa_stream *stream, void *userdata);
 static void stream_read_cb(pa_stream *s, size_t length, void *userdata);
 static void context_state_cb(pa_context *c, void *userdata);
 static void source_update_requested_latency_cb(pa_source *s);
 
+enum {
+    TUNNEL_MESSAGE_CREATE_SOURCE,
+    TUNNEL_MESSAGE_CREATE_STREAM,
+};
+
+typedef struct {
+    pa_msgobject parent;
+} tunnel_msg;
+
+PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
+
 struct userdata {
     pa_module *module;
     pa_source *source;
@@ -87,8 +94,16 @@  struct userdata {
     char *cookie_file;
     char *remote_server;
     char *remote_source_name;
+
+    tunnel_msg *msg;
+
+    pa_source_new_data *source_data;
 };
 
+static int stream_create(struct userdata *u);
+static int source_create(struct userdata *u);
+static void read_new_samples(struct userdata *u);
+
 static const char* const valid_modargs[] = {
     "source_name",
     "source_properties",
@@ -133,63 +148,39 @@  static pa_proplist* tunnel_new_proplist(struct userdata *u) {
     return proplist;
 }
 
-static void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
-    struct userdata *u = userdata;
-    u->new_data = true;
-}
-
-/* called from io context to read samples from the stream into our source */
-static void read_new_samples(struct userdata *u) {
-    const void *p;
-    size_t readable = 0;
-    pa_memchunk memchunk;
+static int tunnel_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+    struct userdata *u = data;
 
     pa_assert(u);
-    u->new_data = false;
-
-    pa_memchunk_reset(&memchunk);
-
-    if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
-        return;
 
-    readable = pa_stream_readable_size(u->stream);
-    while (readable > 0) {
-        size_t nbytes = 0;
-        if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
-            pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
-            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
-            return;
-        }
-
-        if (PA_LIKELY(p)) {
-            /* we have valid data */
-            memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
-            memchunk.length = nbytes;
-            memchunk.index = 0;
-
-            pa_source_post(u->source, &memchunk);
-            pa_memblock_unref_fixed(memchunk.memblock);
-        } else {
-            size_t bytes_to_generate = nbytes;
-
-            /* we have a hole. generate silence */
-            memchunk = u->source->silence;
-            pa_memblock_ref(memchunk.memblock);
-
-            while (bytes_to_generate > 0) {
-                if (bytes_to_generate < memchunk.length)
-                    memchunk.length = bytes_to_generate;
+    switch (code) {
 
-                pa_source_post(u->source, &memchunk);
-                bytes_to_generate -= memchunk.length;
+        /* Delivered from the IO thread, handled in the main thread. */
+        case TUNNEL_MESSAGE_CREATE_SOURCE:
+            pa_log_debug("Creating source.");
+            if (source_create(u) < 0) {
+                pa_module_unload_request(u->module, true);
+                return -1;
             }
+            pa_source_new_data_done(u->source_data);
+            pa_xfree(u->source_data);
+            u->source_data = NULL;
+            pa_asyncmsgq_post(u->thread_mq->inq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_STREAM, u, 0, NULL, NULL);
+            break;
 
-            pa_memblock_unref(memchunk.memblock);
-        }
+        /* Delivered from the main thread, handled in the IO thread. */
+        case TUNNEL_MESSAGE_CREATE_STREAM:
+            pa_log_debug("Creating stream.");
+            if (stream_create(u) < 0) {
+                u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+                return -1;
+            }
+            u->connected = true;
+            break;
 
-        pa_stream_drop(u->stream);
-        readable -= nbytes;
     }
+
+    return 0;
 }
 
 static void thread_func(void *userdata) {
@@ -259,6 +250,34 @@  finish:
     pa_log_debug("Thread shutting down");
 }
 
+static void context_state_cb(pa_context *c, void *userdata) {
+    struct userdata *u = userdata;
+    pa_assert(u);
+
+    switch (pa_context_get_state(c)) {
+        case PA_CONTEXT_UNCONNECTED:
+        case PA_CONTEXT_CONNECTING:
+        case PA_CONTEXT_AUTHORIZING:
+        case PA_CONTEXT_SETTING_NAME:
+            break;
+        case PA_CONTEXT_READY: {
+            pa_log_debug("Context successfully connected.");
+            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SOURCE, u, 0, NULL, NULL);
+            break;
+        }
+        case PA_CONTEXT_FAILED:
+            pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
+            u->connected = false;
+            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+            break;
+        case PA_CONTEXT_TERMINATED:
+            pa_log_debug("Context terminated.");
+            u->connected = false;
+            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+            break;
+    }
+}
+
 static void stream_state_cb(pa_stream *stream, void *userdata) {
     struct userdata *u = userdata;
 
@@ -289,76 +308,110 @@  static void stream_state_cb(pa_stream *stream, void *userdata) {
     }
 }
 
-static void context_state_cb(pa_context *c, void *userdata) {
+static void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
     struct userdata *u = userdata;
+    u->new_data = true;
+}
+
+/* called from io context to read samples from the stream into our source */
+static void read_new_samples(struct userdata *u) {
+    const void *p;
+    size_t readable = 0;
+    pa_memchunk memchunk;
+
     pa_assert(u);
+    u->new_data = false;
 
-    switch (pa_context_get_state(c)) {
-        case PA_CONTEXT_UNCONNECTED:
-        case PA_CONTEXT_CONNECTING:
-        case PA_CONTEXT_AUTHORIZING:
-        case PA_CONTEXT_SETTING_NAME:
-            break;
-        case PA_CONTEXT_READY: {
-            pa_proplist *proplist;
-            pa_buffer_attr bufferattr;
-            pa_usec_t requested_latency;
-            char *username = pa_get_user_name_malloc();
-            char *hostname = pa_get_host_name_malloc();
-            /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
-            char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
-            pa_xfree(username);
-            pa_xfree(hostname);
-
-            pa_log_debug("Connection successful. Creating stream.");
-            pa_assert(!u->stream);
-
-            proplist = tunnel_new_proplist(u);
-            u->stream = pa_stream_new_with_proplist(u->context,
-                                                    stream_name,
-                                                    &u->source->sample_spec,
-                                                    &u->source->channel_map,
-                                                    proplist);
-            pa_proplist_free(proplist);
-            pa_xfree(stream_name);
+    pa_memchunk_reset(&memchunk);
 
-            if (!u->stream) {
-                pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context)));
-                u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
-                return;
-            }
+    if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
+        return;
 
-            requested_latency = pa_source_get_requested_latency_within_thread(u->source);
-            if (requested_latency == (uint32_t) -1)
-                requested_latency = u->source->thread_info.max_latency;
+    readable = pa_stream_readable_size(u->stream);
+    while (readable > 0) {
+        size_t nbytes = 0;
+        if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
+            pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
+            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+            return;
+        }
 
-            reset_bufferattr(&bufferattr);
-            bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);
+        if (PA_LIKELY(p)) {
+            /* we have valid data */
+            memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
+            memchunk.length = nbytes;
+            memchunk.index = 0;
 
-            pa_stream_set_state_callback(u->stream, stream_state_cb, userdata);
-            pa_stream_set_read_callback(u->stream, stream_read_cb, userdata);
-            if (pa_stream_connect_record(u->stream,
-                                         u->remote_source_name,
-                                         &bufferattr,
-                                         PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED) < 0) {
-                pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
-                u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+            pa_source_post(u->source, &memchunk);
+            pa_memblock_unref_fixed(memchunk.memblock);
+        } else {
+            size_t bytes_to_generate = nbytes;
+
+            /* we have a hole. generate silence */
+            memchunk = u->source->silence;
+            pa_memblock_ref(memchunk.memblock);
+
+            while (bytes_to_generate > 0) {
+                if (bytes_to_generate < memchunk.length)
+                    memchunk.length = bytes_to_generate;
+
+                pa_source_post(u->source, &memchunk);
+                bytes_to_generate -= memchunk.length;
             }
-            u->connected = true;
-            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_PUT, NULL, 0, NULL, NULL);
-            break;
+
+            pa_memblock_unref(memchunk.memblock);
         }
-        case PA_CONTEXT_FAILED:
-            pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context)));
-            u->connected = false;
-            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
-            break;
-        case PA_CONTEXT_TERMINATED:
-            pa_log_debug("Context terminated.");
-            u->connected = false;
-            u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
-            break;
+
+        pa_stream_drop(u->stream);
+        readable -= nbytes;
+    }
+}
+
+/* Handled in the IO thread. */
+static int stream_create(struct userdata *u) {
+    pa_proplist *proplist;
+    pa_buffer_attr bufferattr;
+    pa_usec_t requested_latency;
+    char *username = pa_get_user_name_malloc();
+    char *hostname = pa_get_host_name_malloc();
+    /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis@lazus' */
+    char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
+    pa_xfree(username);
+    pa_xfree(hostname);
+
+    pa_assert(!u->stream);
+
+    proplist = tunnel_new_proplist(u);
+    u->stream = pa_stream_new_with_proplist(u->context,
+                                            stream_name,
+                                            &u->source->sample_spec,
+                                            &u->source->channel_map,
+                                            proplist);
+    pa_proplist_free(proplist);
+    pa_xfree(stream_name);
+
+    if (!u->stream) {
+        pa_log_error("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
+        return -1;
     }
+
+    requested_latency = pa_source_get_requested_latency_within_thread(u->source);
+    if (requested_latency == (pa_usec_t) -1)
+        requested_latency = u->source->thread_info.max_latency;
+
+    reset_bufferattr(&bufferattr);
+    bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);
+
+    pa_stream_set_state_callback(u->stream, stream_state_cb, u);
+    pa_stream_set_read_callback(u->stream, stream_read_cb, u);
+    if (pa_stream_connect_record(u->stream,
+                                 u->remote_source_name,
+                                 &bufferattr,
+                                 PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_START_CORKED) < 0) {
+        pa_log_error("Could not connect stream: %s", pa_strerror(pa_context_errno(u->context)));
+        return -1;
+    }
+    return 0;
 }
 
 static void source_update_requested_latency_cb(pa_source *s) {
@@ -434,13 +487,6 @@  static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t
 
             return 0;
         }
-
-        /* Delivered from the IO thread, handled in the main thread. */
-        case SOURCE_MESSAGE_PUT: {
-            if (u->connected)
-                pa_source_put(u->source);
-            return 0;
-        }
     }
     return pa_source_process_msg(o, code, data, offset, chunk);
 }
@@ -479,10 +525,30 @@  static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_
     return 0;
 }
 
+/* Handled in the main thread. */
+static int source_create(struct userdata *u) {
+    if (!(u->source = pa_source_new(u->module->core, u->source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) {
+        pa_log("Failed to create source.");
+        return -1;
+    }
+
+    u->source->userdata = u;
+    u->source->parent.process_msg = source_process_msg_cb;
+    u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
+    u->source->update_requested_latency = source_update_requested_latency_cb;
+
+    /* set thread message queue */
+    pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
+    pa_source_set_rtpoll(u->source, u->rtpoll);
+
+    pa_source_put(u->source);
+
+    return 0;
+}
+
 int pa__init(pa_module *m) {
     struct userdata *u = NULL;
     pa_modargs *ma = NULL;
-    pa_source_new_data source_data;
     pa_sample_spec ss;
     pa_channel_map map;
     const char *remote_server = NULL;
@@ -535,45 +601,36 @@  int pa__init(pa_module *m) {
      * only works because it calls pa_asyncmsq_process_one(). */
     u->rtpoll = pa_rtpoll_new();
 
-    /* Create source */
-    pa_source_new_data_init(&source_data);
-    source_data.driver = __FILE__;
-    source_data.module = m;
+    /* Create source data */
+    u->source_data = pa_xnew(pa_source_new_data, 1);
+    pa_source_new_data_init(u->source_data);
+    u->source_data->driver = __FILE__;
+    u->source_data->module = m;
 
     default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server);
     source_name = pa_modargs_get_value(ma, "source_name", default_source_name);
 
-    pa_source_new_data_set_name(&source_data, source_name);
-    pa_source_new_data_set_sample_spec(&source_data, &ss);
-    pa_source_new_data_set_channel_map(&source_data, &map);
+    pa_source_new_data_set_name(u->source_data, source_name);
+    pa_source_new_data_set_sample_spec(u->source_data, &ss);
+    pa_source_new_data_set_channel_map(u->source_data, &map);
 
-    pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "sound");
-    pa_proplist_setf(source_data.proplist,
+    pa_proplist_sets(u->source_data->proplist, PA_PROP_DEVICE_CLASS, "sound");
+    pa_proplist_setf(u->source_data->proplist,
                      PA_PROP_DEVICE_DESCRIPTION,
                      _("Tunnel to %s/%s"),
                      remote_server,
                      pa_strempty(u->remote_source_name));
 
-    if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
+    if (pa_modargs_get_proplist(ma, "source_properties", u->source_data->proplist, PA_UPDATE_REPLACE) < 0) {
         pa_log("Invalid properties");
-        pa_source_new_data_done(&source_data);
-        goto fail;
-    }
-    if (!(u->source = pa_source_new(m->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) {
-        pa_log("Failed to create source.");
-        pa_source_new_data_done(&source_data);
         goto fail;
     }
 
-    pa_source_new_data_done(&source_data);
-    u->source->userdata = u;
-    u->source->parent.process_msg = source_process_msg_cb;
-    u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
-    u->source->update_requested_latency = source_update_requested_latency_cb;
-
-    pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
-    pa_source_set_rtpoll(u->source, u->rtpoll);
+    /* setup initial message handler */
+    u->msg = pa_msgobject_new(tunnel_msg);
+    u->msg->parent.process_msg = tunnel_process_msg_cb;
 
+    /* start IO thread */
     if (!(u->thread = pa_thread_new("tunnel-source", thread_func, u))) {
         pa_log("Failed to create thread.");
         goto fail;
@@ -629,6 +686,14 @@  void pa__done(pa_module *m) {
     if (u->remote_server)
         pa_xfree(u->remote_server);
 
+    if (u->msg)
+        pa_xfree(u->msg);
+
+    if (u->source_data) {
+        pa_source_new_data_done(u->source_data);
+        pa_xfree(u->source_data);
+    }
+
     if (u->source)
         pa_source_unref(u->source);
 
diff --git a/src/modules/module-tunnel.c b/src/modules/module-tunnel.c
index 960f8533..9ea1ab33 100644
--- a/src/modules/module-tunnel.c
+++ b/src/modules/module-tunnel.c
@@ -131,7 +131,8 @@  enum {
     SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
     SINK_MESSAGE_REMOTE_SUSPEND,
     SINK_MESSAGE_UPDATE_LATENCY,
-    SINK_MESSAGE_POST
+    SINK_MESSAGE_POST,
+    SINK_MESSAGE_CREATED,
 };
 
 #define DEFAULT_TLENGTH_MSEC 150
@@ -142,7 +143,7 @@  enum {
 enum {
     SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
     SOURCE_MESSAGE_REMOTE_SUSPEND,
-    SOURCE_MESSAGE_UPDATE_LATENCY
+    SOURCE_MESSAGE_UPDATE_LATENCY,
 };
 
 #define DEFAULT_FRAGSIZE_MSEC 25
@@ -196,10 +197,13 @@  struct userdata {
 
     char *server_name;
 #ifdef TUNNEL_SINK
+    pa_sink_new_data *sink_data;
     char *sink_name;
     pa_sink *sink;
     size_t requested_bytes;
+    bool sink_created;
 #else
+    pa_source_new_data *source_data;
     char *source_name;
     pa_source *source;
     pa_mcalign *mcalign;
@@ -241,6 +245,11 @@  struct userdata {
 };
 
 static void request_latency(struct userdata *u);
+#ifdef TUNNEL_SINK
+static int sink_create(struct userdata *u);
+#else
+static int source_create(struct userdata *u);
+#endif
 
 /* Called from main context */
 static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
@@ -550,6 +559,11 @@  static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
             return 0;
         }
 
+        case SINK_MESSAGE_CREATED:
+
+            u->sink_created = true;
+            return 0;
+
         case SINK_MESSAGE_POST:
 
             /* OK, This might be a bit confusing. This message is
@@ -717,7 +731,7 @@  static void thread_func(void *userdata) {
         int ret;
 
 #ifdef TUNNEL_SINK
-        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+        if (PA_UNLIKELY(u->sink_created && u->sink->thread_info.rewind_requested))
             pa_sink_process_rewind(u->sink, 0);
 #endif
 
@@ -1599,9 +1613,17 @@  static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t
     }
 
 #ifdef TUNNEL_SINK
-    pa_sink_put(u->sink);
+    if (sink_create(u) < 0)
+        goto fail;
+    pa_sink_new_data_done(u->sink_data);
+    pa_xfree(u->sink_data);
+    u->sink_data = NULL;
 #else
-    pa_source_put(u->source);
+    if (source_create(u) < 0)
+        goto fail;
+    pa_source_new_data_done(u->source_data);
+    pa_xfree(u->source_data);
+    u->source_data = NULL;
 #endif
 
     /* Starting with protocol version 13 the MSB of the version tag
@@ -1922,6 +1944,58 @@  static void sink_set_mute(pa_sink *sink) {
 
 #endif
 
+#ifdef TUNNEL_SINK
+/* Called from main context */
+static int sink_create(struct userdata *u) {
+    if (!(u->sink = pa_sink_new(u->module->core, u->sink_data, PA_SINK_NETWORK|PA_SINK_LATENCY))) {
+        pa_log("Failed to create sink.");
+        return -1;
+    }
+
+    u->sink->userdata = u;
+    u->sink->parent.process_msg = sink_process_msg;
+    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+    pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
+    pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
+
+    u->sink->refresh_volume = u->sink->refresh_muted = false;
+
+/*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
+
+    pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
+    pa_sink_set_rtpoll(u->sink, u->rtpoll);
+
+    pa_sink_put(u->sink);
+
+    pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_CREATED, NULL, 0, NULL, NULL);
+
+    return 0;
+}
+#else
+/* Called from main context */
+static int source_create(struct userdata *u) {
+    if (!(u->source = pa_source_new(u->module->core, u->source_data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY))) {
+        pa_log("Failed to create source.");
+        return -1;
+    }
+
+    u->source->userdata = u;
+    u->source->parent.process_msg = source_process_msg;
+    u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
+
+/*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
+
+    pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
+    pa_source_set_rtpoll(u->source, u->rtpoll);
+
+    u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
+
+    pa_source_put(u->source);
+
+    return 0;
+}
+#endif
+
 int pa__init(pa_module*m) {
     pa_modargs *ma = NULL;
     struct userdata *u = NULL;
@@ -1930,11 +2004,6 @@  int pa__init(pa_module*m) {
     pa_sample_spec ss;
     pa_channel_map map;
     char *dn = NULL;
-#ifdef TUNNEL_SINK
-    pa_sink_new_data data;
-#else
-    pa_source_new_data data;
-#endif
     bool automatic;
 #ifdef HAVE_X11
     xcb_connection_t *xcb = NULL;
@@ -2130,90 +2199,49 @@  int pa__init(pa_module*m) {
     pa_socket_client_set_callback(u->client, on_connection, u);
 
 #ifdef TUNNEL_SINK
+    u->sink_data = pa_xnew(pa_sink_new_data, 1);
+    pa_sink_new_data_init(u->sink_data);
 
     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
         dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);
 
-    pa_sink_new_data_init(&data);
-    data.driver = __FILE__;
-    data.module = m;
-    data.namereg_fail = false;
-    pa_sink_new_data_set_name(&data, dn);
-    pa_sink_new_data_set_sample_spec(&data, &ss);
-    pa_sink_new_data_set_channel_map(&data, &map);
-    pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
-    pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
+    u->sink_data->driver = __FILE__;
+    u->sink_data->module = m;
+    u->sink_data->namereg_fail = false;
+    pa_sink_new_data_set_name(u->sink_data, dn);
+    pa_sink_new_data_set_sample_spec(u->sink_data, &ss);
+    pa_sink_new_data_set_channel_map(u->sink_data, &map);
+    pa_proplist_setf(u->sink_data->proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
+    pa_proplist_sets(u->sink_data->proplist, "tunnel.remote.server", u->server_name);
     if (u->sink_name)
-        pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
+        pa_proplist_sets(u->sink_data->proplist, "tunnel.remote.sink", u->sink_name);
 
-    if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
+    if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_data->proplist, PA_UPDATE_REPLACE) < 0) {
         pa_log("Invalid properties");
-        pa_sink_new_data_done(&data);
-        goto fail;
-    }
-
-    u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
-    pa_sink_new_data_done(&data);
-
-    if (!u->sink) {
-        pa_log("Failed to create sink.");
         goto fail;
     }
-
-    u->sink->parent.process_msg = sink_process_msg;
-    u->sink->userdata = u;
-    u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
-    pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
-    pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
-
-    u->sink->refresh_volume = u->sink->refresh_muted = false;
-
-/*     pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
-
-    pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
-    pa_sink_set_rtpoll(u->sink, u->rtpoll);
-
 #else
+    u->source_data = pa_xnew(pa_source_new_data, 1);
+    pa_source_new_data_init(u->source_data);
 
     if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
         dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);
 
-    pa_source_new_data_init(&data);
-    data.driver = __FILE__;
-    data.module = m;
-    data.namereg_fail = false;
-    pa_source_new_data_set_name(&data, dn);
-    pa_source_new_data_set_sample_spec(&data, &ss);
-    pa_source_new_data_set_channel_map(&data, &map);
-    pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
-    pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
+    u->source_data->driver = __FILE__;
+    u->source_data->module = m;
+    u->source_data->namereg_fail = false;
+    pa_source_new_data_set_name(u->source_data, dn);
+    pa_source_new_data_set_sample_spec(u->source_data, &ss);
+    pa_source_new_data_set_channel_map(u->source_data, &map);
+    pa_proplist_setf(u->source_data->proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
+    pa_proplist_sets(u->source_data->proplist, "tunnel.remote.server", u->server_name);
     if (u->source_name)
-        pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
+        pa_proplist_sets(u->source_data->proplist, "tunnel.remote.source", u->source_name);
 
-    if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
+    if (pa_modargs_get_proplist(ma, "source_properties", u->source_data->proplist, PA_UPDATE_REPLACE) < 0) {
         pa_log("Invalid properties");
-        pa_source_new_data_done(&data);
-        goto fail;
-    }
-
-    u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
-    pa_source_new_data_done(&data);
-
-    if (!u->source) {
-        pa_log("Failed to create source.");
         goto fail;
     }
-
-    u->source->parent.process_msg = source_process_msg;
-    u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
-    u->source->userdata = u;
-
-/*     pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
-
-    pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
-    pa_source_set_rtpoll(u->source, u->rtpoll);
-
-    u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
 #endif
 
     u->time_event = NULL;
@@ -2300,6 +2328,18 @@  void pa__done(pa_module*m) {
         pa_source_unref(u->source);
 #endif
 
+#ifdef TUNNEL_SINK
+    if (u->sink_data) {
+        pa_sink_new_data_done(u->sink_data);
+        pa_xfree(u->sink_data);
+    }
+#else
+    if (u->source_data) {
+        pa_source_new_data_done(u->source_data);
+        pa_xfree(u->source_data);
+    }
+#endif
+
     if (u->rtpoll)
         pa_rtpoll_free(u->rtpoll);