[Spice-devel,01/11] Move RedChannelClient to separate file

Submitted by Jonathon Jongsma on Aug. 9, 2016, 3:32 p.m.

Details

Message ID 1470756785-30252-2-git-send-email-jjongsma@redhat.com
State New
Headers show
Series "Backported patches from refactory (Aug 9)" ( rev: 1 ) in Spice

Not browsing as part of any series.

Commit Message

Jonathon Jongsma Aug. 9, 2016, 3:32 p.m.
Reduce direct access to RedChannelClient, and get ready to convert to
GObject.
---
 server/Makefile.am                  |    3 +
 server/cursor-channel.c             |    2 +
 server/dcc-private.h                |    1 +
 server/dcc.h                        |    2 +-
 server/display-channel.c            |    3 +-
 server/inputs-channel-client.c      |    1 +
 server/inputs-channel.c             |   15 +-
 server/main-channel-client.c        |   23 +-
 server/main-channel.c               |   39 +-
 server/red-channel-client-private.h |   78 ++
 server/red-channel-client.c         | 1626 +++++++++++++++++++++++++++++++
 server/red-channel-client.h         |  177 ++++
 server/red-channel.c                | 1835 +++--------------------------------
 server/red-channel.h                |  185 +---
 server/red-qxl.c                    |   24 +-
 server/red-worker.c                 |    3 +-
 server/reds.c                       |   22 +-
 server/smartcard.c                  |    2 +-
 server/sound.c                      |    8 +-
 server/spicevmc.c                   |   31 +-
 server/stream.c                     |   11 +-
 21 files changed, 2146 insertions(+), 1945 deletions(-)
 create mode 100644 server/red-channel-client-private.h
 create mode 100644 server/red-channel-client.c
 create mode 100644 server/red-channel-client.h

Patch hide | download patch | download mbox

diff --git a/server/Makefile.am b/server/Makefile.am
index 24e6e21..a249046 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -95,6 +95,9 @@  libserver_la_SOURCES =				\
 	mjpeg-encoder.c				\
 	red-channel.c				\
 	red-channel.h				\
+	red-channel-client.c			\
+	red-channel-client.h			\
+	red-channel-client-private.h		\
 	red-common.h				\
 	dispatcher.c				\
 	dispatcher.h				\
diff --git a/server/cursor-channel.c b/server/cursor-channel.c
index dac929a..83ce61b 100644
--- a/server/cursor-channel.c
+++ b/server/cursor-channel.c
@@ -21,7 +21,9 @@ 
 
 #include <glib.h>
 #include <common/generated_server_marshallers.h>
+
 #include "cursor-channel.h"
+#include "red-channel-client-private.h"
 #include "cache-item.h"
 
 #define CLIENT_CURSOR_CACHE_SIZE 256
diff --git a/server/dcc-private.h b/server/dcc-private.h
index 02b51dd..dc5750b 100644
--- a/server/dcc-private.h
+++ b/server/dcc-private.h
@@ -22,6 +22,7 @@ 
 #include "dcc.h"
 #include "image-encoders.h"
 #include "stream.h"
+#include "red-channel-client-private.h"
 
 struct DisplayChannelClient {
     RedChannelClient base;
diff --git a/server/dcc.h b/server/dcc.h
index 672fb2f..0659ce7 100644
--- a/server/dcc.h
+++ b/server/dcc.h
@@ -59,7 +59,7 @@  typedef struct FreeList {
 
 typedef struct DisplayChannelClient DisplayChannelClient;
 
-#define DCC_TO_DC(dcc) ((DisplayChannel*)((RedChannelClient*)dcc)->channel)
+#define DCC_TO_DC(dcc) ((DisplayChannel*)red_channel_client_get_channel((RedChannelClient*)dcc))
 #define RCC_TO_DCC(rcc) ((DisplayChannelClient*)rcc)
 
 typedef struct RedSurfaceCreateItem {
diff --git a/server/display-channel.c b/server/display-channel.c
index 4f52b79..71acfd0 100644
--- a/server/display-channel.c
+++ b/server/display-channel.c
@@ -1858,8 +1858,7 @@  static void on_disconnect(RedChannelClient *rcc)
 
 static int handle_migrate_flush_mark(RedChannelClient *rcc)
 {
-    DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base);
-    RedChannel *channel = RED_CHANNEL(display_channel);
+    RedChannel *channel = red_channel_client_get_channel(rcc);
 
     red_channel_pipes_add_type(channel, RED_PIPE_ITEM_TYPE_MIGRATE_DATA);
     return TRUE;
diff --git a/server/inputs-channel-client.c b/server/inputs-channel-client.c
index 2f8acec..547ca43 100644
--- a/server/inputs-channel-client.c
+++ b/server/inputs-channel-client.c
@@ -21,6 +21,7 @@ 
 #include "inputs-channel-client.h"
 #include "inputs-channel.h"
 #include "migration-protocol.h"
+#include "red-channel-client-private.h"
 
 struct InputsChannelClient {
     RedChannelClient base;
diff --git a/server/inputs-channel.c b/server/inputs-channel.c
index 69d0391..da0f027 100644
--- a/server/inputs-channel.c
+++ b/server/inputs-channel.c
@@ -39,6 +39,7 @@ 
 #include "reds.h"
 #include "reds-stream.h"
 #include "red-channel.h"
+#include "red-channel-client.h"
 #include "inputs-channel-client.h"
 #include "main-channel-client.h"
 #include "inputs-channel.h"
@@ -155,7 +156,7 @@  static uint8_t *inputs_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
                                                  uint16_t type,
                                                  uint32_t size)
 {
-    InputsChannel *inputs_channel = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base);
+    InputsChannel *inputs_channel = (InputsChannel*)red_channel_client_get_channel(rcc);
 
     if (size > RECEIVE_BUF_SIZE) {
         spice_printerr("error: too large incoming message");
@@ -263,7 +264,7 @@  static void inputs_channel_send_item(RedChannelClient *rcc, RedPipeItem *base)
             red_channel_client_init_send_data(rcc, SPICE_MSG_INPUTS_MOUSE_MOTION_ACK, base);
             break;
         case RED_PIPE_ITEM_MIGRATE_DATA:
-            ((InputsChannel*)rcc->channel)->src_during_migrate = FALSE;
+            ((InputsChannel*)red_channel_client_get_channel(rcc))->src_during_migrate = FALSE;
             inputs_channel_client_send_migrate_data(rcc, m, base);
             break;
         default:
@@ -276,7 +277,7 @@  static void inputs_channel_send_item(RedChannelClient *rcc, RedPipeItem *base)
 static int inputs_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type,
                                         void *message)
 {
-    InputsChannel *inputs_channel = (InputsChannel *)rcc->channel;
+    InputsChannel *inputs_channel = (InputsChannel *)red_channel_client_get_channel(rcc);
     InputsChannelClient *icc = (InputsChannelClient *)rcc;
     uint32_t i;
     RedsState *reds = red_channel_get_server(&inputs_channel->base);
@@ -462,13 +463,13 @@  static void inputs_channel_on_disconnect(RedChannelClient *rcc)
     if (!rcc) {
         return;
     }
-    inputs_release_keys(SPICE_CONTAINEROF(rcc->channel, InputsChannel, base));
+    inputs_release_keys((InputsChannel*)red_channel_client_get_channel(rcc));
 }
 
 static void inputs_pipe_add_init(RedChannelClient *rcc)
 {
     RedInputsInitPipeItem *item = spice_malloc(sizeof(RedInputsInitPipeItem));
-    InputsChannel *inputs = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base);
+    InputsChannel *inputs = (InputsChannel*)red_channel_client_get_channel(rcc);
 
     red_pipe_item_init(&item->base, RED_PIPE_ITEM_INPUTS_INIT);
     item->modifiers = kbd_get_leds(inputs_channel_get_keyboard(inputs));
@@ -515,7 +516,7 @@  static void inputs_connect(RedChannel *channel, RedClient *client,
 
 static void inputs_migrate(RedChannelClient *rcc)
 {
-    InputsChannel *inputs = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base);
+    InputsChannel *inputs = (InputsChannel*)red_channel_client_get_channel(rcc);
     inputs->src_during_migrate = TRUE;
     red_channel_client_default_migrate(rcc);
 }
@@ -552,7 +553,7 @@  static int inputs_channel_handle_migrate_data(RedChannelClient *rcc,
                                               void *message)
 {
     InputsChannelClient *icc = (InputsChannelClient*)rcc;
-    InputsChannel *inputs = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base);
+    InputsChannel *inputs = (InputsChannel*)red_channel_client_get_channel(rcc);
     SpiceMigrateDataHeader *header;
     SpiceMigrateDataInputs *mig_data;
 
diff --git a/server/main-channel-client.c b/server/main-channel-client.c
index acf1bd4..879cd77 100644
--- a/server/main-channel-client.c
+++ b/server/main-channel-client.c
@@ -23,7 +23,9 @@ 
 #include "main-channel-client.h"
 #include <common/generated_server_marshallers.h>
 
+#include "main-channel-client.h"
 #include "main-channel.h"
+#include "red-channel-client-private.h"
 #include "reds.h"
 
 #define NET_TEST_WARMUP_BYTES 0
@@ -636,9 +638,10 @@  static void main_channel_marshall_channels(RedChannelClient *rcc,
                                            RedPipeItem *item)
 {
     SpiceMsgChannels* channels_info;
+    RedChannel *channel = red_channel_client_get_channel(rcc);
 
     red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_CHANNELS_LIST, item);
-    channels_info = reds_msg_channels_new(rcc->channel->reds);
+    channels_info = reds_msg_channels_new(channel->reds);
     spice_marshall_msg_main_channels_list(m, channels_info);
     free(channels_info);
 }
@@ -711,8 +714,9 @@  static void main_channel_marshall_migrate_data_item(RedChannelClient *rcc,
                                                     SpiceMarshaller *m,
                                                     RedPipeItem *item)
 {
+    RedChannel *channel = red_channel_client_get_channel(rcc);
     red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE_DATA, item);
-    reds_marshall_migrate_data(rcc->channel->reds, m); // TODO: from reds split. ugly separation.
+    reds_marshall_migrate_data(channel->reds, m); // TODO: from reds split. ugly separation.
 }
 
 static void main_channel_marshall_init(RedChannelClient *rcc,
@@ -720,7 +724,7 @@  static void main_channel_marshall_init(RedChannelClient *rcc,
                                        RedInitPipeItem *item)
 {
     SpiceMsgMainInit init; // TODO - remove this copy, make RedInitPipeItem reuse SpiceMsgMainInit
-
+    RedChannel *channel = red_channel_client_get_channel(rcc);
 
     red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_INIT, &item->base);
     init.session_id = item->connection_id;
@@ -730,7 +734,7 @@  static void main_channel_marshall_init(RedChannelClient *rcc,
     if (item->is_client_mouse_allowed) {
         init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
     }
-    init.agent_connected = reds_has_vdagent(rcc->channel->reds);
+    init.agent_connected = reds_has_vdagent(channel->reds);
     init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
     init.multi_media_time = item->multi_media_time;
     init.ram_hint = item->ram_hint;
@@ -772,11 +776,12 @@  static void main_channel_fill_migrate_dst_info(MainChannel *main_channel,
 static void main_channel_marshall_migrate_begin(SpiceMarshaller *m, RedChannelClient *rcc,
                                                 RedPipeItem *item)
 {
+    RedChannel *channel = red_channel_client_get_channel(rcc);
     SpiceMsgMainMigrationBegin migrate;
     MainChannel *main_ch;
 
     red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_MIGRATE_BEGIN, item);
-    main_ch = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
+    main_ch = SPICE_CONTAINEROF(channel, MainChannel, base);
     main_channel_fill_migrate_dst_info(main_ch, &migrate.dst_info);
     spice_marshall_msg_main_migrate_begin(m, &migrate);
 }
@@ -785,11 +790,12 @@  static void main_channel_marshall_migrate_begin_seamless(SpiceMarshaller *m,
                                                          RedChannelClient *rcc,
                                                          RedPipeItem *item)
 {
+    RedChannel *channel = red_channel_client_get_channel(rcc);
     SpiceMsgMainMigrateBeginSeamless migrate_seamless;
     MainChannel *main_ch;
 
     red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_MIGRATE_BEGIN_SEAMLESS, item);
-    main_ch = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
+    main_ch = SPICE_CONTAINEROF(channel, MainChannel, base);
     main_channel_fill_migrate_dst_info(main_ch, &migrate_seamless.dst_info);
     migrate_seamless.src_mig_version = SPICE_MIGRATION_PROTOCOL_VERSION;
     spice_marshall_msg_main_migrate_begin_seamless(m, &migrate_seamless);
@@ -809,12 +815,13 @@  static void main_channel_marshall_multi_media_time(RedChannelClient *rcc,
 static void main_channel_marshall_migrate_switch(SpiceMarshaller *m, RedChannelClient *rcc,
                                                  RedPipeItem *item)
 {
+    RedChannel *channel = red_channel_client_get_channel(rcc);
     SpiceMsgMainMigrationSwitchHost migrate;
     MainChannel *main_ch;
 
     spice_printerr("");
     red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST, item);
-    main_ch = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
+    main_ch = SPICE_CONTAINEROF(channel, MainChannel, base);
     migrate.port = main_ch->mig_target.port;
     migrate.sport = main_ch->mig_target.sport;
     migrate.host_size = strlen(main_ch->mig_target.host) + 1;
@@ -854,7 +861,7 @@  void main_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *base)
         base->type != RED_PIPE_ITEM_TYPE_MAIN_INIT) {
         spice_printerr("Init msg for client %p was not sent yet "
                        "(client is probably during semi-seamless migration). Ignoring msg type %d",
-                   rcc->client, base->type);
+                   red_channel_client_get_client(rcc), base->type);
         return;
     }
     switch (base->type) {
diff --git a/server/main-channel.c b/server/main-channel.c
index 42195e6..b25dba2 100644
--- a/server/main-channel.c
+++ b/server/main-channel.c
@@ -35,9 +35,10 @@  int main_channel_is_connected(MainChannel *main_chan)
  */
 static void main_channel_client_on_disconnect(RedChannelClient *rcc)
 {
-    RedsState *reds = red_channel_get_server(rcc->channel);
+    RedsState *reds = red_channel_get_server(red_channel_client_get_channel(rcc));
     spice_printerr("rcc=%p", rcc);
-    main_dispatcher_client_disconnect(reds_get_main_dispatcher(reds), rcc->client);
+    main_dispatcher_client_disconnect(reds_get_main_dispatcher(reds),
+                                      red_channel_client_get_client(rcc));
 }
 
 RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t connection_id)
@@ -48,7 +49,7 @@  RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t c
     FOREACH_CLIENT(main_chan, link, next, rcc) {
         MainChannelClient *mcc = (MainChannelClient*) rcc;
         if (main_channel_client_get_connection_id(mcc) == connection_id) {
-            return rcc->client;
+            return red_channel_client_get_client(rcc);
         }
     }
     return NULL;
@@ -56,12 +57,13 @@  RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t c
 
 static void main_channel_push_channels(MainChannelClient *mcc)
 {
-    if (red_client_during_migrate_at_target((main_channel_client_get_base(mcc))->client)) {
+    RedChannelClient *rcc = main_channel_client_get_base(mcc);
+    if (red_client_during_migrate_at_target(red_channel_client_get_client(rcc))) {
         spice_printerr("warning: ignoring unexpected SPICE_MSGC_MAIN_ATTACH_CHANNELS"
                    "during migration");
         return;
     }
-    red_channel_client_pipe_add_type(main_channel_client_get_base(mcc), RED_PIPE_ITEM_TYPE_MAIN_CHANNELS_LIST);
+    red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MAIN_CHANNELS_LIST);
 }
 
 void main_channel_push_mouse_mode(MainChannel *main_chan, int current_mode,
@@ -98,11 +100,12 @@  static void main_channel_push_migrate_data_item(MainChannel *main_chan)
 static int main_channel_handle_migrate_data(RedChannelClient *rcc,
     uint32_t size, void *message)
 {
+    RedChannel *channel = red_channel_client_get_channel(rcc);
     MainChannelClient *mcc = (MainChannelClient*)rcc;
     SpiceMigrateDataHeader *header = (SpiceMigrateDataHeader *)message;
 
     /* not supported with multi-clients */
-    spice_assert(g_list_length(rcc->channel->clients) == 1);
+    spice_assert(g_list_length(channel->clients) == 1);
 
     if (size < sizeof(SpiceMigrateDataHeader) + sizeof(SpiceMigrateDataMain)) {
         spice_printerr("bad message size %u", size);
@@ -114,7 +117,7 @@  static int main_channel_handle_migrate_data(RedChannelClient *rcc,
         spice_error("bad header");
         return FALSE;
     }
-    return reds_handle_migrate_data(rcc->channel->reds, mcc, (SpiceMigrateDataMain *)(header + 1), size);
+    return reds_handle_migrate_data(channel->reds, mcc, (SpiceMigrateDataMain *)(header + 1), size);
 }
 
 void main_channel_push_multi_media_time(MainChannel *main_chan, int time)
@@ -151,7 +154,8 @@  void main_channel_migrate_switch(MainChannel *main_chan, RedsMigSpice *mig_targe
 static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type,
                                       void *message)
 {
-    MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
     MainChannelClient *mcc = (MainChannelClient*)rcc;
 
     switch (type) {
@@ -163,18 +167,18 @@  static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint
             return FALSE;
         }
         tokens = (SpiceMsgcMainAgentStart *)message;
-        reds_on_main_agent_start(rcc->channel->reds, mcc, tokens->num_tokens);
+        reds_on_main_agent_start(channel->reds, mcc, tokens->num_tokens);
         break;
     }
     case SPICE_MSGC_MAIN_AGENT_DATA: {
-        reds_on_main_agent_data(rcc->channel->reds, mcc, message, size);
+        reds_on_main_agent_data(channel->reds, mcc, message, size);
         break;
     }
     case SPICE_MSGC_MAIN_AGENT_TOKEN: {
         SpiceMsgcMainAgentTokens *tokens;
 
         tokens = (SpiceMsgcMainAgentTokens *)message;
-        reds_on_main_agent_tokens(rcc->channel->reds, mcc, tokens->num_tokens);
+        reds_on_main_agent_tokens(channel->reds, mcc, tokens->num_tokens);
         break;
     }
     case SPICE_MSGC_MAIN_ATTACH_CHANNELS:
@@ -198,7 +202,7 @@  static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint
             ((SpiceMsgcMainMigrateDstDoSeamless *)message)->src_version);
         break;
     case SPICE_MSGC_MAIN_MOUSE_MODE_REQUEST:
-        reds_on_main_mouse_mode_request(rcc->channel->reds, message, size);
+        reds_on_main_mouse_mode_request(channel->reds, message, size);
         break;
     case SPICE_MSGC_PONG: {
         main_channel_client_handle_pong(mcc, (SpiceMsgPing *)message, size);
@@ -219,11 +223,12 @@  static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
                                                uint16_t type,
                                                uint32_t size)
 {
-    MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
     MainChannelClient *mcc = (MainChannelClient*)rcc;
 
     if (type == SPICE_MSGC_MAIN_AGENT_DATA) {
-        return reds_get_agent_data_buffer(rcc->channel->reds, mcc, size);
+        return reds_get_agent_data_buffer(channel->reds, mcc, size);
     } else {
         return main_chan->recv_buf;
     }
@@ -234,8 +239,9 @@  static void main_channel_release_msg_rcv_buf(RedChannelClient *rcc,
                                                uint32_t size,
                                                uint8_t *msg)
 {
+    RedChannel *channel = red_channel_client_get_channel(rcc);
     if (type == SPICE_MSGC_MAIN_AGENT_DATA) {
-        reds_release_agent_data_buffer(rcc->channel->reds, msg);
+        reds_release_agent_data_buffer(channel->reds, msg);
     }
 }
 
@@ -246,8 +252,9 @@  static int main_channel_config_socket(RedChannelClient *rcc)
 
 static int main_channel_handle_migrate_flush_mark(RedChannelClient *rcc)
 {
+    RedChannel *channel = red_channel_client_get_channel(rcc);
     spice_debug(NULL);
-    main_channel_push_migrate_data_item(SPICE_CONTAINEROF(rcc->channel,
+    main_channel_push_migrate_data_item(SPICE_CONTAINEROF(channel,
                                         MainChannel, base));
     return TRUE;
 }
diff --git a/server/red-channel-client-private.h b/server/red-channel-client-private.h
new file mode 100644
index 0000000..4ad4d90
--- /dev/null
+++ b/server/red-channel-client-private.h
@@ -0,0 +1,78 @@ 
+/*
+    Copyright (C) 2009-2015 Red Hat, Inc.
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2.1 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _H_RED_CHANNEL_CLIENT_PRIVATE
+#define _H_RED_CHANNEL_CLIENT_PRIVATE
+
+#include "red-channel-client.h"
+#include "red-channel.h"
+
+struct RedChannelClient {
+    RedChannel *channel;
+    RedClient  *client;
+    RedsStream *stream;
+    int dummy;
+    int dummy_connected;
+
+    uint32_t refs;
+
+    struct {
+        uint32_t generation;
+        uint32_t client_generation;
+        uint32_t messages_window;
+        uint32_t client_window;
+    } ack_data;
+
+    struct {
+        SpiceMarshaller *marshaller;
+        SpiceDataHeaderOpaque header;
+        uint32_t size;
+        RedPipeItem *item;
+        int blocked;
+        uint64_t serial;
+        uint64_t last_sent_serial;
+
+        struct {
+            SpiceMarshaller *marshaller;
+            uint8_t *header_data;
+            RedPipeItem *item;
+        } main;
+
+        struct {
+            SpiceMarshaller *marshaller;
+        } urgent;
+    } send_data;
+
+    OutgoingHandler outgoing;
+    IncomingHandler incoming;
+    int during_send;
+    int id; // debugging purposes
+    Ring pipe;
+    uint32_t pipe_size;
+
+    RedChannelCapabilities remote_caps;
+    int is_mini_header;
+    gboolean destroying;
+
+    int wait_migrate_data;
+    int wait_migrate_flush_mark;
+
+    RedChannelClientLatencyMonitor latency_monitor;
+    RedChannelClientConnectivityMonitor connectivity_monitor;
+};
+
+#endif /* _H_RED_CHANNEL_CLIENT_PRIVATE */
diff --git a/server/red-channel-client.c b/server/red-channel-client.c
new file mode 100644
index 0000000..908dc85
--- /dev/null
+++ b/server/red-channel-client.c
@@ -0,0 +1,1626 @@ 
+/*
+    Copyright (C) 2009-2015 Red Hat, Inc.
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2.1 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <glib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/ioctl.h>
+#ifdef HAVE_LINUX_SOCKIOS_H
+#include <linux/sockios.h> /* SIOCOUTQ */
+#endif
+#include <common/generated_server_marshallers.h>
+
+#include "red-channel-client-private.h"
+#include "red-channel.h"
+
+#define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15)
+#define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10)
+
+enum QosPingState {
+    PING_STATE_NONE,
+    PING_STATE_TIMER,
+    PING_STATE_WARMUP,
+    PING_STATE_LATENCY,
+};
+
+enum ConnectivityState {
+    CONNECTIVITY_STATE_CONNECTED,
+    CONNECTIVITY_STATE_BLOCKED,
+    CONNECTIVITY_STATE_WAIT_PONG,
+    CONNECTIVITY_STATE_DISCONNECTED,
+};
+
+typedef struct RedEmptyMsgPipeItem {
+    RedPipeItem base;
+    int msg;
+} RedEmptyMsgPipeItem;
+
+typedef struct MarkerPipeItem {
+    RedPipeItem base;
+    gboolean *item_in_pipe;
+} MarkerPipeItem;
+
+static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout)
+{
+    if (!rcc->latency_monitor.timer) {
+        return;
+    }
+    if (rcc->latency_monitor.state != PING_STATE_NONE) {
+        return;
+    }
+    rcc->latency_monitor.state = PING_STATE_TIMER;
+    rcc->channel->core->timer_start(rcc->latency_monitor.timer, timeout);
+}
+
+static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc)
+{
+    if (!rcc->latency_monitor.timer) {
+        return;
+    }
+    if (rcc->latency_monitor.state != PING_STATE_TIMER) {
+        return;
+    }
+
+    rcc->channel->core->timer_cancel(rcc->latency_monitor.timer);
+    rcc->latency_monitor.state = PING_STATE_NONE;
+}
+
+static void red_channel_client_restart_ping_timer(RedChannelClient *rcc)
+{
+    uint64_t passed, timeout;
+
+    passed = (spice_get_monotonic_time_ns() - rcc->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC;
+    timeout = PING_TEST_IDLE_NET_TIMEOUT_MS;
+    if (passed  < PING_TEST_TIMEOUT_MS) {
+        timeout += PING_TEST_TIMEOUT_MS - passed;
+    }
+
+    red_channel_client_start_ping_timer(rcc, timeout);
+}
+
+RedChannel* red_channel_client_get_channel(RedChannelClient* rcc)
+{
+    return rcc->channel;
+}
+
+IncomingHandler* red_channel_client_get_incoming_handler(RedChannelClient *rcc)
+{
+    return &rcc->incoming;
+}
+
+void red_channel_client_on_output(void *opaque, int n)
+{
+    RedChannelClient *rcc = opaque;
+
+    if (rcc->connectivity_monitor.timer) {
+        rcc->connectivity_monitor.out_bytes += n;
+    }
+    stat_inc_counter(reds, rcc->channel->out_bytes_counter, n);
+}
+
+void red_channel_client_on_input(void *opaque, int n)
+{
+    RedChannelClient *rcc = opaque;
+
+    if (rcc->connectivity_monitor.timer) {
+        rcc->connectivity_monitor.in_bytes += n;
+    }
+}
+
+int red_channel_client_get_out_msg_size(void *opaque)
+{
+    RedChannelClient *rcc = (RedChannelClient *)opaque;
+
+    return rcc->send_data.size;
+}
+
+void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec,
+                                             int *vec_size, int pos)
+{
+    RedChannelClient *rcc = (RedChannelClient *)opaque;
+
+    *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller,
+                                            vec, IOV_MAX, pos);
+}
+
+void red_channel_client_on_out_block(void *opaque)
+{
+    RedChannelClient *rcc = (RedChannelClient *)opaque;
+
+    rcc->send_data.blocked = TRUE;
+    rcc->channel->core->watch_update_mask(rcc->stream->watch,
+                                     SPICE_WATCH_EVENT_READ |
+                                     SPICE_WATCH_EVENT_WRITE);
+}
+
+static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc)
+{
+    return (rcc->send_data.marshaller == rcc->send_data.urgent.marshaller);
+}
+
+static void red_channel_client_reset_send_data(RedChannelClient *rcc)
+{
+    spice_marshaller_reset(rcc->send_data.marshaller);
+    rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller,
+                                                                rcc->send_data.header.header_size);
+    spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size);
+    rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0);
+    rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0);
+
+    /* Keeping the serial consecutive: resetting it if reset_send_data
+     * has been called before, but no message has been sent since then.
+     */
+    if (rcc->send_data.last_sent_serial != rcc->send_data.serial) {
+        spice_assert(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1);
+        /*  When the urgent marshaller is active, the serial was incremented by
+         *  the call to reset_send_data that was made for the main marshaller.
+         *  The urgent msg receives this serial, and the main msg serial is
+         *  the following one. Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial)
+         *  should be 1 in this case*/
+        if (!red_channel_client_urgent_marshaller_is_active(rcc)) {
+            rcc->send_data.serial = rcc->send_data.last_sent_serial;
+        }
+    }
+    rcc->send_data.serial++;
+
+    if (!rcc->is_mini_header) {
+        spice_assert(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller);
+        rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0);
+        rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial);
+    }
+}
+
+static void red_channel_client_send_set_ack(RedChannelClient *rcc)
+{
+    SpiceMsgSetAck ack;
+
+    spice_assert(rcc);
+    red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL);
+    ack.generation = ++rcc->ack_data.generation;
+    ack.window = rcc->ack_data.client_window;
+    rcc->ack_data.messages_window = 0;
+
+    spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack);
+
+    red_channel_client_begin_send_message(rcc);
+}
+
+static void red_channel_client_send_migrate(RedChannelClient *rcc)
+{
+    SpiceMsgMigrate migrate;
+
+    red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, NULL);
+    migrate.flags = rcc->channel->migration_flags;
+    spice_marshall_msg_migrate(rcc->send_data.marshaller, &migrate);
+    if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_FLUSH) {
+        rcc->wait_migrate_flush_mark = TRUE;
+    }
+
+    red_channel_client_begin_send_message(rcc);
+}
+
+static void red_channel_client_send_ping(RedChannelClient *rcc)
+{
+    SpiceMsgPing ping;
+
+    if (!rcc->latency_monitor.warmup_was_sent) { // latency test start
+        int delay_val;
+        socklen_t opt_size = sizeof(delay_val);
+
+        rcc->latency_monitor.warmup_was_sent = TRUE;
+        /*
+         * When testing latency, TCP_NODELAY must be switched on, otherwise,
+         * sending the ping message is delayed by Nagle algorithm, and the
+         * roundtrip measurment is less accurate (bigger).
+         */
+        rcc->latency_monitor.tcp_nodelay = 1;
+        if (getsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
+                       &opt_size) == -1) {
+            spice_warning("getsockopt failed, %s", strerror(errno));
+        }  else {
+            rcc->latency_monitor.tcp_nodelay = delay_val;
+            if (!delay_val) {
+                delay_val = 1;
+                if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
+                               sizeof(delay_val)) == -1) {
+                   if (errno != ENOTSUP) {
+                        spice_warning("setsockopt failed, %s", strerror(errno));
+                    }
+                }
+            }
+        }
+    }
+
+    red_channel_client_init_send_data(rcc, SPICE_MSG_PING, NULL);
+    ping.id = rcc->latency_monitor.id;
+    ping.timestamp = spice_get_monotonic_time_ns();
+    spice_marshall_msg_ping(rcc->send_data.marshaller, &ping);
+    red_channel_client_begin_send_message(rcc);
+}
+
+static void red_channel_client_send_empty_msg(RedChannelClient *rcc, RedPipeItem *base)
+{
+    RedEmptyMsgPipeItem *msg_pipe_item = SPICE_UPCAST(RedEmptyMsgPipeItem, base);
+
+    red_channel_client_init_send_data(rcc, msg_pipe_item->msg, NULL);
+    red_channel_client_begin_send_message(rcc);
+}
+
+static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item)
+{
+    spice_assert(red_channel_client_no_item_being_sent(rcc));
+    red_channel_client_reset_send_data(rcc);
+    switch (item->type) {
+        case RED_PIPE_ITEM_TYPE_SET_ACK:
+            red_channel_client_send_set_ack(rcc);
+            break;
+        case RED_PIPE_ITEM_TYPE_MIGRATE:
+            red_channel_client_send_migrate(rcc);
+            break;
+        case RED_PIPE_ITEM_TYPE_EMPTY_MSG:
+            red_channel_client_send_empty_msg(rcc, item);
+            break;
+        case RED_PIPE_ITEM_TYPE_PING:
+            red_channel_client_send_ping(rcc);
+            break;
+        case RED_PIPE_ITEM_TYPE_MARKER:
+            break;
+        default:
+            rcc->channel->channel_cbs.send_item(rcc, item);
+            break;
+    }
+    red_pipe_item_unref(item);
+}
+
+static inline void red_channel_client_release_sent_item(RedChannelClient *rcc)
+{
+    if (rcc->send_data.item) {
+        red_pipe_item_unref(rcc->send_data.item);
+        rcc->send_data.item = NULL;
+    }
+}
+
+static void red_channel_client_restore_main_sender(RedChannelClient *rcc)
+{
+    spice_marshaller_reset(rcc->send_data.urgent.marshaller);
+    rcc->send_data.marshaller = rcc->send_data.main.marshaller;
+    rcc->send_data.header.data = rcc->send_data.main.header_data;
+    if (!rcc->is_mini_header) {
+        rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial);
+    }
+    rcc->send_data.item = rcc->send_data.main.item;
+}
+
+void red_channel_client_on_out_msg_done(void *opaque)
+{
+    RedChannelClient *rcc = (RedChannelClient *)opaque;
+    int fd;
+
+    rcc->send_data.size = 0;
+
+    if (spice_marshaller_get_fd(rcc->send_data.marshaller, &fd)) {
+        if (reds_stream_send_msgfd(rcc->stream, fd) < 0) {
+            perror("sendfd");
+            red_channel_client_disconnect(rcc);
+            if (fd != -1)
+                close(fd);
+            return;
+        }
+        if (fd != -1)
+            close(fd);
+    }
+
+    red_channel_client_release_sent_item(rcc);
+    if (rcc->send_data.blocked) {
+        rcc->send_data.blocked = FALSE;
+        rcc->channel->core->watch_update_mask(rcc->stream->watch,
+                                         SPICE_WATCH_EVENT_READ);
+    }
+
+    if (red_channel_client_urgent_marshaller_is_active(rcc)) {
+        red_channel_client_restore_main_sender(rcc);
+        spice_assert(rcc->send_data.header.data != NULL);
+        red_channel_client_begin_send_message(rcc);
+    } else {
+        if (rcc->latency_monitor.timer && !rcc->send_data.blocked && rcc->pipe_size == 0) {
+            /* It is possible that the socket will become idle, so we may be able to test latency */
+            red_channel_client_restart_ping_timer(rcc);
+        }
+    }
+
+}
+
+static void red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item)
+{
+    rcc->pipe_size--;
+    ring_remove(&item->link);
+}
+
+static void red_channel_client_set_remote_caps(RedChannelClient* rcc,
+                                               int num_common_caps, uint32_t *common_caps,
+                                               int num_caps, uint32_t *caps)
+{
+    rcc->remote_caps.num_common_caps = num_common_caps;
+    rcc->remote_caps.common_caps = spice_memdup(common_caps, num_common_caps * sizeof(uint32_t));
+
+    rcc->remote_caps.num_caps = num_caps;
+    rcc->remote_caps.caps = spice_memdup(caps, num_caps * sizeof(uint32_t));
+}
+
+static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc)
+{
+    rcc->remote_caps.num_common_caps = 0;
+    free(rcc->remote_caps.common_caps);
+    rcc->remote_caps.num_caps = 0;
+    free(rcc->remote_caps.caps);
+}
+
+int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap)
+{
+    return test_capability(rcc->remote_caps.common_caps,
+                           rcc->remote_caps.num_common_caps,
+                           cap);
+}
+
+int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap)
+{
+    return test_capability(rcc->remote_caps.caps,
+                           rcc->remote_caps.num_caps,
+                           cap);
+}
+
+static void red_channel_client_push_ping(RedChannelClient *rcc)
+{
+    spice_assert(rcc->latency_monitor.state == PING_STATE_NONE);
+    rcc->latency_monitor.state = PING_STATE_WARMUP;
+    rcc->latency_monitor.warmup_was_sent = FALSE;
+    rcc->latency_monitor.id = rand();
+    red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING);
+    red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING);
+}
+
+static void red_channel_client_ping_timer(void *opaque)
+{
+    RedChannelClient *rcc = opaque;
+
+    spice_assert(rcc->latency_monitor.state == PING_STATE_TIMER);
+    red_channel_client_cancel_ping_timer(rcc);
+
+#ifdef HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. */
+    {
+        int so_unsent_size = 0;
+
+        /* retrieving the occupied size of the socket's tcp snd buffer (unacked + unsent) */
+        if (ioctl(rcc->stream->socket, SIOCOUTQ, &so_unsent_size) == -1) {
+            spice_printerr("ioctl(SIOCOUTQ) failed, %s", strerror(errno));
+        }
+        if (so_unsent_size > 0) {
+            /* tcp snd buffer is still occupied. rescheduling ping */
+            red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
+        } else {
+            red_channel_client_push_ping(rcc);
+        }
+    }
+#else /* ifdef HAVE_LINUX_SOCKIOS_H */
+    /* More portable alternative code path (less accurate but avoids bogus ioctls)*/
+    red_channel_client_push_ping(rcc);
+#endif /* ifdef HAVE_LINUX_SOCKIOS_H */
+}
+
+static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc)
+{
+    return (rcc->channel->handle_acks &&
+            (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2));
+}
+
+/*
+ * When a connection is not alive (and we can't detect it via a socket error), we
+ * reach one of these 2 states:
+ * (1) Sending msgs is blocked: either writes return EAGAIN
+ *     or we are missing MSGC_ACK from the client.
+ * (2) MSG_PING was sent without receiving a MSGC_PONG in reply.
+ *
+ * The connectivity_timer callback tests if the channel's state matches one of the above.
+ * In case it does, on the next time the timer is called, it checks if the connection has
+ * been idle during the time that passed since the previous timer call. If the connection
+ * has been idle, we consider the client as disconnected.
+ */
+static void red_channel_client_connectivity_timer(void *opaque)
+{
+    RedChannelClient *rcc = opaque;
+    RedChannelClientConnectivityMonitor *monitor = &rcc->connectivity_monitor;
+    int is_alive = TRUE;
+
+    if (monitor->state == CONNECTIVITY_STATE_BLOCKED) {
+        if (monitor->in_bytes == 0 && monitor->out_bytes == 0) {
+            if (!rcc->send_data.blocked && !red_channel_client_waiting_for_ack(rcc)) {
+                spice_error("mismatch between rcc-state and connectivity-state");
+            }
+            spice_debug("rcc is blocked; connection is idle");
+            is_alive = FALSE;
+        }
+    } else if (monitor->state == CONNECTIVITY_STATE_WAIT_PONG) {
+        if (monitor->in_bytes == 0) {
+            if (rcc->latency_monitor.state != PING_STATE_WARMUP &&
+                rcc->latency_monitor.state != PING_STATE_LATENCY) {
+                spice_error("mismatch between rcc-state and connectivity-state");
+            }
+            spice_debug("rcc waits for pong; connection is idle");
+            is_alive = FALSE;
+        }
+    }
+
+    if (is_alive) {
+        monitor->in_bytes = 0;
+        monitor->out_bytes = 0;
+        if (rcc->send_data.blocked || red_channel_client_waiting_for_ack(rcc)) {
+            monitor->state = CONNECTIVITY_STATE_BLOCKED;
+        } else if (rcc->latency_monitor.state == PING_STATE_WARMUP ||
+                   rcc->latency_monitor.state == PING_STATE_LATENCY) {
+            monitor->state = CONNECTIVITY_STATE_WAIT_PONG;
+        } else {
+             monitor->state = CONNECTIVITY_STATE_CONNECTED;
+        }
+        rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
+                                        rcc->connectivity_monitor.timeout);
+    } else {
+        monitor->state = CONNECTIVITY_STATE_DISCONNECTED;
+        spice_warning("rcc %p on channel %d:%d has been unresponsive for more than %u ms, disconnecting",
+                      rcc, rcc->channel->type, rcc->channel->id, monitor->timeout);
+        red_channel_client_disconnect(rcc);
+    }
+}
+
+void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms)
+{
+    if (!red_channel_client_is_connected(rcc)) {
+        return;
+    }
+    spice_debug(NULL);
+    spice_assert(timeout_ms > 0);
+    /*
+     * If latency_monitor is not active, we activate it in order to enable
+     * periodic ping messages so that we will be be able to identify a disconnected
+     * channel-client even if there are no ongoing channel specific messages
+     * on this channel.
+     */
+    if (rcc->latency_monitor.timer == NULL) {
+        rcc->latency_monitor.timer = rcc->channel->core->timer_add(
+            rcc->channel->core, red_channel_client_ping_timer, rcc);
+        if (!red_client_during_migrate_at_target(rcc->client)) {
+            red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
+        }
+        rcc->latency_monitor.roundtrip = -1;
+    }
+    if (rcc->connectivity_monitor.timer == NULL) {
+        rcc->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED;
+        rcc->connectivity_monitor.timer = rcc->channel->core->timer_add(
+            rcc->channel->core, red_channel_client_connectivity_timer, rcc);
+        rcc->connectivity_monitor.timeout = timeout_ms;
+        if (!red_client_during_migrate_at_target(rcc->client)) {
+           rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
+                                           rcc->connectivity_monitor.timeout);
+        }
+    }
+}
+
+static void red_channel_client_event(int fd, int event, void *data)
+{
+    RedChannelClient *rcc = (RedChannelClient *)data;
+
+    red_channel_client_ref(rcc);
+    if (event & SPICE_WATCH_EVENT_READ) {
+        red_channel_client_receive(rcc);
+    }
+    if (event & SPICE_WATCH_EVENT_WRITE) {
+        red_channel_client_push(rcc);
+    }
+    red_channel_client_unref(rcc);
+}
+
+static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header)
+{
+    return GUINT32_FROM_LE(((SpiceDataHeader *)header->data)->size);
+}
+
+static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header)
+{
+    return GUINT32_FROM_LE(((SpiceMiniDataHeader *)header->data)->size);
+}
+
+static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header)
+{
+    return GUINT16_FROM_LE(((SpiceDataHeader *)header->data)->type);
+}
+
+static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header)
+{
+    return GUINT16_FROM_LE(((SpiceMiniDataHeader *)header->data)->type);
+}
+
+static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
+{
+    ((SpiceDataHeader *)header->data)->type = GUINT16_TO_LE(type);
+}
+
+static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
+{
+    ((SpiceMiniDataHeader *)header->data)->type = GUINT16_TO_LE(type);
+}
+
+static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
+{
+    ((SpiceDataHeader *)header->data)->size = GUINT32_TO_LE(size);
+}
+
+static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
+{
+    ((SpiceMiniDataHeader *)header->data)->size = GUINT32_TO_LE(size);
+}
+
+static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
+{
+    ((SpiceDataHeader *)header->data)->serial = GUINT64_TO_LE(serial);
+}
+
+static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
+{
+    spice_error("attempt to set header serial on mini header");
+}
+
+static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
+{
+    ((SpiceDataHeader *)header->data)->sub_list = GUINT32_TO_LE(sub_list);
+}
+
+static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
+{
+    spice_error("attempt to set header sub list on mini header");
+}
+
+static const SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader),
+                                                          full_header_set_msg_type,
+                                                          full_header_set_msg_size,
+                                                          full_header_set_msg_serial,
+                                                          full_header_set_msg_sub_list,
+                                                          full_header_get_msg_type,
+                                                          full_header_get_msg_size};
+
+static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader),
+                                                          mini_header_set_msg_type,
+                                                          mini_header_set_msg_size,
+                                                          mini_header_set_msg_serial,
+                                                          mini_header_set_msg_sub_list,
+                                                          mini_header_get_msg_type,
+                                                          mini_header_get_msg_size};
+
+static int red_channel_client_pre_create_validate(RedChannel *channel, RedClient  *client)
+{
+    if (red_client_get_channel(client, channel->type, channel->id)) {
+        spice_printerr("Error client %p: duplicate channel type %d id %d",
+                       client, channel->type, channel->id);
+        return FALSE;
+    }
+    return TRUE;
+}
+
+RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient  *client,
+                                            RedsStream *stream,
+                                            int monitor_latency,
+                                            int num_common_caps, uint32_t *common_caps,
+                                            int num_caps, uint32_t *caps)
+{
+    RedChannelClient *rcc = NULL;
+
+    pthread_mutex_lock(&client->lock);
+    if (!red_channel_client_pre_create_validate(channel, client)) {
+        goto error;
+    }
+    spice_assert(stream && channel && size >= sizeof(RedChannelClient));
+    rcc = spice_malloc0(size);
+    rcc->stream = stream;
+    rcc->channel = channel;
+    rcc->client = client;
+    rcc->refs = 1;
+    rcc->ack_data.messages_window = ~0;  // blocks send message (maybe use send_data.blocked +
+                                             // block flags)
+    rcc->ack_data.client_generation = ~0;
+    rcc->ack_data.client_window = CLIENT_ACK_WINDOW;
+    rcc->send_data.main.marshaller = spice_marshaller_new();
+    rcc->send_data.urgent.marshaller = spice_marshaller_new();
+
+    rcc->send_data.marshaller = rcc->send_data.main.marshaller;
+
+    rcc->incoming.opaque = rcc;
+    rcc->incoming.cb = &channel->incoming_cb;
+
+    rcc->outgoing.opaque = rcc;
+    rcc->outgoing.cb = &channel->outgoing_cb;
+    rcc->outgoing.pos = 0;
+    rcc->outgoing.size = 0;
+
+    red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps);
+    if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) {
+        rcc->incoming.header = mini_header_wrapper;
+        rcc->send_data.header = mini_header_wrapper;
+        rcc->is_mini_header = TRUE;
+    } else {
+        rcc->incoming.header = full_header_wrapper;
+        rcc->send_data.header = full_header_wrapper;
+        rcc->is_mini_header = FALSE;
+    }
+
+    rcc->incoming.header.data = rcc->incoming.header_buf;
+    rcc->incoming.serial = 1;
+
+    if (!channel->channel_cbs.config_socket(rcc)) {
+        goto error;
+    }
+
+    ring_init(&rcc->pipe);
+    rcc->pipe_size = 0;
+
+    stream->watch = channel->core->watch_add(channel->core,
+                                           stream->socket,
+                                           SPICE_WATCH_EVENT_READ,
+                                           red_channel_client_event, rcc);
+    rcc->id = g_list_length(channel->clients);
+    red_channel_add_client(channel, rcc);
+    red_client_add_channel(client, rcc);
+    red_channel_ref(channel);
+    pthread_mutex_unlock(&client->lock);
+
+    if (monitor_latency && reds_stream_get_family(stream) != AF_UNIX) {
+        rcc->latency_monitor.timer = channel->core->timer_add(
+            channel->core, red_channel_client_ping_timer, rcc);
+        if (!client->during_target_migrate) {
+            red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
+        }
+        rcc->latency_monitor.roundtrip = -1;
+    }
+
+    return rcc;
+error:
+    free(rcc);
+    reds_stream_free(stream);
+    pthread_mutex_unlock(&client->lock);
+    return NULL;
+}
+
+RedChannelClient *red_channel_client_create_dummy(int size,
+                                                  RedChannel *channel,
+                                                  RedClient  *client,
+                                                  int num_common_caps, uint32_t *common_caps,
+                                                  int num_caps, uint32_t *caps)
+{
+    RedChannelClient *rcc = NULL;
+
+    spice_assert(size >= sizeof(RedChannelClient));
+
+    pthread_mutex_lock(&client->lock);
+    if (!red_channel_client_pre_create_validate(channel, client)) {
+        goto error;
+    }
+    rcc = spice_malloc0(size);
+    rcc->refs = 1;
+    rcc->client = client;
+    rcc->channel = channel;
+    red_channel_ref(channel);
+    red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps);
+    if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) {
+        rcc->incoming.header = mini_header_wrapper;
+        rcc->send_data.header = mini_header_wrapper;
+        rcc->is_mini_header = TRUE;
+    } else {
+        rcc->incoming.header = full_header_wrapper;
+        rcc->send_data.header = full_header_wrapper;
+        rcc->is_mini_header = FALSE;
+    }
+
+    rcc->incoming.header.data = rcc->incoming.header_buf;
+    rcc->incoming.serial = 1;
+    ring_init(&rcc->pipe);
+
+    rcc->dummy = TRUE;
+    rcc->dummy_connected = TRUE;
+    red_channel_add_client(channel, rcc);
+    red_client_add_channel(client, rcc);
+    pthread_mutex_unlock(&client->lock);
+    return rcc;
+error:
+    pthread_mutex_unlock(&client->lock);
+    return NULL;
+}
+
+void red_channel_client_seamless_migration_done(RedChannelClient *rcc)
+{
+    rcc->wait_migrate_data = FALSE;
+
+    if (red_client_seamless_migration_done_for_channel(rcc->client)) {
+        if (rcc->latency_monitor.timer) {
+            red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
+        }
+        if (rcc->connectivity_monitor.timer) {
+            rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
+                                            rcc->connectivity_monitor.timeout);
+        }
+    }
+}
+
+void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc)
+{
+    if (rcc->latency_monitor.timer) {
+        red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
+    }
+}
+
+int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc)
+{
+    return rcc->wait_migrate_data;
+}
+
+void red_channel_client_default_migrate(RedChannelClient *rcc)
+{
+    if (rcc->latency_monitor.timer) {
+        red_channel_client_cancel_ping_timer(rcc);
+        rcc->channel->core->timer_remove(rcc->latency_monitor.timer);
+        rcc->latency_monitor.timer = NULL;
+    }
+    if (rcc->connectivity_monitor.timer) {
+       rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer);
+        rcc->connectivity_monitor.timer = NULL;
+    }
+    red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MIGRATE);
+}
+
+void red_channel_client_ref(RedChannelClient *rcc)
+{
+    rcc->refs++;
+}
+
+void red_channel_client_unref(RedChannelClient *rcc)
+{
+    if (--rcc->refs != 0) {
+        return;
+    }
+
+    spice_debug("destroy rcc=%p", rcc);
+
+    reds_stream_free(rcc->stream);
+    rcc->stream = NULL;
+
+    if (rcc->send_data.main.marshaller) {
+        spice_marshaller_destroy(rcc->send_data.main.marshaller);
+    }
+
+    if (rcc->send_data.urgent.marshaller) {
+        spice_marshaller_destroy(rcc->send_data.urgent.marshaller);
+    }
+
+    red_channel_client_destroy_remote_caps(rcc);
+    if (rcc->channel) {
+        red_channel_unref(rcc->channel);
+    }
+    free(rcc);
+}
+
+void red_channel_client_destroy(RedChannelClient *rcc)
+{
+    rcc->destroying = TRUE;
+    red_channel_client_disconnect(rcc);
+    red_client_remove_channel(rcc);
+    red_channel_client_unref(rcc);
+}
+
+void red_channel_client_shutdown(RedChannelClient *rcc)
+{
+    if (rcc->stream && !rcc->stream->shutdown) {
+        rcc->channel->core->watch_remove(rcc->stream->watch);
+        rcc->stream->watch = NULL;
+        shutdown(rcc->stream->socket, SHUT_RDWR);
+        rcc->stream->shutdown = TRUE;
+    }
+}
+
+static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler)
+{
+    ssize_t n;
+
+    if (!stream) {
+        return;
+    }
+
+    if (handler->size == 0) {
+        handler->vec = handler->vec_buf;
+        handler->size = handler->cb->get_msg_size(handler->opaque);
+        if (!handler->size) {  // nothing to be sent
+            return;
+        }
+    }
+
+    for (;;) {
+        handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
+        n = reds_stream_writev(stream, handler->vec, handler->vec_size);
+        if (n == -1) {
+            switch (errno) {
+            case EAGAIN:
+                handler->cb->on_block(handler->opaque);
+                return;
+            case EINTR:
+                continue;
+            case EPIPE:
+                handler->cb->on_error(handler->opaque);
+                return;
+            default:
+                spice_printerr("%s", strerror(errno));
+                handler->cb->on_error(handler->opaque);
+                return;
+            }
+        } else {
+            handler->pos += n;
+            handler->cb->on_output(handler->opaque, n);
+            if (handler->pos == handler->size) { // finished writing data
+                /* reset handler before calling on_msg_done, since it
+                 * can trigger another call to red_peer_handle_outgoing (when
+                 * switching from the urgent marshaller to the main one */
+                handler->vec = handler->vec_buf;
+                handler->pos = 0;
+                handler->size = 0;
+                handler->cb->on_msg_done(handler->opaque);
+                return;
+            }
+        }
+    }
+}
+
+/* return the number of bytes read. -1 in case of error */
+static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
+{
+    uint8_t *pos = buf;
+    while (size) {
+        int now;
+        if (stream->shutdown) {
+            return -1;
+        }
+        now = reds_stream_read(stream, pos, size);
+        if (now <= 0) {
+            if (now == 0) {
+                return -1;
+            }
+            spice_assert(now == -1);
+            if (errno == EAGAIN) {
+                break;
+            } else if (errno == EINTR) {
+                continue;
+            } else if (errno == EPIPE) {
+                return -1;
+            } else {
+                spice_printerr("%s", strerror(errno));
+                return -1;
+            }
+        } else {
+            size -= now;
+            pos += now;
+        }
+    }
+    return pos - buf;
+}
+
+// TODO: this implementation, as opposed to the old implementation in red_worker,
+// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer
+// arithmetic for the case where a single cb_read could return multiple messages. But
+// this is suboptimal potentially. Profile and consider fixing.
+static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handler)
+{
+    int bytes_read;
+    uint8_t *parsed;
+    size_t parsed_size;
+    message_destructor_t parsed_free;
+    uint16_t msg_type;
+    uint32_t msg_size;
+
+    /* XXX: This needs further investigation as to the underlying cause, it happened
+     * after spicec disconnect (but not with spice-gtk) repeatedly. */
+    if (!stream) {
+        return;
+    }
+
+    for (;;) {
+        int ret_handle;
+        if (handler->header_pos < handler->header.header_size) {
+            bytes_read = red_peer_receive(stream,
+                                          handler->header.data + handler->header_pos,
+                                          handler->header.header_size - handler->header_pos);
+            if (bytes_read == -1) {
+                handler->cb->on_error(handler->opaque);
+                return;
+            }
+            handler->cb->on_input(handler->opaque, bytes_read);
+            handler->header_pos += bytes_read;
+
+            if (handler->header_pos != handler->header.header_size) {
+                return;
+            }
+        }
+
+        msg_size = handler->header.get_msg_size(&handler->header);
+        msg_type = handler->header.get_msg_type(&handler->header);
+        if (handler->msg_pos < msg_size) {
+            if (!handler->msg) {
+                handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size);
+                if (handler->msg == NULL) {
+                    spice_printerr("ERROR: channel refused to allocate buffer.");
+                    handler->cb->on_error(handler->opaque);
+                    return;
+                }
+            }
+
+            bytes_read = red_peer_receive(stream,
+                                          handler->msg + handler->msg_pos,
+                                          msg_size - handler->msg_pos);
+            if (bytes_read == -1) {
+                handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
+                handler->cb->on_error(handler->opaque);
+                return;
+            }
+            handler->cb->on_input(handler->opaque, bytes_read);
+            handler->msg_pos += bytes_read;
+            if (handler->msg_pos != msg_size) {
+                return;
+            }
+        }
+
+        if (handler->cb->parser) {
+            parsed = handler->cb->parser(handler->msg,
+                handler->msg + msg_size, msg_type,
+                SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
+            if (parsed == NULL) {
+                spice_printerr("failed to parse message type %d", msg_type);
+                handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
+                handler->cb->on_error(handler->opaque);
+                return;
+            }
+            ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size,
+                                                    msg_type, parsed);
+            parsed_free(parsed);
+        } else {
+            ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size,
+                                                     handler->msg);
+        }
+        handler->msg_pos = 0;
+        handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
+        handler->msg = NULL;
+        handler->header_pos = 0;
+
+        if (!ret_handle) {
+            handler->cb->on_error(handler->opaque);
+            return;
+        }
+    }
+}
+
+void red_channel_client_receive(RedChannelClient *rcc)
+{
+    red_channel_client_ref(rcc);
+    red_peer_handle_incoming(rcc->stream, &rcc->incoming);
+    red_channel_client_unref(rcc);
+}
+
+void red_channel_client_send(RedChannelClient *rcc)
+{
+    red_channel_client_ref(rcc);
+    red_peer_handle_outgoing(rcc->stream, &rcc->outgoing);
+    red_channel_client_unref(rcc);
+}
+
+static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc)
+{
+    RedPipeItem *item;
+
+    if (!rcc || rcc->send_data.blocked
+             || red_channel_client_waiting_for_ack(rcc)
+             || !(item = (RedPipeItem *)ring_get_tail(&rcc->pipe))) {
+        return NULL;
+    }
+    red_channel_client_pipe_remove(rcc, item);
+    return item;
+}
+
+void red_channel_client_push(RedChannelClient *rcc)
+{
+    RedPipeItem *pipe_item;
+
+    if (!rcc->during_send) {
+        rcc->during_send = TRUE;
+    } else {
+        return;
+    }
+    red_channel_client_ref(rcc);
+    if (rcc->send_data.blocked) {
+        red_channel_client_send(rcc);
+    }
+
+    if (!red_channel_client_no_item_being_sent(rcc) && !rcc->send_data.blocked) {
+        rcc->send_data.blocked = TRUE;
+        spice_printerr("ERROR: an item waiting to be sent and not blocked");
+    }
+
+    while ((pipe_item = red_channel_client_pipe_item_get(rcc))) {
+        red_channel_client_send_item(rcc, pipe_item);
+    }
+    if (red_channel_client_no_item_being_sent(rcc) && ring_is_empty(&rcc->pipe)
+        && rcc->stream->watch) {
+        rcc->channel->core->watch_update_mask(rcc->stream->watch,
+                                              SPICE_WATCH_EVENT_READ);
+    }
+    rcc->during_send = FALSE;
+    red_channel_client_unref(rcc);
+}
+
+int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc)
+{
+    if (rcc->latency_monitor.roundtrip < 0) {
+        return rcc->latency_monitor.roundtrip;
+    }
+    return rcc->latency_monitor.roundtrip / NSEC_PER_MILLISEC;
+}
+
+void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc)
+{
+    rcc->ack_data.messages_window = 0;
+    red_channel_client_push(rcc);
+}
+
+static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping)
+{
+    uint64_t now;
+
+    /* ignoring unexpected pongs, or post-migration pongs for pings that
+     * started just before migration */
+    if (ping->id != rcc->latency_monitor.id) {
+        spice_warning("ping-id (%u)!= pong-id %u",
+                      rcc->latency_monitor.id, ping->id);
+        return;
+    }
+
+    now = spice_get_monotonic_time_ns();
+
+    if (rcc->latency_monitor.state == PING_STATE_WARMUP) {
+        rcc->latency_monitor.state = PING_STATE_LATENCY;
+        return;
+    } else if (rcc->latency_monitor.state != PING_STATE_LATENCY) {
+        spice_warning("unexpected");
+        return;
+    }
+
+    /* set TCP_NODELAY=0, in case we reverted it for the test*/
+    if (!rcc->latency_monitor.tcp_nodelay) {
+        int delay_val = 0;
+
+        if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
+                       sizeof(delay_val)) == -1) {
+            if (errno != ENOTSUP) {
+                spice_warning("setsockopt failed, %s", strerror(errno));
+            }
+        }
+    }
+
+    /*
+     * The real network latency shouldn't change during the connection. However,
+     *  the measurements can be bigger than the real roundtrip due to other
+     *  threads or processes that are utilizing the network. We update the roundtrip
+     *  measurement with the minimal value we encountered till now.
+     */
+    if (rcc->latency_monitor.roundtrip < 0 ||
+        now - ping->timestamp < rcc->latency_monitor.roundtrip) {
+        rcc->latency_monitor.roundtrip = now - ping->timestamp;
+        spice_debug("update roundtrip %.2f(ms)", ((double)rcc->latency_monitor.roundtrip)/NSEC_PER_MILLISEC);
+    }
+
+    rcc->latency_monitor.last_pong_time = now;
+    rcc->latency_monitor.state = PING_STATE_NONE;
+    red_channel_client_start_ping_timer(rcc, PING_TEST_TIMEOUT_MS);
+}
+
+static void red_channel_handle_migrate_flush_mark(RedChannelClient *rcc)
+{
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    if (channel->channel_cbs.handle_migrate_flush_mark) {
+        channel->channel_cbs.handle_migrate_flush_mark(rcc);
+    }
+}
+
+// TODO: the whole migration is broken with multiple clients. What do we want to do?
+// basically just
+//  1) source send mark to all
+//  2) source gets at various times the data (waits for all)
+//  3) source migrates to target
+//  4) target sends data to all
+// So need to make all the handlers work with per channel/client data (what data exactly?)
+static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message)
+{
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    spice_debug("channel type %d id %d rcc %p size %u",
+                channel->type, channel->id, rcc, size);
+    if (!channel->channel_cbs.handle_migrate_data) {
+        return;
+    }
+    if (!red_channel_client_is_waiting_for_migrate_data(rcc)) {
+        spice_channel_client_error(rcc, "unexpected");
+        return;
+    }
+    if (channel->channel_cbs.handle_migrate_data_get_serial) {
+        red_channel_client_set_message_serial(rcc,
+            channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message));
+    }
+    if (!channel->channel_cbs.handle_migrate_data(rcc, size, message)) {
+        spice_channel_client_error(rcc, "handle_migrate_data failed");
+        return;
+    }
+    red_channel_client_seamless_migration_done(rcc);
+}
+
+
+int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
+                                      uint16_t type, void *message)
+{
+    switch (type) {
+    case SPICE_MSGC_ACK_SYNC:
+        if (size != sizeof(uint32_t)) {
+            spice_printerr("bad message size");
+            return FALSE;
+        }
+        rcc->ack_data.client_generation = *(uint32_t *)(message);
+        break;
+    case SPICE_MSGC_ACK:
+        if (rcc->ack_data.client_generation == rcc->ack_data.generation) {
+            rcc->ack_data.messages_window -= rcc->ack_data.client_window;
+            red_channel_client_push(rcc);
+        }
+        break;
+    case SPICE_MSGC_DISCONNECTING:
+        break;
+    case SPICE_MSGC_MIGRATE_FLUSH_MARK:
+        if (!rcc->wait_migrate_flush_mark) {
+            spice_error("unexpected flush mark");
+            return FALSE;
+        }
+        red_channel_handle_migrate_flush_mark(rcc);
+        rcc->wait_migrate_flush_mark = FALSE;
+        break;
+    case SPICE_MSGC_MIGRATE_DATA:
+        red_channel_handle_migrate_data(rcc, size, message);
+        break;
+    case SPICE_MSGC_PONG:
+        red_channel_client_handle_pong(rcc, message);
+        break;
+    default:
+        spice_printerr("invalid message type %u", type);
+        return FALSE;
+    }
+    return TRUE;
+}
+
+void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item)
+{
+    spice_assert(red_channel_client_no_item_being_sent(rcc));
+    spice_assert(msg_type != 0);
+    rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type);
+    rcc->send_data.item = item;
+    if (item) {
+        red_pipe_item_ref(item);
+    }
+}
+
+void red_channel_client_begin_send_message(RedChannelClient *rcc)
+{
+    SpiceMarshaller *m = rcc->send_data.marshaller;
+
+    // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state)
+    if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) {
+        spice_printerr("BUG: header->type == 0");
+        return;
+    }
+
+    /* canceling the latency test timer till the nework is idle */
+    red_channel_client_cancel_ping_timer(rcc);
+
+    spice_marshaller_flush(m);
+    rcc->send_data.size = spice_marshaller_get_total_size(m);
+    rcc->send_data.header.set_msg_size(&rcc->send_data.header,
+                                       rcc->send_data.size - rcc->send_data.header.header_size);
+    rcc->ack_data.messages_window++;
+    rcc->send_data.last_sent_serial = rcc->send_data.serial;
+    rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */
+    red_channel_client_send(rcc);
+}
+
+SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc)
+{
+    spice_assert(red_channel_client_no_item_being_sent(rcc));
+    spice_assert(rcc->send_data.header.data != NULL);
+    rcc->send_data.main.header_data = rcc->send_data.header.data;
+    rcc->send_data.main.item = rcc->send_data.item;
+
+    rcc->send_data.marshaller = rcc->send_data.urgent.marshaller;
+    rcc->send_data.item = NULL;
+    red_channel_client_reset_send_data(rcc);
+    return rcc->send_data.marshaller;
+}
+
+uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc)
+{
+    return rcc->send_data.serial;
+}
+
+void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial)
+{
+    rcc->send_data.last_sent_serial = serial;
+    rcc->send_data.serial = serial;
+}
+
+static inline gboolean client_pipe_add(RedChannelClient *rcc, RedPipeItem *item, RingItem *pos)
+{
+    spice_assert(rcc && item);
+    if (SPICE_UNLIKELY(!red_channel_client_is_connected(rcc))) {
+        spice_debug("rcc is disconnected %p", rcc);
+        red_pipe_item_unref(item);
+        return FALSE;
+    }
+    if (ring_is_empty(&rcc->pipe) && rcc->stream->watch) {
+        rcc->channel->core->watch_update_mask(rcc->stream->watch,
+                                         SPICE_WATCH_EVENT_READ |
+                                         SPICE_WATCH_EVENT_WRITE);
+    }
+    rcc->pipe_size++;
+    ring_add(pos, &item->link);
+    return TRUE;
+}
+
+void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item)
+{
+
+    client_pipe_add(rcc, item, &rcc->pipe);
+}
+
+void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item)
+{
+    red_channel_client_pipe_add(rcc, item);
+    red_channel_client_push(rcc);
+}
+
+void red_channel_client_pipe_add_after(RedChannelClient *rcc,
+                                       RedPipeItem *item,
+                                       RedPipeItem *pos)
+{
+    spice_assert(pos);
+    client_pipe_add(rcc, item, &pos->link);
+}
+
+int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc,
+                                           RedPipeItem *item)
+{
+    return ring_item_is_linked(&item->link);
+}
+
+void red_channel_client_pipe_add_tail(RedChannelClient *rcc,
+                                      RedPipeItem *item)
+{
+    client_pipe_add(rcc, item, rcc->pipe.prev);
+}
+
+void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item)
+{
+    if (client_pipe_add(rcc, item, rcc->pipe.prev)) {
+        red_channel_client_push(rcc);
+    }
+}
+
+void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type)
+{
+    RedPipeItem *item = spice_new(RedPipeItem, 1);
+
+    red_pipe_item_init(item, pipe_item_type);
+    red_channel_client_pipe_add(rcc, item);
+    red_channel_client_push(rcc);
+}
+
+void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type)
+{
+    RedEmptyMsgPipeItem *item = spice_new(RedEmptyMsgPipeItem, 1);
+
+    red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG);
+    item->msg = msg_type;
+    red_channel_client_pipe_add(rcc, &item->base);
+    red_channel_client_push(rcc);
+}
+
+gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc)
+{
+    g_return_val_if_fail(rcc != NULL, TRUE);
+    return (rcc->pipe_size == 0) && (ring_is_empty(&rcc->pipe));
+}
+
+uint32_t red_channel_client_get_pipe_size(RedChannelClient *rcc)
+{
+    return rcc->pipe_size;
+}
+
+int red_channel_client_is_connected(RedChannelClient *rcc)
+{
+    if (!rcc->dummy) {
+        return rcc->channel
+            && (g_list_find(rcc->channel->clients, rcc) != NULL);
+    } else {
+        return rcc->dummy_connected;
+    }
+}
+
+static void red_channel_client_clear_sent_item(RedChannelClient *rcc)
+{
+    red_channel_client_release_sent_item(rcc);
+    rcc->send_data.blocked = FALSE;
+    rcc->send_data.size = 0;
+}
+
+static void red_channel_client_pipe_clear(RedChannelClient *rcc)
+{
+    RedPipeItem *item;
+
+    if (rcc) {
+        red_channel_client_clear_sent_item(rcc);
+    }
+    while ((item = (RedPipeItem *)ring_get_head(&rcc->pipe))) {
+        ring_remove(&item->link);
+        red_pipe_item_unref(item);
+    }
+    rcc->pipe_size = 0;
+}
+
+void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
+{
+    rcc->ack_data.messages_window = 0;
+}
+
+void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window)
+{
+    rcc->ack_data.client_window = client_window;
+}
+
+void red_channel_client_push_set_ack(RedChannelClient *rcc)
+{
+    red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_SET_ACK);
+}
+
+static void red_channel_client_disconnect_dummy(RedChannelClient *rcc)
+{
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    GList *link;
+    spice_assert(rcc->dummy);
+    if (channel && (link = g_list_find(channel->clients, rcc))) {
+        spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel,
+                       channel->type, channel->id);
+        red_channel_remove_client(channel, link->data);
+    }
+    rcc->dummy_connected = FALSE;
+}
+
+void red_channel_client_disconnect(RedChannelClient *rcc)
+{
+    RedChannel *channel = rcc->channel;
+
+    if (rcc->dummy) {
+        red_channel_client_disconnect_dummy(rcc);
+        return;
+    }
+    if (!red_channel_client_is_connected(rcc)) {
+        return;
+    }
+    spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel,
+                   channel->type, channel->id);
+    red_channel_client_pipe_clear(rcc);
+    if (rcc->stream->watch) {
+        channel->core->watch_remove(rcc->stream->watch);
+        rcc->stream->watch = NULL;
+    }
+    if (rcc->latency_monitor.timer) {
+        channel->core->timer_remove(rcc->latency_monitor.timer);
+        rcc->latency_monitor.timer = NULL;
+    }
+    if (rcc->connectivity_monitor.timer) {
+        channel->core->timer_remove(rcc->connectivity_monitor.timer);
+        rcc->connectivity_monitor.timer = NULL;
+    }
+    red_channel_remove_client(channel, rcc);
+    channel->channel_cbs.on_disconnect(rcc);
+}
+
+int red_channel_client_is_blocked(RedChannelClient *rcc)
+{
+    return rcc && rcc->send_data.blocked;
+}
+
+int red_channel_client_send_message_pending(RedChannelClient *rcc)
+{
+    return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0;
+}
+
+SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc)
+{
+    return rcc->send_data.marshaller;
+}
+
+RedsStream *red_channel_client_get_stream(RedChannelClient *rcc)
+{
+    return rcc->stream;
+}
+
+RedClient *red_channel_client_get_client(RedChannelClient *rcc)
+{
+    return rcc->client;
+}
+
+void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list)
+{
+    rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list);
+}
+
+static void marker_pipe_item_free(RedPipeItem *base)
+{
+    MarkerPipeItem *item = SPICE_UPCAST(MarkerPipeItem, base);
+
+    if (item->item_in_pipe) {
+        *item->item_in_pipe = FALSE;
+    }
+    free(item);
+}
+
+/* TODO: more evil sync stuff. anything with the word wait in it's name. */
+int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc,
+                                           RedPipeItem *item,
+                                           int64_t timeout)
+{
+    uint64_t end_time;
+    gboolean item_in_pipe;
+
+    spice_info(NULL);
+
+    if (timeout != -1) {
+        end_time = spice_get_monotonic_time_ns() + timeout;
+    } else {
+        end_time = UINT64_MAX;
+    }
+
+    MarkerPipeItem *mark_item = spice_new0(MarkerPipeItem, 1);
+
+    red_pipe_item_init_full(&mark_item->base, RED_PIPE_ITEM_TYPE_MARKER,
+                            marker_pipe_item_free);
+    item_in_pipe = TRUE;
+    mark_item->item_in_pipe = &item_in_pipe;
+    red_channel_client_pipe_add_after(rcc, &mark_item->base, item);
+
+    if (red_channel_client_is_blocked(rcc)) {
+        red_channel_client_receive(rcc);
+        red_channel_client_send(rcc);
+    }
+    red_channel_client_push(rcc);
+
+    while(item_in_pipe &&
+          (timeout == -1 || spice_get_monotonic_time_ns() < end_time)) {
+        usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
+        red_channel_client_receive(rcc);
+        red_channel_client_send(rcc);
+        red_channel_client_push(rcc);
+    }
+
+    if (item_in_pipe) {
+        // still on the queue, make sure won't overwrite the stack variable
+        mark_item->item_in_pipe = NULL;
+        spice_warning("timeout");
+        return FALSE;
+    } else {
+        return red_channel_client_wait_outgoing_item(rcc,
+                                                     timeout == -1 ? -1 : end_time - spice_get_monotonic_time_ns());
+    }
+}
+
+int red_channel_client_wait_outgoing_item(RedChannelClient *rcc,
+                                          int64_t timeout)
+{
+    uint64_t end_time;
+    int blocked;
+
+    if (!red_channel_client_is_blocked(rcc)) {
+        return TRUE;
+    }
+    if (timeout != -1) {
+        end_time = spice_get_monotonic_time_ns() + timeout;
+    } else {
+        end_time = UINT64_MAX;
+    }
+    spice_info("blocked");
+
+    do {
+        usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
+        red_channel_client_receive(rcc);
+        red_channel_client_send(rcc);
+    } while ((blocked = red_channel_client_is_blocked(rcc)) &&
+             (timeout == -1 || spice_get_monotonic_time_ns() < end_time));
+
+    if (blocked) {
+        spice_warning("timeout");
+        return FALSE;
+    } else {
+        spice_assert(red_channel_client_no_item_being_sent(rcc));
+        return TRUE;
+    }
+}
+
+void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc)
+{
+    if (red_channel_client_is_blocked(rcc) || rcc->pipe_size > 0) {
+        red_channel_client_disconnect(rcc);
+    } else {
+        spice_assert(red_channel_client_no_item_being_sent(rcc));
+    }
+}
+
+gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc)
+{
+    return !rcc || (rcc->send_data.size == 0);
+}
+
+void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc,
+                                                RedPipeItem *item)
+{
+    red_channel_client_pipe_remove(rcc, item);
+    red_pipe_item_unref(item);
+}
+
+/* client mutex should be locked before this call */
+gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc)
+{
+    gboolean ret = FALSE;
+
+    if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) {
+        rcc->wait_migrate_data = TRUE;
+        ret = TRUE;
+    }
+    spice_debug("channel type %d id %d rcc %p wait data %d", rcc->channel->type, rcc->channel->id, rcc,
+        rcc->wait_migrate_data);
+
+    return ret;
+}
+
+void red_channel_client_set_destroying(RedChannelClient *rcc)
+{
+    rcc->destroying = TRUE;
+}
+
+gboolean red_channel_client_is_destroying(RedChannelClient *rcc)
+{
+    return rcc->destroying;
+}
diff --git a/server/red-channel-client.h b/server/red-channel-client.h
new file mode 100644
index 0000000..e254ab0
--- /dev/null
+++ b/server/red-channel-client.h
@@ -0,0 +1,177 @@ 
+/*
+    Copyright (C) 2009-2015 Red Hat, Inc.
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2.1 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _H_RED_CHANNEL_CLIENT
+#define _H_RED_CHANNEL_CLIENT
+
+#include <common/marshaller.h>
+
+#include "red-pipe-item.h"
+#include "reds-stream.h"
+
+typedef struct RedChannel RedChannel;
+typedef struct RedClient RedClient;
+typedef struct IncomingHandler IncomingHandler;
+
+typedef struct RedChannelClient RedChannelClient;
+
+/*
+ * When an error occurs over a channel, we treat it as a warning
+ * for spice-server and shutdown the channel.
+ */
+#define spice_channel_client_error(rcc, format, ...)                                     \
+    do {                                                                                 \
+        RedChannel *_ch = red_channel_client_get_channel(rcc);                           \
+        spice_warning("rcc %p type %u id %u: " format, rcc,                              \
+                    _ch->type, _ch->id, ## __VA_ARGS__);                         \
+        red_channel_client_shutdown(rcc);                                                \
+    } while (0)
+
+RedChannelClient *red_channel_client_create(int size, RedChannel *channel,
+                                            RedClient *client, RedsStream *stream,
+                                            int monitor_latency,
+                                            int num_common_caps, uint32_t *common_caps,
+                                            int num_caps, uint32_t *caps);
+
+RedChannelClient *red_channel_client_create_dummy(int size,
+                                                  RedChannel *channel,
+                                                  RedClient  *client,
+                                                  int num_common_caps, uint32_t *common_caps,
+                                                  int num_caps, uint32_t *caps);
+
+void red_channel_client_ref(RedChannelClient *rcc);
+void red_channel_client_unref(RedChannelClient *rcc);
+
+int red_channel_client_is_connected(RedChannelClient *rcc);
+void red_channel_client_default_migrate(RedChannelClient *rcc);
+int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc);
+void red_channel_client_destroy(RedChannelClient *rcc);
+int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap);
+int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap);
+/* shutdown is the only safe thing to do out of the client/channel
+ * thread. It will not touch the rings, just shutdown the socket.
+ * It should be followed by some way to gurantee a disconnection. */
+void red_channel_client_shutdown(RedChannelClient *rcc);
+/* handles general channel msgs from the client */
+int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
+                                      uint16_t type, void *message);
+/* when preparing send_data: should call init and then use marshaller */
+void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item);
+
+uint64_t red_channel_client_get_message_serial(RedChannelClient *channel);
+void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t);
+
+/* When sending a msg. Should first call red_channel_client_begin_send_message.
+ * It will first send the pending urgent data, if there is any, and then
+ * the rest of the data.
+ */
+void red_channel_client_begin_send_message(RedChannelClient *rcc);
+
+/*
+ * Stores the current send data, and switches to urgent send data.
+ * When it begins the actual send, it will send first the urgent data
+ * and afterward the rest of the data.
+ * Should be called only if during the marshalling of on message,
+ * the need to send another message, before, rises.
+ * Important: the serial of the non-urgent sent data, will be succeeded.
+ * return: the urgent send data marshaller
+ */
+SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc);
+
+/* returns -1 if we don't have an estimation */
+int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc);
+
+/* Checks periodically if the connection is still alive */
+void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms);
+
+void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item);
+void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item);
+void red_channel_client_pipe_add_after(RedChannelClient *rcc, RedPipeItem *item, RedPipeItem *pos);
+int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item);
+void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item);
+void red_channel_client_pipe_add_tail(RedChannelClient *rcc, RedPipeItem *item);
+void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item);
+/* for types that use this routine -> the pipe item should be freed */
+void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type);
+void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type);
+gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc);
+uint32_t red_channel_client_get_pipe_size(RedChannelClient *rcc);
+
+void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc);
+void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window);
+void red_channel_client_push_set_ack(RedChannelClient *rcc);
+
+gboolean red_channel_client_is_blocked(RedChannelClient *rcc);
+
+/* helper for channels that have complex logic that can possibly ready a send */
+int red_channel_client_send_message_pending(RedChannelClient *rcc);
+
+gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc);
+void red_channel_client_push(RedChannelClient *rcc);
+// TODO: again - what is the context exactly? this happens in channel disconnect. but our
+// current red_channel_shutdown also closes the socket - is there a socket to close?
+// are we reading from an fd here? arghh
+void red_channel_client_receive(RedChannelClient *rcc);
+void red_channel_client_send(RedChannelClient *rcc);
+void red_channel_client_disconnect(RedChannelClient *rcc);
+
+/* Note: the valid times to call red_channel_get_marshaller are just during send_item callback. */
+SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc);
+RedsStream *red_channel_client_get_stream(RedChannelClient *rcc);
+RedClient *red_channel_client_get_client(RedChannelClient *rcc);
+
+/* Note that the header is valid only between red_channel_reset_send_data and
+ * red_channel_begin_send_message.*/
+void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list);
+
+/*
+ * blocking functions.
+ *
+ * timeout is in nano sec. -1 for no timeout.
+ *
+ * Return: TRUE if waiting succeeded. FALSE if timeout expired.
+ */
+
+int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc,
+                                           RedPipeItem *item,
+                                           int64_t timeout);
+int red_channel_client_wait_outgoing_item(RedChannelClient *rcc,
+                                          int64_t timeout);
+void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc);
+
+RedChannel* red_channel_client_get_channel(RedChannelClient* rcc);
+IncomingHandler* red_channel_client_get_incoming_handler(RedChannelClient *rcc);
+
+void red_channel_client_on_output(void *opaque, int n);
+void red_channel_client_on_input(void *opaque, int n);
+int red_channel_client_get_out_msg_size(void *opaque);
+void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec,
+                                             int *vec_size, int pos);
+void red_channel_client_on_out_block(void *opaque);
+void red_channel_client_on_out_msg_done(void *opaque);
+
+void red_channel_client_seamless_migration_done(RedChannelClient *rcc);
+void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc);
+void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc);
+
+gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc);
+void red_channel_client_set_destroying(RedChannelClient *rcc);
+gboolean red_channel_client_is_destroying(RedChannelClient *rcc);
+
+#define RED_CHANNEL_CLIENT(Client) ((RedChannelClient *)(Client))
+
+#endif /* _H_RED_CHANNEL_CLIENT */
diff --git a/server/red-channel.c b/server/red-channel.c
index a02c51a..f206e4d 100644
--- a/server/red-channel.c
+++ b/server/red-channel.c
@@ -22,20 +22,6 @@ 
 #include <config.h>
 #endif
 
-#include <glib.h>
-#include <stdio.h>
-#include <stdint.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <errno.h>
-#include <sys/ioctl.h>
-#ifdef HAVE_LINUX_SOCKIOS_H
-#include <linux/sockios.h> /* SIOCOUTQ */
-#endif
-
-#include <common/generated_server_marshallers.h>
 #include <common/ring.h>
 
 #include "red-channel.h"
@@ -44,46 +30,6 @@ 
 #include "main-dispatcher.h"
 #include "utils.h"
 
-typedef struct RedEmptyMsgPipeItem {
-    RedPipeItem base;
-    int msg;
-} RedEmptyMsgPipeItem;
-
-typedef struct MarkerPipeItem {
-    RedPipeItem base;
-    gboolean *item_in_pipe;
-} MarkerPipeItem;
-
-#define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15)
-#define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10)
-
-#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro
-
-enum QosPingState {
-    PING_STATE_NONE,
-    PING_STATE_TIMER,
-    PING_STATE_WARMUP,
-    PING_STATE_LATENCY,
-};
-
-enum ConnectivityState {
-    CONNECTIVITY_STATE_CONNECTED,
-    CONNECTIVITY_STATE_BLOCKED,
-    CONNECTIVITY_STATE_WAIT_PONG,
-    CONNECTIVITY_STATE_DISCONNECTED,
-};
-
-static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout);
-static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc);
-static void red_channel_client_restart_ping_timer(RedChannelClient *rcc);
-
-static void red_channel_client_event(int fd, int event, void *data);
-static void red_client_add_channel(RedClient *client, RedChannelClient *rcc);
-static void red_client_remove_channel(RedChannelClient *rcc);
-static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id);
-static void red_channel_client_restore_main_sender(RedChannelClient *rcc);
-static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc);
-
 /*
  * Lifetime of RedChannel, RedChannelClient and RedClient:
  * RedChannel is created and destroyed by the calls to
@@ -118,561 +64,23 @@  static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc);
  * If a call to red_channel_client_destroy is made from another location, it must be called
  * from the channel's thread.
 */
-static void red_channel_ref(RedChannel *channel);
-static void red_channel_unref(RedChannel *channel);
-
-static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header)
-{
-    return GUINT32_FROM_LE(((SpiceDataHeader *)header->data)->size);
-}
-
-static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header)
-{
-    return GUINT32_FROM_LE(((SpiceMiniDataHeader *)header->data)->size);
-}
-
-static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header)
-{
-    return GUINT16_FROM_LE(((SpiceDataHeader *)header->data)->type);
-}
-
-static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header)
-{
-    return GUINT16_FROM_LE(((SpiceMiniDataHeader *)header->data)->type);
-}
-
-static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
-{
-    ((SpiceDataHeader *)header->data)->type = GUINT16_TO_LE(type);
-}
-
-static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
-{
-    ((SpiceMiniDataHeader *)header->data)->type = GUINT16_TO_LE(type);
-}
-
-static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
-{
-    ((SpiceDataHeader *)header->data)->size = GUINT32_TO_LE(size);
-}
-
-static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
-{
-    ((SpiceMiniDataHeader *)header->data)->size = GUINT32_TO_LE(size);
-}
-
-static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
-{
-    ((SpiceDataHeader *)header->data)->serial = GUINT64_TO_LE(serial);
-}
-
-static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
-{
-    spice_error("attempt to set header serial on mini header");
-}
-
-static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
-{
-    ((SpiceDataHeader *)header->data)->sub_list = GUINT32_TO_LE(sub_list);
-}
-
-static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
-{
-    spice_error("attempt to set header sub list on mini header");
-}
-
-static const SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader),
-                                                          full_header_set_msg_type,
-                                                          full_header_set_msg_size,
-                                                          full_header_set_msg_serial,
-                                                          full_header_set_msg_sub_list,
-                                                          full_header_get_msg_type,
-                                                          full_header_get_msg_size};
-
-static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader),
-                                                          mini_header_set_msg_type,
-                                                          mini_header_set_msg_size,
-                                                          mini_header_set_msg_serial,
-                                                          mini_header_set_msg_sub_list,
-                                                          mini_header_get_msg_type,
-                                                          mini_header_get_msg_size};
-
-/* return the number of bytes read. -1 in case of error */
-static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
-{
-    uint8_t *pos = buf;
-    while (size) {
-        int now;
-        if (stream->shutdown) {
-            return -1;
-        }
-        now = reds_stream_read(stream, pos, size);
-        if (now <= 0) {
-            if (now == 0) {
-                return -1;
-            }
-            spice_assert(now == -1);
-            if (errno == EAGAIN) {
-                break;
-            } else if (errno == EINTR) {
-                continue;
-            } else if (errno == EPIPE) {
-                return -1;
-            } else {
-                spice_printerr("%s", strerror(errno));
-                return -1;
-            }
-        } else {
-            size -= now;
-            pos += now;
-        }
-    }
-    return pos - buf;
-}
-
-// TODO: this implementation, as opposed to the old implementation in red_worker,
-// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer
-// arithmetic for the case where a single cb_read could return multiple messages. But
-// this is suboptimal potentially. Profile and consider fixing.
-static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handler)
-{
-    int bytes_read;
-    uint8_t *parsed;
-    size_t parsed_size;
-    message_destructor_t parsed_free;
-    uint16_t msg_type;
-    uint32_t msg_size;
-
-    /* XXX: This needs further investigation as to the underlying cause, it happened
-     * after spicec disconnect (but not with spice-gtk) repeatedly. */
-    if (!stream) {
-        return;
-    }
-
-    for (;;) {
-        int ret_handle;
-        if (handler->header_pos < handler->header.header_size) {
-            bytes_read = red_peer_receive(stream,
-                                          handler->header.data + handler->header_pos,
-                                          handler->header.header_size - handler->header_pos);
-            if (bytes_read == -1) {
-                handler->cb->on_error(handler->opaque);
-                return;
-            }
-            handler->cb->on_input(handler->opaque, bytes_read);
-            handler->header_pos += bytes_read;
-
-            if (handler->header_pos != handler->header.header_size) {
-                return;
-            }
-        }
-
-        msg_size = handler->header.get_msg_size(&handler->header);
-        msg_type = handler->header.get_msg_type(&handler->header);
-        if (handler->msg_pos < msg_size) {
-            if (!handler->msg) {
-                handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size);
-                if (handler->msg == NULL) {
-                    spice_printerr("ERROR: channel refused to allocate buffer.");
-                    handler->cb->on_error(handler->opaque);
-                    return;
-                }
-            }
-
-            bytes_read = red_peer_receive(stream,
-                                          handler->msg + handler->msg_pos,
-                                          msg_size - handler->msg_pos);
-            if (bytes_read == -1) {
-                handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
-                handler->cb->on_error(handler->opaque);
-                return;
-            }
-            handler->cb->on_input(handler->opaque, bytes_read);
-            handler->msg_pos += bytes_read;
-            if (handler->msg_pos != msg_size) {
-                return;
-            }
-        }
-
-        if (handler->cb->parser) {
-            parsed = handler->cb->parser(handler->msg,
-                handler->msg + msg_size, msg_type,
-                SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
-            if (parsed == NULL) {
-                spice_printerr("failed to parse message type %d", msg_type);
-                handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
-                handler->cb->on_error(handler->opaque);
-                return;
-            }
-            ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size,
-                                                    msg_type, parsed);
-            parsed_free(parsed);
-        } else {
-            ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size,
-                                                     handler->msg);
-        }
-        handler->msg_pos = 0;
-        handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
-        handler->msg = NULL;
-        handler->header_pos = 0;
-
-        if (!ret_handle) {
-            handler->cb->on_error(handler->opaque);
-            return;
-        }
-    }
-}
-
-void red_channel_client_receive(RedChannelClient *rcc)
-{
-    red_channel_client_ref(rcc);
-    red_peer_handle_incoming(rcc->stream, &rcc->incoming);
-    red_channel_client_unref(rcc);
-}
 
 void red_channel_receive(RedChannel *channel)
 {
     g_list_foreach(channel->clients, (GFunc)red_channel_client_receive, NULL);
 }
 
-static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler)
-{
-    ssize_t n;
-
-    if (!stream) {
-        return;
-    }
-
-    if (handler->size == 0) {
-        handler->vec = handler->vec_buf;
-        handler->size = handler->cb->get_msg_size(handler->opaque);
-        if (!handler->size) {  // nothing to be sent
-            return;
-        }
-    }
-
-    for (;;) {
-        handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
-        n = reds_stream_writev(stream, handler->vec, handler->vec_size);
-        if (n == -1) {
-            switch (errno) {
-            case EAGAIN:
-                handler->cb->on_block(handler->opaque);
-                return;
-            case EINTR:
-                continue;
-            case EPIPE:
-                handler->cb->on_error(handler->opaque);
-                return;
-            default:
-                spice_printerr("%s", strerror(errno));
-                handler->cb->on_error(handler->opaque);
-                return;
-            }
-        } else {
-            handler->pos += n;
-            handler->cb->on_output(handler->opaque, n);
-            if (handler->pos == handler->size) { // finished writing data
-                /* reset handler before calling on_msg_done, since it
-                 * can trigger another call to red_peer_handle_outgoing (when
-                 * switching from the urgent marshaller to the main one */
-                handler->vec = handler->vec_buf;
-                handler->pos = 0;
-                handler->size = 0;
-                handler->cb->on_msg_done(handler->opaque);
-                return;
-            }
-        }
-    }
-}
-
-static void red_channel_client_on_output(void *opaque, int n)
-{
-    RedChannelClient *rcc = opaque;
-
-    if (rcc->connectivity_monitor.timer) {
-        rcc->connectivity_monitor.out_bytes += n;
-    }
-    stat_inc_counter(reds, rcc->channel->out_bytes_counter, n);
-}
-
-static void red_channel_client_on_input(void *opaque, int n)
-{
-    RedChannelClient *rcc = opaque;
-
-    if (rcc->connectivity_monitor.timer) {
-        rcc->connectivity_monitor.in_bytes += n;
-    }
-}
-
 static void red_channel_client_default_peer_on_error(RedChannelClient *rcc)
 {
     red_channel_client_disconnect(rcc);
 }
 
-static int red_channel_client_get_out_msg_size(void *opaque)
-{
-    RedChannelClient *rcc = (RedChannelClient *)opaque;
-
-    return rcc->send_data.size;
-}
-
-static void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec,
-                                               int *vec_size, int pos)
-{
-    RedChannelClient *rcc = (RedChannelClient *)opaque;
-
-    *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller,
-                                            vec, IOV_MAX, pos);
-}
-
-static void red_channel_client_on_out_block(void *opaque)
-{
-    RedChannelClient *rcc = (RedChannelClient *)opaque;
-
-    rcc->send_data.blocked = TRUE;
-    rcc->channel->core->watch_update_mask(rcc->stream->watch,
-                                     SPICE_WATCH_EVENT_READ |
-                                     SPICE_WATCH_EVENT_WRITE);
-}
-
-static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc)
-{
-    return (rcc->send_data.marshaller == rcc->send_data.urgent.marshaller);
-}
-
-static void red_channel_client_reset_send_data(RedChannelClient *rcc)
-{
-    spice_marshaller_reset(rcc->send_data.marshaller);
-    rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller,
-                                                                rcc->send_data.header.header_size);
-    spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size);
-    rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0);
-    rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0);
-
-    /* Keeping the serial consecutive: resetting it if reset_send_data
-     * has been called before, but no message has been sent since then.
-     */
-    if (rcc->send_data.last_sent_serial != rcc->send_data.serial) {
-        spice_assert(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1);
-        /*  When the urgent marshaller is active, the serial was incremented by
-         *  the call to reset_send_data that was made for the main marshaller.
-         *  The urgent msg receives this serial, and the main msg serial is
-         *  the following one. Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial)
-         *  should be 1 in this case*/
-        if (!red_channel_client_urgent_marshaller_is_active(rcc)) {
-            rcc->send_data.serial = rcc->send_data.last_sent_serial;
-        }
-    }
-    rcc->send_data.serial++;
-
-    if (!rcc->is_mini_header) {
-        spice_assert(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller);
-        rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0);
-        rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial);
-    }
-}
-
-void red_channel_client_push_set_ack(RedChannelClient *rcc)
-{
-    red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_SET_ACK);
-}
-
-static void red_channel_client_send_set_ack(RedChannelClient *rcc)
-{
-    SpiceMsgSetAck ack;
-
-    spice_assert(rcc);
-    red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL);
-    ack.generation = ++rcc->ack_data.generation;
-    ack.window = rcc->ack_data.client_window;
-    rcc->ack_data.messages_window = 0;
-
-    spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack);
-
-    red_channel_client_begin_send_message(rcc);
-}
-
-static void red_channel_client_send_migrate(RedChannelClient *rcc)
-{
-    SpiceMsgMigrate migrate;
-
-    red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, NULL);
-    migrate.flags = rcc->channel->migration_flags;
-    spice_marshall_msg_migrate(rcc->send_data.marshaller, &migrate);
-    if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_FLUSH) {
-        rcc->wait_migrate_flush_mark = TRUE;
-    }
-
-    red_channel_client_begin_send_message(rcc);
-}
-
-
-static void red_channel_client_send_empty_msg(RedChannelClient *rcc, RedPipeItem *base)
-{
-    RedEmptyMsgPipeItem *msg_pipe_item = SPICE_UPCAST(RedEmptyMsgPipeItem, base);
-
-    red_channel_client_init_send_data(rcc, msg_pipe_item->msg, NULL);
-    red_channel_client_begin_send_message(rcc);
-}
-
-static void red_channel_client_send_ping(RedChannelClient *rcc)
-{
-    SpiceMsgPing ping;
-
-    if (!rcc->latency_monitor.warmup_was_sent) { // latency test start
-        int delay_val;
-        socklen_t opt_size = sizeof(delay_val);
-
-        rcc->latency_monitor.warmup_was_sent = TRUE;
-        /*
-         * When testing latency, TCP_NODELAY must be switched on, otherwise,
-         * sending the ping message is delayed by Nagle algorithm, and the
-         * roundtrip measurement is less accurate (bigger).
-         */
-        rcc->latency_monitor.tcp_nodelay = 1;
-        if (getsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
-                       &opt_size) == -1) {
-            spice_warning("getsockopt failed, %s", strerror(errno));
-        }  else {
-            rcc->latency_monitor.tcp_nodelay = delay_val;
-            if (!delay_val) {
-                delay_val = 1;
-                if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
-                               sizeof(delay_val)) == -1) {
-                   if (errno != ENOTSUP) {
-                        spice_warning("setsockopt failed, %s", strerror(errno));
-                    }
-                }
-            }
-        }
-    }
-
-    red_channel_client_init_send_data(rcc, SPICE_MSG_PING, NULL);
-    ping.id = rcc->latency_monitor.id;
-    ping.timestamp = spice_get_monotonic_time_ns();
-    spice_marshall_msg_ping(rcc->send_data.marshaller, &ping);
-    red_channel_client_begin_send_message(rcc);
-}
-
-static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item)
-{
-    spice_assert(red_channel_client_no_item_being_sent(rcc));
-    red_channel_client_reset_send_data(rcc);
-    switch (item->type) {
-        case RED_PIPE_ITEM_TYPE_SET_ACK:
-            red_channel_client_send_set_ack(rcc);
-            break;
-        case RED_PIPE_ITEM_TYPE_MIGRATE:
-            red_channel_client_send_migrate(rcc);
-            break;
-        case RED_PIPE_ITEM_TYPE_EMPTY_MSG:
-            red_channel_client_send_empty_msg(rcc, item);
-            break;
-        case RED_PIPE_ITEM_TYPE_PING:
-            red_channel_client_send_ping(rcc);
-            break;
-        case RED_PIPE_ITEM_TYPE_MARKER:
-            break;
-        default:
-            rcc->channel->channel_cbs.send_item(rcc, item);
-            break;
-    }
-    red_pipe_item_unref(item);
-}
-
-static inline void red_channel_client_release_sent_item(RedChannelClient *rcc)
-{
-    if (rcc->send_data.item) {
-        red_pipe_item_unref(rcc->send_data.item);
-        rcc->send_data.item = NULL;
-    }
-}
-
-static void red_channel_client_on_out_msg_done(void *opaque)
-{
-    RedChannelClient *rcc = (RedChannelClient *)opaque;
-    int fd;
-
-    rcc->send_data.size = 0;
-
-    if (spice_marshaller_get_fd(rcc->send_data.marshaller, &fd)) {
-        if (reds_stream_send_msgfd(rcc->stream, fd) < 0) {
-            perror("sendfd");
-            red_channel_client_disconnect(rcc);
-            if (fd != -1)
-                close(fd);
-            return;
-        }
-        if (fd != -1)
-            close(fd);
-    }
-
-    red_channel_client_release_sent_item(rcc);
-    if (rcc->send_data.blocked) {
-        rcc->send_data.blocked = FALSE;
-        rcc->channel->core->watch_update_mask(rcc->stream->watch,
-                                         SPICE_WATCH_EVENT_READ);
-    }
-
-    if (red_channel_client_urgent_marshaller_is_active(rcc)) {
-        red_channel_client_restore_main_sender(rcc);
-        spice_assert(rcc->send_data.header.data != NULL);
-        red_channel_client_begin_send_message(rcc);
-    } else {
-        if (rcc->latency_monitor.timer && !rcc->send_data.blocked && rcc->pipe_size == 0) {
-            /* It is possible that the socket will become idle, so we may be able to test latency */
-            red_channel_client_restart_ping_timer(rcc);
-        }
-    }
-
-}
-
-static void red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item)
-{
-    rcc->pipe_size--;
-    ring_remove(&item->link);
-}
-
-static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc)
+void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc)
 {
     spice_assert(rcc);
     channel->clients = g_list_prepend(channel->clients, rcc);
 }
 
-static void red_channel_client_set_remote_caps(RedChannelClient* rcc,
-                                               int num_common_caps, uint32_t *common_caps,
-                                               int num_caps, uint32_t *caps)
-{
-    rcc->remote_caps.num_common_caps = num_common_caps;
-    rcc->remote_caps.common_caps = spice_memdup(common_caps, num_common_caps * sizeof(uint32_t));
-
-    rcc->remote_caps.num_caps = num_caps;
-    rcc->remote_caps.caps = spice_memdup(caps, num_caps * sizeof(uint32_t));
-}
-
-static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc)
-{
-    rcc->remote_caps.num_common_caps = 0;
-    free(rcc->remote_caps.common_caps);
-    rcc->remote_caps.num_caps = 0;
-    free(rcc->remote_caps.caps);
-}
-
-int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap)
-{
-    return test_capability(rcc->remote_caps.common_caps,
-                           rcc->remote_caps.num_common_caps,
-                           cap);
-}
-
-int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap)
-{
-    return test_capability(rcc->remote_caps.caps,
-                           rcc->remote_caps.num_caps,
-                           cap);
-}
-
 int red_channel_test_remote_common_cap(RedChannel *channel, uint32_t cap)
 {
     GList *link, *next;
@@ -699,230 +107,8 @@  int red_channel_test_remote_cap(RedChannel *channel, uint32_t cap)
     return TRUE;
 }
 
-static int red_channel_client_pre_create_validate(RedChannel *channel, RedClient  *client)
-{
-    if (red_client_get_channel(client, channel->type, channel->id)) {
-        spice_printerr("Error client %p: duplicate channel type %d id %d",
-                       client, channel->type, channel->id);
-        return FALSE;
-    }
-    return TRUE;
-}
-
-static void red_channel_client_push_ping(RedChannelClient *rcc)
-{
-    spice_assert(rcc->latency_monitor.state == PING_STATE_NONE);
-    rcc->latency_monitor.state = PING_STATE_WARMUP;
-    rcc->latency_monitor.warmup_was_sent = FALSE;
-    rcc->latency_monitor.id = rand();
-    red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING);
-    red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING);
-}
-
-static void red_channel_client_ping_timer(void *opaque)
-{
-    RedChannelClient *rcc = opaque;
-
-    spice_assert(rcc->latency_monitor.state == PING_STATE_TIMER);
-    red_channel_client_cancel_ping_timer(rcc);
-
-#ifdef HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. */
-    {
-        int so_unsent_size = 0;
-
-        /* retrieving the occupied size of the socket's tcp snd buffer (unacked + unsent) */
-        if (ioctl(rcc->stream->socket, SIOCOUTQ, &so_unsent_size) == -1) {
-            spice_printerr("ioctl(SIOCOUTQ) failed, %s", strerror(errno));
-        }
-        if (so_unsent_size > 0) {
-            /* tcp snd buffer is still occupied. rescheduling ping */
-            red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
-        } else {
-            red_channel_client_push_ping(rcc);
-        }
-    }
-#else /* ifdef HAVE_LINUX_SOCKIOS_H */
-    /* More portable alternative code path (less accurate but avoids bogus ioctls)*/
-    red_channel_client_push_ping(rcc);
-#endif /* ifdef HAVE_LINUX_SOCKIOS_H */
-}
-
-/*
- * When a connection is not alive (and we can't detect it via a socket error), we
- * reach one of these 2 states:
- * (1) Sending msgs is blocked: either writes return EAGAIN
- *     or we are missing MSGC_ACK from the client.
- * (2) MSG_PING was sent without receiving a MSGC_PONG in reply.
- *
- * The connectivity_timer callback tests if the channel's state matches one of the above.
- * In case it does, on the next time the timer is called, it checks if the connection has
- * been idle during the time that passed since the previous timer call. If the connection
- * has been idle, we consider the client as disconnected.
- */
-static void red_channel_client_connectivity_timer(void *opaque)
-{
-    RedChannelClient *rcc = opaque;
-    RedChannelClientConnectivityMonitor *monitor = &rcc->connectivity_monitor;
-    int is_alive = TRUE;
-
-    if (monitor->state == CONNECTIVITY_STATE_BLOCKED) {
-        if (monitor->in_bytes == 0 && monitor->out_bytes == 0) {
-            if (!rcc->send_data.blocked && !red_channel_client_waiting_for_ack(rcc)) {
-                spice_error("mismatch between rcc-state and connectivity-state");
-            }
-            spice_debug("rcc is blocked; connection is idle");
-            is_alive = FALSE;
-        }
-    } else if (monitor->state == CONNECTIVITY_STATE_WAIT_PONG) {
-        if (monitor->in_bytes == 0) {
-            if (rcc->latency_monitor.state != PING_STATE_WARMUP &&
-                rcc->latency_monitor.state != PING_STATE_LATENCY) {
-                spice_error("mismatch between rcc-state and connectivity-state");
-            }
-            spice_debug("rcc waits for pong; connection is idle");
-            is_alive = FALSE;
-        }
-    }
-
-    if (is_alive) {
-        monitor->in_bytes = 0;
-        monitor->out_bytes = 0;
-        if (rcc->send_data.blocked || red_channel_client_waiting_for_ack(rcc)) {
-            monitor->state = CONNECTIVITY_STATE_BLOCKED;
-        } else if (rcc->latency_monitor.state == PING_STATE_WARMUP ||
-                   rcc->latency_monitor.state == PING_STATE_LATENCY) {
-            monitor->state = CONNECTIVITY_STATE_WAIT_PONG;
-        } else {
-             monitor->state = CONNECTIVITY_STATE_CONNECTED;
-        }
-        rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
-                                        rcc->connectivity_monitor.timeout);
-    } else {
-        monitor->state = CONNECTIVITY_STATE_DISCONNECTED;
-        spice_warning("rcc %p on channel %d:%d has been unresponsive for more than %u ms, disconnecting",
-                      rcc, rcc->channel->type, rcc->channel->id, monitor->timeout);
-        red_channel_client_disconnect(rcc);
-    }
-}
-
-void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms)
-{
-    if (!red_channel_client_is_connected(rcc)) {
-        return;
-    }
-    spice_debug(NULL);
-    spice_assert(timeout_ms > 0);
-    /*
-     * If latency_monitor is not active, we activate it in order to enable
-     * periodic ping messages so that we will be be able to identify a disconnected
-     * channel-client even if there are no ongoing channel specific messages
-     * on this channel.
-     */
-    if (rcc->latency_monitor.timer == NULL) {
-        rcc->latency_monitor.timer = rcc->channel->core->timer_add(
-            rcc->channel->core, red_channel_client_ping_timer, rcc);
-        if (!red_client_during_migrate_at_target(rcc->client)) {
-            red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
-        }
-        rcc->latency_monitor.roundtrip = -1;
-    }
-    if (rcc->connectivity_monitor.timer == NULL) {
-        rcc->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED;
-        rcc->connectivity_monitor.timer = rcc->channel->core->timer_add(
-            rcc->channel->core, red_channel_client_connectivity_timer, rcc);
-        rcc->connectivity_monitor.timeout = timeout_ms;
-        if (!red_client_during_migrate_at_target(rcc->client)) {
-           rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
-                                           rcc->connectivity_monitor.timeout);
-        }
-    }
-}
-
-RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient  *client,
-                                            RedsStream *stream,
-                                            int monitor_latency,
-                                            int num_common_caps, uint32_t *common_caps,
-                                            int num_caps, uint32_t *caps)
-{
-    RedChannelClient *rcc = NULL;
-
-    pthread_mutex_lock(&client->lock);
-    if (!red_channel_client_pre_create_validate(channel, client)) {
-        goto error;
-    }
-    spice_assert(stream && channel && size >= sizeof(RedChannelClient));
-    rcc = spice_malloc0(size);
-    rcc->stream = stream;
-    rcc->channel = channel;
-    rcc->client = client;
-    rcc->refs = 1;
-    rcc->ack_data.messages_window = ~0;  // blocks send message (maybe use send_data.blocked +
-                                             // block flags)
-    rcc->ack_data.client_generation = ~0;
-    rcc->ack_data.client_window = CLIENT_ACK_WINDOW;
-    rcc->send_data.main.marshaller = spice_marshaller_new();
-    rcc->send_data.urgent.marshaller = spice_marshaller_new();
-
-    rcc->send_data.marshaller = rcc->send_data.main.marshaller;
-
-    rcc->incoming.opaque = rcc;
-    rcc->incoming.cb = &channel->incoming_cb;
-
-    rcc->outgoing.opaque = rcc;
-    rcc->outgoing.cb = &channel->outgoing_cb;
-    rcc->outgoing.pos = 0;
-    rcc->outgoing.size = 0;
-
-    red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps);
-    if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) {
-        rcc->incoming.header = mini_header_wrapper;
-        rcc->send_data.header = mini_header_wrapper;
-        rcc->is_mini_header = TRUE;
-    } else {
-        rcc->incoming.header = full_header_wrapper;
-        rcc->send_data.header = full_header_wrapper;
-        rcc->is_mini_header = FALSE;
-    }
-
-    rcc->incoming.header.data = rcc->incoming.header_buf;
-    rcc->incoming.serial = 1;
-
-    if (!channel->channel_cbs.config_socket(rcc)) {
-        goto error;
-    }
-
-    ring_init(&rcc->pipe);
-    rcc->pipe_size = 0;
-
-    stream->watch = channel->core->watch_add(channel->core,
-                                           stream->socket,
-                                           SPICE_WATCH_EVENT_READ,
-                                           red_channel_client_event, rcc);
-    rcc->id = g_list_length(channel->clients);
-    red_channel_add_client(channel, rcc);
-    red_client_add_channel(client, rcc);
-    red_channel_ref(channel);
-    pthread_mutex_unlock(&client->lock);
-
-    if (monitor_latency && reds_stream_get_family(stream) != AF_UNIX) {
-        rcc->latency_monitor.timer = channel->core->timer_add(
-            channel->core, red_channel_client_ping_timer, rcc);
-        if (!client->during_target_migrate) {
-            red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
-        }
-        rcc->latency_monitor.roundtrip = -1;
-    }
-
-    return rcc;
-error:
-    free(rcc);
-    reds_stream_free(stream);
-    pthread_mutex_unlock(&client->lock);
-    return NULL;
-}
-
 /* returns TRUE If all channels are finished migrating, FALSE otherwise */
-static gboolean red_client_seamless_migration_done_for_channel(RedClient *client)
+gboolean red_client_seamless_migration_done_for_channel(RedClient *client)
 {
     gboolean ret = FALSE;
 
@@ -944,26 +130,6 @@  static gboolean red_client_seamless_migration_done_for_channel(RedClient *client
     return ret;
 }
 
-static void red_channel_client_seamless_migration_done(RedChannelClient *rcc)
-{
-    rcc->wait_migrate_data = FALSE;
-
-    if (red_client_seamless_migration_done_for_channel(rcc->client)) {
-        if (rcc->latency_monitor.timer) {
-            red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
-        }
-        if (rcc->connectivity_monitor.timer) {
-            rcc->channel->core->timer_start(rcc->connectivity_monitor.timer,
-                                            rcc->connectivity_monitor.timeout);
-        }
-    }
-}
-
-int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc)
-{
-    return rcc->wait_migrate_data;
-}
-
 int red_channel_is_waiting_for_migrate_data(RedChannel *channel)
 {
     RedChannelClient *rcc;
@@ -995,20 +161,6 @@  static void red_channel_client_default_disconnect(RedChannelClient *base)
     red_channel_client_disconnect(base);
 }
 
-void red_channel_client_default_migrate(RedChannelClient *rcc)
-{
-    if (rcc->latency_monitor.timer) {
-        red_channel_client_cancel_ping_timer(rcc);
-        rcc->channel->core->timer_remove(rcc->latency_monitor.timer);
-        rcc->latency_monitor.timer = NULL;
-    }
-    if (rcc->connectivity_monitor.timer) {
-       rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer);
-        rcc->connectivity_monitor.timer = NULL;
-    }
-    red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MIGRATE);
-}
-
 RedChannel *red_channel_create(int size,
                                RedsState *reds,
                                const SpiceCoreInterfaceInternal *core,
@@ -1113,583 +265,151 @@  RedChannel *red_channel_create_dummy(int size, RedsState *reds, uint32_t type, u
     spice_debug("channel type %d id %d thread_id 0x%lx",
                 channel->type, channel->id, channel->thread_id);
 
-    channel->out_bytes_counter = 0;
-
-    return channel;
-}
-
-static int do_nothing_handle_message(RedChannelClient *rcc,
-                                     uint16_t type,
-                                     uint32_t size,
-                                     uint8_t *msg)
-{
-    return TRUE;
-}
-
-RedChannel *red_channel_create_parser(int size,
-                                      RedsState *reds,
-                                      const SpiceCoreInterfaceInternal *core,
-                                      uint32_t type, uint32_t id,
-                                      int handle_acks,
-                                      spice_parse_channel_func_t parser,
-                                      channel_handle_parsed_proc handle_parsed,
-                                      const ChannelCbs *channel_cbs,
-                                      uint32_t migration_flags)
-{
-    RedChannel *channel = red_channel_create(size, reds, core, type, id,
-                                             handle_acks,
-                                             do_nothing_handle_message,
-                                             channel_cbs,
-                                             migration_flags);
-
-    if (channel == NULL) {
-        return NULL;
-    }
-    channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
-    channel->incoming_cb.parser = parser;
-
-    return channel;
-}
-
-void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat)
-{
-    spice_return_if_fail(channel != NULL);
-    spice_return_if_fail(channel->stat == 0);
-
-#ifdef RED_STATISTICS
-    channel->stat = stat;
-    channel->out_bytes_counter = stat_add_counter(channel->reds, stat, "out_bytes", TRUE);
-#endif
-}
-
-void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data)
-{
-    spice_assert(client_cbs->connect || channel->type == SPICE_CHANNEL_MAIN);
-    channel->client_cbs.connect = client_cbs->connect;
-
-    if (client_cbs->disconnect) {
-        channel->client_cbs.disconnect = client_cbs->disconnect;
-    }
-
-    if (client_cbs->migrate) {
-        channel->client_cbs.migrate = client_cbs->migrate;
-    }
-    channel->data = cbs_data;
-}
-
-int test_capability(const uint32_t *caps, int num_caps, uint32_t cap)
-{
-    uint32_t index = cap / 32;
-    if (num_caps < index + 1) {
-        return FALSE;
-    }
-
-    return (caps[index] & (1 << (cap % 32))) != 0;
-}
-
-static void add_capability(uint32_t **caps, int *num_caps, uint32_t cap)
-{
-    int nbefore, n;
-
-    nbefore = *num_caps;
-    n = cap / 32;
-    *num_caps = MAX(*num_caps, n + 1);
-    *caps = spice_renew(uint32_t, *caps, *num_caps);
-    memset(*caps + nbefore, 0, (*num_caps - nbefore) * sizeof(uint32_t));
-    (*caps)[n] |= (1 << (cap % 32));
-}
-
-void red_channel_set_common_cap(RedChannel *channel, uint32_t cap)
-{
-    add_capability(&channel->local_caps.common_caps, &channel->local_caps.num_common_caps, cap);
-}
-
-void red_channel_set_cap(RedChannel *channel, uint32_t cap)
-{
-    add_capability(&channel->local_caps.caps, &channel->local_caps.num_caps, cap);
-}
-
-static void red_channel_ref(RedChannel *channel)
-{
-    channel->refs++;
-}
-
-static void red_channel_unref(RedChannel *channel)
-{
-    if (!--channel->refs) {
-        if (channel->local_caps.num_common_caps) {
-            free(channel->local_caps.common_caps);
-        }
-
-        if (channel->local_caps.num_caps) {
-            free(channel->local_caps.caps);
-        }
-
-        free(channel);
-    }
-}
-
-void red_channel_client_ref(RedChannelClient *rcc)
-{
-    rcc->refs++;
-}
-
-void red_channel_client_unref(RedChannelClient *rcc)
-{
-    if (!--rcc->refs) {
-        spice_debug("destroy rcc=%p", rcc);
-
-        reds_stream_free(rcc->stream);
-        rcc->stream = NULL;
-
-        if (rcc->send_data.main.marshaller) {
-            spice_marshaller_destroy(rcc->send_data.main.marshaller);
-        }
-
-        if (rcc->send_data.urgent.marshaller) {
-            spice_marshaller_destroy(rcc->send_data.urgent.marshaller);
-        }
-
-        red_channel_client_destroy_remote_caps(rcc);
-        if (rcc->channel) {
-            red_channel_unref(rcc->channel);
-        }
-        free(rcc);
-    }
-}
-
-void red_channel_client_destroy(RedChannelClient *rcc)
-{
-    rcc->destroying = 1;
-    red_channel_client_disconnect(rcc);
-    red_client_remove_channel(rcc);
-    red_channel_client_unref(rcc);
-}
-
-void red_channel_destroy(RedChannel *channel)
-{
-    if (!channel) {
-        return;
-    }
-
-    g_list_foreach(channel->clients, (GFunc)red_channel_client_destroy, NULL);
-    red_channel_unref(channel);
-}
-
-void red_channel_client_shutdown(RedChannelClient *rcc)
-{
-    if (rcc->stream && !rcc->stream->shutdown) {
-        rcc->channel->core->watch_remove(rcc->stream->watch);
-        rcc->stream->watch = NULL;
-        shutdown(rcc->stream->socket, SHUT_RDWR);
-        rcc->stream->shutdown = TRUE;
-    }
-}
-
-void red_channel_client_send(RedChannelClient *rcc)
-{
-    red_channel_client_ref(rcc);
-    red_peer_handle_outgoing(rcc->stream, &rcc->outgoing);
-    red_channel_client_unref(rcc);
-}
-
-void red_channel_send(RedChannel *channel)
-{
-    g_list_foreach(channel->clients, (GFunc)red_channel_client_send, NULL);
-}
-
-static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc)
-{
-    return (rcc->channel->handle_acks &&
-            (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2));
-}
-
-static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc)
-{
-    RedPipeItem *item;
-
-    if (!rcc || rcc->send_data.blocked
-             || red_channel_client_waiting_for_ack(rcc)
-             || !(item = (RedPipeItem *)ring_get_tail(&rcc->pipe))) {
-        return NULL;
-    }
-    red_channel_client_pipe_remove(rcc, item);
-    return item;
-}
-
-void red_channel_client_push(RedChannelClient *rcc)
-{
-    RedPipeItem *pipe_item;
-
-    if (!rcc->during_send) {
-        rcc->during_send = TRUE;
-    } else {
-        return;
-    }
-    red_channel_client_ref(rcc);
-    if (rcc->send_data.blocked) {
-        red_channel_client_send(rcc);
-    }
-
-    if (!red_channel_client_no_item_being_sent(rcc) && !rcc->send_data.blocked) {
-        rcc->send_data.blocked = TRUE;
-        spice_printerr("ERROR: an item waiting to be sent and not blocked");
-    }
-
-    while ((pipe_item = red_channel_client_pipe_item_get(rcc))) {
-        red_channel_client_send_item(rcc, pipe_item);
-    }
-    if (red_channel_client_no_item_being_sent(rcc) && ring_is_empty(&rcc->pipe)
-        && rcc->stream->watch) {
-        rcc->channel->core->watch_update_mask(rcc->stream->watch,
-                                              SPICE_WATCH_EVENT_READ);
-    }
-    rcc->during_send = FALSE;
-    red_channel_client_unref(rcc);
-}
-
-void red_channel_push(RedChannel *channel)
-{
-    if (!channel) {
-        return;
-    }
-
-    g_list_foreach(channel->clients, (GFunc)red_channel_client_push, NULL);
-}
-
-int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc)
-{
-    if (rcc->latency_monitor.roundtrip < 0) {
-        return rcc->latency_monitor.roundtrip;
-    }
-    return rcc->latency_monitor.roundtrip / NSEC_PER_MILLISEC;
-}
-
-static void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc)
-{
-    rcc->ack_data.messages_window = 0;
-    red_channel_client_push(rcc);
-}
-
-// TODO: this function doesn't make sense because the window should be client (WAN/LAN)
-// specific
-void red_channel_init_outgoing_messages_window(RedChannel *channel)
-{
-    g_list_foreach(channel->clients, (GFunc)red_channel_client_init_outgoing_messages_window, NULL);
-}
-
-static void red_channel_handle_migrate_flush_mark(RedChannelClient *rcc)
-{
-    if (rcc->channel->channel_cbs.handle_migrate_flush_mark) {
-        rcc->channel->channel_cbs.handle_migrate_flush_mark(rcc);
-    }
-}
-
-// TODO: the whole migration is broken with multiple clients. What do we want to do?
-// basically just
-//  1) source send mark to all
-//  2) source gets at various times the data (waits for all)
-//  3) source migrates to target
-//  4) target sends data to all
-// So need to make all the handlers work with per channel/client data (what data exactly?)
-static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message)
-{
-    spice_debug("channel type %d id %d rcc %p size %u",
-                rcc->channel->type, rcc->channel->id, rcc, size);
-    if (!rcc->channel->channel_cbs.handle_migrate_data) {
-        return;
-    }
-    if (!red_channel_client_is_waiting_for_migrate_data(rcc)) {
-        spice_channel_client_error(rcc, "unexpected");
-        return;
-    }
-    if (rcc->channel->channel_cbs.handle_migrate_data_get_serial) {
-        red_channel_client_set_message_serial(rcc,
-            rcc->channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message));
-    }
-    if (!rcc->channel->channel_cbs.handle_migrate_data(rcc, size, message)) {
-        spice_channel_client_error(rcc, "handle_migrate_data failed");
-        return;
-    }
-    red_channel_client_seamless_migration_done(rcc);
-}
-
-static void red_channel_client_restart_ping_timer(RedChannelClient *rcc)
-{
-    uint64_t passed, timeout;
-
-    passed = (spice_get_monotonic_time_ns() - rcc->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC;
-    timeout = PING_TEST_IDLE_NET_TIMEOUT_MS;
-    if (passed  < PING_TEST_TIMEOUT_MS) {
-        timeout += PING_TEST_TIMEOUT_MS - passed;
-    }
-
-    red_channel_client_start_ping_timer(rcc, timeout);
-}
-
-static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout)
-{
-    if (!rcc->latency_monitor.timer) {
-        return;
-    }
-    if (rcc->latency_monitor.state != PING_STATE_NONE) {
-        return;
-    }
-    rcc->latency_monitor.state = PING_STATE_TIMER;
-    rcc->channel->core->timer_start(rcc->latency_monitor.timer, timeout);
-}
-
-static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc)
-{
-    if (!rcc->latency_monitor.timer) {
-        return;
-    }
-    if (rcc->latency_monitor.state != PING_STATE_TIMER) {
-        return;
-    }
-
-    rcc->channel->core->timer_cancel(rcc->latency_monitor.timer);
-    rcc->latency_monitor.state = PING_STATE_NONE;
-}
-
-static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping)
-{
-    uint64_t now;
-
-    /* ignoring unexpected pongs, or post-migration pongs for pings that
-     * started just before migration */
-    if (ping->id != rcc->latency_monitor.id) {
-        spice_warning("ping-id (%u)!= pong-id %u",
-                      rcc->latency_monitor.id, ping->id);
-        return;
-    }
-
-    now = spice_get_monotonic_time_ns();
-
-    if (rcc->latency_monitor.state == PING_STATE_WARMUP) {
-        rcc->latency_monitor.state = PING_STATE_LATENCY;
-        return;
-    } else if (rcc->latency_monitor.state != PING_STATE_LATENCY) {
-        spice_warning("unexpected");
-        return;
-    }
-
-    /* set TCP_NODELAY=0, in case we reverted it for the test*/
-    if (!rcc->latency_monitor.tcp_nodelay) {
-        int delay_val = 0;
-
-        if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
-                       sizeof(delay_val)) == -1) {
-            if (errno != ENOTSUP) {
-                spice_warning("setsockopt failed, %s", strerror(errno));
-            }
-        }
-    }
-
-    /*
-     * The real network latency shouldn't change during the connection. However,
-     *  the measurements can be bigger than the real roundtrip due to other
-     *  threads or processes that are utilizing the network. We update the roundtrip
-     *  measurement with the minimal value we encountered till now.
-     */
-    if (rcc->latency_monitor.roundtrip < 0 ||
-        now - ping->timestamp < rcc->latency_monitor.roundtrip) {
-        rcc->latency_monitor.roundtrip = now - ping->timestamp;
-        spice_debug("update roundtrip %.2f(ms)", ((double)rcc->latency_monitor.roundtrip)/NSEC_PER_MILLISEC);
-    }
+    channel->out_bytes_counter = 0;
 
-    rcc->latency_monitor.last_pong_time = now;
-    rcc->latency_monitor.state = PING_STATE_NONE;
-    red_channel_client_start_ping_timer(rcc, PING_TEST_TIMEOUT_MS);
+    return channel;
 }
 
-int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
-                                      uint16_t type, void *message)
+static int do_nothing_handle_message(RedChannelClient *rcc,
+                                     uint16_t type,
+                                     uint32_t size,
+                                     uint8_t *msg)
 {
-    switch (type) {
-    case SPICE_MSGC_ACK_SYNC:
-        if (size != sizeof(uint32_t)) {
-            spice_printerr("bad message size");
-            return FALSE;
-        }
-        rcc->ack_data.client_generation = *(uint32_t *)(message);
-        break;
-    case SPICE_MSGC_ACK:
-        if (rcc->ack_data.client_generation == rcc->ack_data.generation) {
-            rcc->ack_data.messages_window -= rcc->ack_data.client_window;
-            red_channel_client_push(rcc);
-        }
-        break;
-    case SPICE_MSGC_DISCONNECTING:
-        break;
-    case SPICE_MSGC_MIGRATE_FLUSH_MARK:
-        if (!rcc->wait_migrate_flush_mark) {
-            spice_error("unexpected flush mark");
-            return FALSE;
-        }
-        red_channel_handle_migrate_flush_mark(rcc);
-        rcc->wait_migrate_flush_mark = FALSE;
-        break;
-    case SPICE_MSGC_MIGRATE_DATA:
-        red_channel_handle_migrate_data(rcc, size, message);
-        break;
-    case SPICE_MSGC_PONG:
-        red_channel_client_handle_pong(rcc, message);
-        break;
-    default:
-        spice_printerr("invalid message type %u", type);
-        return FALSE;
-    }
     return TRUE;
 }
 
-static void red_channel_client_event(int fd, int event, void *data)
+RedChannel *red_channel_create_parser(int size,
+                                      RedsState *reds,
+                                      const SpiceCoreInterfaceInternal *core,
+                                      uint32_t type, uint32_t id,
+                                      int handle_acks,
+                                      spice_parse_channel_func_t parser,
+                                      channel_handle_parsed_proc handle_parsed,
+                                      const ChannelCbs *channel_cbs,
+                                      uint32_t migration_flags)
 {
-    RedChannelClient *rcc = (RedChannelClient *)data;
+    RedChannel *channel = red_channel_create(size, reds, core, type, id,
+                                             handle_acks,
+                                             do_nothing_handle_message,
+                                             channel_cbs,
+                                             migration_flags);
 
-    red_channel_client_ref(rcc);
-    if (event & SPICE_WATCH_EVENT_READ) {
-        red_channel_client_receive(rcc);
-    }
-    if (event & SPICE_WATCH_EVENT_WRITE) {
-        red_channel_client_push(rcc);
+    if (channel == NULL) {
+        return NULL;
     }
-    red_channel_client_unref(rcc);
+    channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
+    channel->incoming_cb.parser = parser;
+
+    return channel;
 }
 
-void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item)
+void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat)
 {
-    spice_assert(red_channel_client_no_item_being_sent(rcc));
-    spice_assert(msg_type != 0);
-    rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type);
-    rcc->send_data.item = item;
-    if (item) {
-        red_pipe_item_ref(item);
-    }
+    spice_return_if_fail(channel != NULL);
+    spice_return_if_fail(channel->stat == 0);
+
+#ifdef RED_STATISTICS
+    channel->stat = stat;
+    channel->out_bytes_counter = stat_add_counter(channel->reds, stat, "out_bytes", TRUE);
+#endif
 }
 
-void red_channel_client_begin_send_message(RedChannelClient *rcc)
+void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data)
 {
-    SpiceMarshaller *m = rcc->send_data.marshaller;
+    spice_assert(client_cbs->connect || channel->type == SPICE_CHANNEL_MAIN);
+    channel->client_cbs.connect = client_cbs->connect;
 
-    // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state)
-    if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) {
-        spice_printerr("BUG: header->type == 0");
-        return;
+    if (client_cbs->disconnect) {
+        channel->client_cbs.disconnect = client_cbs->disconnect;
     }
 
-    /* canceling the latency test timer till the nework is idle */
-    red_channel_client_cancel_ping_timer(rcc);
-
-    spice_marshaller_flush(m);
-    rcc->send_data.size = spice_marshaller_get_total_size(m);
-    rcc->send_data.header.set_msg_size(&rcc->send_data.header,
-                                       rcc->send_data.size - rcc->send_data.header.header_size);
-    rcc->ack_data.messages_window++;
-    rcc->send_data.last_sent_serial = rcc->send_data.serial;
-    rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */
-    red_channel_client_send(rcc);
+    if (client_cbs->migrate) {
+        channel->client_cbs.migrate = client_cbs->migrate;
+    }
+    channel->data = cbs_data;
 }
 
-SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc)
+int test_capability(const uint32_t *caps, int num_caps, uint32_t cap)
 {
-    spice_assert(red_channel_client_no_item_being_sent(rcc));
-    spice_assert(rcc->send_data.header.data != NULL);
-    rcc->send_data.main.header_data = rcc->send_data.header.data;
-    rcc->send_data.main.item = rcc->send_data.item;
-
-    rcc->send_data.marshaller = rcc->send_data.urgent.marshaller;
-    rcc->send_data.item = NULL;
-    red_channel_client_reset_send_data(rcc);
-    return rcc->send_data.marshaller;
+    uint32_t index = cap / 32;
+    if (num_caps < index + 1) {
+        return FALSE;
+    }
+
+    return (caps[index] & (1 << (cap % 32))) != 0;
 }
 
-static void red_channel_client_restore_main_sender(RedChannelClient *rcc)
+static void add_capability(uint32_t **caps, int *num_caps, uint32_t cap)
 {
-    spice_marshaller_reset(rcc->send_data.urgent.marshaller);
-    rcc->send_data.marshaller = rcc->send_data.main.marshaller;
-    rcc->send_data.header.data = rcc->send_data.main.header_data;
-    if (!rcc->is_mini_header) {
-        rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial);
-    }
-    rcc->send_data.item = rcc->send_data.main.item;
+    int nbefore, n;
+
+    nbefore = *num_caps;
+    n = cap / 32;
+    *num_caps = MAX(*num_caps, n + 1);
+    *caps = spice_renew(uint32_t, *caps, *num_caps);
+    memset(*caps + nbefore, 0, (*num_caps - nbefore) * sizeof(uint32_t));
+    (*caps)[n] |= (1 << (cap % 32));
 }
 
-uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc)
+void red_channel_set_common_cap(RedChannel *channel, uint32_t cap)
 {
-    return rcc->send_data.serial;
+    add_capability(&channel->local_caps.common_caps, &channel->local_caps.num_common_caps, cap);
 }
 
-void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial)
+void red_channel_set_cap(RedChannel *channel, uint32_t cap)
 {
-    rcc->send_data.last_sent_serial = serial;
-    rcc->send_data.serial = serial;
+    add_capability(&channel->local_caps.caps, &channel->local_caps.num_caps, cap);
 }
 
-static inline gboolean client_pipe_add(RedChannelClient *rcc, RedPipeItem *item, RingItem *pos)
+void red_channel_ref(RedChannel *channel)
 {
-    spice_assert(rcc && item);
-    if (SPICE_UNLIKELY(!red_channel_client_is_connected(rcc))) {
-        spice_debug("rcc is disconnected %p", rcc);
-        red_pipe_item_unref(item);
-        return FALSE;
-    }
-    if (ring_is_empty(&rcc->pipe) && rcc->stream->watch) {
-        rcc->channel->core->watch_update_mask(rcc->stream->watch,
-                                         SPICE_WATCH_EVENT_READ |
-                                         SPICE_WATCH_EVENT_WRITE);
-    }
-    rcc->pipe_size++;
-    ring_add(pos, &item->link);
-    return TRUE;
+    channel->refs++;
 }
 
-void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item)
+void red_channel_unref(RedChannel *channel)
 {
+    if (--channel->refs == 0) {
+        if (channel->local_caps.num_common_caps) {
+            free(channel->local_caps.common_caps);
+        }
 
-    client_pipe_add(rcc, item, &rcc->pipe);
-}
+        if (channel->local_caps.num_caps) {
+            free(channel->local_caps.caps);
+        }
 
-void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item)
-{
-    red_channel_client_pipe_add(rcc, item);
-    red_channel_client_push(rcc);
+        free(channel);
+    }
 }
 
-void red_channel_client_pipe_add_after(RedChannelClient *rcc,
-                                       RedPipeItem *item,
-                                       RedPipeItem *pos)
+void red_channel_destroy(RedChannel *channel)
 {
-    spice_assert(pos);
-    client_pipe_add(rcc, item, &pos->link);
-}
+    if (!channel) {
+        return;
+    }
 
-int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc,
-                                           RedPipeItem *item)
-{
-    return ring_item_is_linked(&item->link);
+    g_list_foreach(channel->clients, (GFunc)red_channel_client_destroy, NULL);
+    red_channel_unref(channel);
 }
 
-static void red_channel_client_pipe_add_tail(RedChannelClient *rcc,
-                                             RedPipeItem *item)
+void red_channel_send(RedChannel *channel)
 {
-    client_pipe_add(rcc, item, rcc->pipe.prev);
+    g_list_foreach(channel->clients, (GFunc)red_channel_client_send, NULL);
 }
 
-void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item)
+void red_channel_push(RedChannel *channel)
 {
-    if (client_pipe_add(rcc, item, rcc->pipe.prev)) {
-        red_channel_client_push(rcc);
+    if (!channel) {
+        return;
     }
+
+    g_list_foreach(channel->clients, (GFunc)red_channel_client_push, NULL);
 }
 
-void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type)
+// TODO: this function doesn't make sense because the window should be client (WAN/LAN)
+// specific
+void red_channel_init_outgoing_messages_window(RedChannel *channel)
 {
-    RedPipeItem *item = spice_new(RedPipeItem, 1);
-
-    red_pipe_item_init(item, pipe_item_type);
-    red_channel_client_pipe_add(rcc, item);
-    red_channel_client_push(rcc);
+    g_list_foreach(channel->clients, (GFunc)red_channel_client_init_outgoing_messages_window, NULL);
 }
 
 static void red_channel_client_pipe_add_type_proxy(gpointer data, gpointer user_data)
@@ -1704,16 +424,6 @@  void red_channel_pipes_add_type(RedChannel *channel, int pipe_item_type)
                    GINT_TO_POINTER(pipe_item_type));
 }
 
-void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type)
-{
-    RedEmptyMsgPipeItem *item = spice_new(RedEmptyMsgPipeItem, 1);
-
-    red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG);
-    item->msg = msg_type;
-    red_channel_client_pipe_add(rcc, &item->base);
-    red_channel_client_push(rcc);
-}
-
 static void red_channel_client_pipe_add_empty_msg_proxy(gpointer data, gpointer user_data)
 {
     int type = GPOINTER_TO_INT(user_data);
@@ -1725,117 +435,38 @@  void red_channel_pipes_add_empty_msg(RedChannel *channel, int msg_type)
     g_list_foreach(channel->clients, red_channel_client_pipe_add_empty_msg_proxy, GINT_TO_POINTER(msg_type));
 }
 
-int red_channel_client_is_connected(RedChannelClient *rcc)
-{
-    if (!rcc->dummy) {
-        return rcc->channel
-            && (g_list_find(rcc->channel->clients, rcc) != NULL);
-    } else {
-        return rcc->dummy_connected;
-    }
-}
-
 int red_channel_is_connected(RedChannel *channel)
 {
     return channel && channel->clients;
 }
 
-static void red_channel_client_clear_sent_item(RedChannelClient *rcc)
-{
-    red_channel_client_release_sent_item(rcc);
-    rcc->send_data.blocked = FALSE;
-    rcc->send_data.size = 0;
-}
-
-void red_channel_client_pipe_clear(RedChannelClient *rcc)
-{
-    RedPipeItem *item;
-
-    if (rcc) {
-        red_channel_client_clear_sent_item(rcc);
-    }
-    while ((item = (RedPipeItem *)ring_get_head(&rcc->pipe))) {
-        ring_remove(&item->link);
-        red_pipe_item_unref(item);
-    }
-    rcc->pipe_size = 0;
-}
-
-void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
-{
-    rcc->ack_data.messages_window = 0;
-}
-
-void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window)
-{
-    rcc->ack_data.client_window = client_window;
-}
-
-static void red_channel_remove_client(RedChannelClient *rcc)
+void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc)
 {
     GList *link;
+    g_return_if_fail(channel == red_channel_client_get_channel(rcc));
 
-    if (!pthread_equal(pthread_self(), rcc->channel->thread_id)) {
+    if (!pthread_equal(pthread_self(), channel->thread_id)) {
         spice_warning("channel type %d id %d - "
                       "channel->thread_id (0x%lx) != pthread_self (0x%lx)."
                       "If one of the threads is != io-thread && != vcpu-thread, "
                       "this might be a BUG",
-                      rcc->channel->type, rcc->channel->id,
-                      rcc->channel->thread_id, pthread_self());
+                      channel->type, channel->id,
+                      channel->thread_id, pthread_self());
     }
-    spice_return_if_fail(rcc->channel);
-    link = g_list_find(rcc->channel->clients, rcc);
+    spice_return_if_fail(channel);
+    link = g_list_find(channel->clients, rcc);
     spice_return_if_fail(link != NULL);
 
-    rcc->channel->clients = g_list_remove_link(rcc->channel->clients, link);
+    channel->clients = g_list_remove_link(channel->clients, link);
     // TODO: should we set rcc->channel to NULL???
 }
 
-static void red_client_remove_channel(RedChannelClient *rcc)
-{
-    pthread_mutex_lock(&rcc->client->lock);
-    rcc->client->channels = g_list_remove(rcc->client->channels, rcc);
-    pthread_mutex_unlock(&rcc->client->lock);
-}
-
-static void red_channel_client_disconnect_dummy(RedChannelClient *rcc)
-{
-    GList *link;
-    spice_assert(rcc->dummy);
-    if (rcc->channel && (link = g_list_find(rcc->channel->clients, rcc))) {
-        spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel,
-                       rcc->channel->type, rcc->channel->id);
-        red_channel_remove_client(link->data);
-    }
-    rcc->dummy_connected = FALSE;
-}
-
-void red_channel_client_disconnect(RedChannelClient *rcc)
+void red_client_remove_channel(RedChannelClient *rcc)
 {
-    if (rcc->dummy) {
-        red_channel_client_disconnect_dummy(rcc);
-        return;
-    }
-    if (!red_channel_client_is_connected(rcc)) {
-        return;
-    }
-    spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel,
-                   rcc->channel->type, rcc->channel->id);
-    red_channel_client_pipe_clear(rcc);
-    if (rcc->stream->watch) {
-        rcc->channel->core->watch_remove(rcc->stream->watch);
-        rcc->stream->watch = NULL;
-    }
-    if (rcc->latency_monitor.timer) {
-        rcc->channel->core->timer_remove(rcc->latency_monitor.timer);
-        rcc->latency_monitor.timer = NULL;
-    }
-    if (rcc->connectivity_monitor.timer) {
-        rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer);
-        rcc->connectivity_monitor.timer = NULL;
-    }
-    red_channel_remove_client(rcc);
-    rcc->channel->channel_cbs.on_disconnect(rcc);
+    RedClient *client = red_channel_client_get_client(rcc);
+    pthread_mutex_lock(&client->lock);
+    client->channels = g_list_remove(client->channels, rcc);
+    pthread_mutex_unlock(&client->lock);
 }
 
 void red_channel_disconnect(RedChannel *channel)
@@ -1843,51 +474,6 @@  void red_channel_disconnect(RedChannel *channel)
     g_list_foreach(channel->clients, (GFunc)red_channel_client_disconnect, NULL);
 }
 
-RedChannelClient *red_channel_client_create_dummy(int size,
-                                                  RedChannel *channel,
-                                                  RedClient  *client,
-                                                  int num_common_caps, uint32_t *common_caps,
-                                                  int num_caps, uint32_t *caps)
-{
-    RedChannelClient *rcc = NULL;
-
-    spice_assert(size >= sizeof(RedChannelClient));
-
-    pthread_mutex_lock(&client->lock);
-    if (!red_channel_client_pre_create_validate(channel, client)) {
-        goto error;
-    }
-    rcc = spice_malloc0(size);
-    rcc->refs = 1;
-    rcc->client = client;
-    rcc->channel = channel;
-    red_channel_ref(channel);
-    red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps);
-    if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) {
-        rcc->incoming.header = mini_header_wrapper;
-        rcc->send_data.header = mini_header_wrapper;
-        rcc->is_mini_header = TRUE;
-    } else {
-        rcc->incoming.header = full_header_wrapper;
-        rcc->send_data.header = full_header_wrapper;
-        rcc->is_mini_header = FALSE;
-    }
-
-    rcc->incoming.header.data = rcc->incoming.header_buf;
-    rcc->incoming.serial = 1;
-    ring_init(&rcc->pipe);
-
-    rcc->dummy = TRUE;
-    rcc->dummy_connected = TRUE;
-    red_channel_add_client(channel, rcc);
-    red_client_add_channel(client, rcc);
-    pthread_mutex_unlock(&client->lock);
-    return rcc;
-error:
-    pthread_mutex_unlock(&client->lock);
-    return NULL;
-}
-
 void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb)
 {
     g_list_foreach(channel->clients, (GFunc)cb, NULL);
@@ -1908,7 +494,7 @@  int red_channel_all_blocked(RedChannel *channel)
     }
     for (link = channel->clients; link != NULL; link = link->next) {
         rcc = link->data;
-        if (!rcc->send_data.blocked) {
+        if (!red_channel_client_is_blocked(rcc)) {
             return FALSE;
         }
     }
@@ -1921,56 +507,25 @@  int red_channel_any_blocked(RedChannel *channel)
     RedChannelClient *rcc;
 
     FOREACH_CLIENT(channel, link, next, rcc) {
-        if (rcc->send_data.blocked) {
+        if (red_channel_client_is_blocked(rcc)) {
             return TRUE;
         }
     }
     return FALSE;
 }
 
-int red_channel_client_blocked(RedChannelClient *rcc)
-{
-    return rcc && rcc->send_data.blocked;
-}
-
-int red_channel_client_send_message_pending(RedChannelClient *rcc)
-{
-    return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0;
-}
-
-/* accessors for RedChannelClient */
-SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc)
-{
-    return rcc->send_data.marshaller;
-}
-
-RedsStream *red_channel_client_get_stream(RedChannelClient *rcc)
-{
-    return rcc->stream;
-}
-
-RedClient *red_channel_client_get_client(RedChannelClient *rcc)
-{
-    return rcc->client;
-}
-
-void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list)
-{
-    rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list);
-}
-
-/* end of accessors */
-
 int red_channel_get_first_socket(RedChannel *channel)
 {
     RedChannelClient *rcc;
+    RedsStream *stream;
 
     if (!channel || !channel->clients) {
         return -1;
     }
     rcc = g_list_nth_data(channel->clients, 0);
+    stream = red_channel_client_get_stream(rcc);
 
-    return rcc->stream->socket;
+    return stream->socket;
 }
 
 int red_channel_no_item_being_sent(RedChannel *channel)
@@ -1986,18 +541,6 @@  int red_channel_no_item_being_sent(RedChannel *channel)
     return TRUE;
 }
 
-int red_channel_client_no_item_being_sent(RedChannelClient *rcc)
-{
-    return !rcc || (rcc->send_data.size == 0);
-}
-
-void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc,
-                                                RedPipeItem *item)
-{
-    red_channel_client_pipe_remove(rcc, item);
-    red_pipe_item_unref(item);
-}
-
 /*
  * RedClient implementation - kept in red-channel.c because they are
  * pretty tied together.
@@ -2035,21 +578,6 @@  RedClient *red_client_unref(RedClient *client)
     return client;
 }
 
-/* client mutex should be locked before this call */
-static gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc)
-{
-    gboolean ret = FALSE;
-
-    if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) {
-        rcc->wait_migrate_data = TRUE;
-        ret = TRUE;
-    }
-    spice_debug("channel type %d id %d rcc %p wait data %d", rcc->channel->type, rcc->channel->id, rcc,
-        rcc->wait_migrate_data);
-
-    return ret;
-}
-
 void red_client_set_migration_seamless(RedClient *client) // dest
 {
     GList *link;
@@ -2069,6 +597,7 @@  void red_client_migrate(RedClient *client)
 {
     GList *link, *next;
     RedChannelClient *rcc;
+    RedChannel *channel;
 
     spice_printerr("migrate client with #channels %d", g_list_length(client->channels));
     if (!pthread_equal(pthread_self(), client->thread_id)) {
@@ -2081,8 +610,9 @@  void red_client_migrate(RedClient *client)
     while (link) {
         next = link->next;
         rcc = link->data;
+        channel = red_channel_client_get_channel(rcc);
         if (red_channel_client_is_connected(rcc)) {
-            rcc->channel->client_cbs.migrate(rcc);
+            channel->client_cbs.migrate(rcc);
         }
         link = next;
     }
@@ -2103,20 +633,21 @@  void red_client_destroy(RedClient *client)
     }
     link = client->channels;
     while (link) {
+        RedChannel *channel;
         next = link->next;
         // some channels may be in other threads, so disconnection
         // is not synchronous.
         rcc = link->data;
-        rcc->destroying = 1;
+        channel = red_channel_client_get_channel(rcc);
+        red_channel_client_set_destroying(rcc);
         // some channels may be in other threads. However we currently
         // assume disconnect is synchronous (we changed the dispatcher
         // to wait for disconnection)
         // TODO: should we go back to async. For this we need to use
         // ref count for channel clients.
-        rcc->channel->client_cbs.disconnect(rcc);
-        spice_assert(ring_is_empty(&rcc->pipe));
-        spice_assert(rcc->pipe_size == 0);
-        spice_assert(rcc->send_data.size == 0);
+        channel->client_cbs.disconnect(rcc);
+        spice_assert(red_channel_client_pipe_is_empty(rcc));
+        spice_assert(red_channel_client_no_item_being_sent(rcc));
         red_channel_client_destroy(rcc);
         link = next;
     }
@@ -2124,15 +655,17 @@  void red_client_destroy(RedClient *client)
 }
 
 /* client->lock should be locked */
-static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id)
+RedChannelClient *red_client_get_channel(RedClient *client, int type, int id)
 {
     GList *link;
     RedChannelClient *rcc;
     RedChannelClient *ret = NULL;
 
     for (link = client->channels; link != NULL; link = link->next) {
+        RedChannel *channel;
         rcc = link->data;
-        if (rcc->channel->type == type && rcc->channel->id == id) {
+        channel = red_channel_client_get_channel(rcc);
+        if (channel->type == type && channel->id == id) {
             ret = rcc;
             break;
         }
@@ -2141,7 +674,7 @@  static RedChannelClient *red_client_get_channel(RedClient *client, int type, int
 }
 
 /* client->lock should be locked */
-static void red_client_add_channel(RedClient *client, RedChannelClient *rcc)
+void red_client_add_channel(RedClient *client, RedChannelClient *rcc)
 {
     spice_assert(rcc && client);
     client->channels = g_list_prepend(client->channels, rcc);
@@ -2173,11 +706,7 @@  void red_client_semi_seamless_migrate_complete(RedClient *client)
     link = client->channels;
     while (link) {
         next = link->next;
-        RedChannelClient *rcc = link->data;
-
-        if (rcc->latency_monitor.timer) {
-            red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS);
-        }
+        red_channel_client_semi_seamless_migration_complete(link->data);
         link = next;
     }
     pthread_mutex_unlock(&client->lock);
@@ -2269,8 +798,10 @@  uint32_t red_channel_max_pipe_size(RedChannel *channel)
     uint32_t pipe_size = 0;
 
     for (link = channel->clients; link != NULL; link = link->next) {
+        uint32_t new_size;
         rcc = link->data;
-        pipe_size = MAX(pipe_size, rcc->pipe_size);
+        new_size = red_channel_client_get_pipe_size(rcc);
+        pipe_size = MAX(pipe_size, new_size);
     }
     return pipe_size;
 }
@@ -2282,7 +813,9 @@  uint32_t red_channel_min_pipe_size(RedChannel *channel)
     uint32_t pipe_size = ~0;
 
     FOREACH_CLIENT(channel, link, next, rcc) {
-        pipe_size = MIN(pipe_size, rcc->pipe_size);
+        uint32_t new_size;
+        new_size = red_channel_client_get_pipe_size(rcc);
+        pipe_size = MIN(pipe_size, new_size);
     }
     return pipe_size == ~0 ? 0 : pipe_size;
 }
@@ -2294,102 +827,11 @@  uint32_t red_channel_sum_pipes_size(RedChannel *channel)
     uint32_t sum = 0;
 
     FOREACH_CLIENT(channel, link, next, rcc) {
-        sum += rcc->pipe_size;
+        sum += red_channel_client_get_pipe_size(rcc);
     }
     return sum;
 }
 
-int red_channel_client_wait_outgoing_item(RedChannelClient *rcc,
-                                          int64_t timeout)
-{
-    uint64_t end_time;
-    int blocked;
-
-    if (!red_channel_client_blocked(rcc)) {
-        return TRUE;
-    }
-    if (timeout != -1) {
-        end_time = spice_get_monotonic_time_ns() + timeout;
-    } else {
-        end_time = UINT64_MAX;
-    }
-    spice_info("blocked");
-
-    do {
-        usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
-        red_channel_client_receive(rcc);
-        red_channel_client_send(rcc);
-    } while ((blocked = red_channel_client_blocked(rcc)) &&
-             (timeout == -1 || spice_get_monotonic_time_ns() < end_time));
-
-    if (blocked) {
-        spice_warning("timeout");
-        return FALSE;
-    } else {
-        spice_assert(red_channel_client_no_item_being_sent(rcc));
-        return TRUE;
-    }
-}
-
-static void marker_pipe_item_free(RedPipeItem *base)
-{
-    MarkerPipeItem *item = SPICE_UPCAST(MarkerPipeItem, base);
-
-    if (item->item_in_pipe) {
-        *item->item_in_pipe = FALSE;
-    }
-    free(item);
-}
-
-/* TODO: more evil sync stuff. anything with the word wait in it's name. */
-int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc,
-                                           RedPipeItem *item,
-                                           int64_t timeout)
-{
-    uint64_t end_time;
-    gboolean item_in_pipe;
-
-    spice_info(NULL);
-
-    if (timeout != -1) {
-        end_time = spice_get_monotonic_time_ns() + timeout;
-    } else {
-        end_time = UINT64_MAX;
-    }
-
-    MarkerPipeItem *mark_item = spice_new0(MarkerPipeItem, 1);
-
-    red_pipe_item_init_full(&mark_item->base, RED_PIPE_ITEM_TYPE_MARKER,
-                            marker_pipe_item_free);
-    item_in_pipe = TRUE;
-    mark_item->item_in_pipe = &item_in_pipe;
-    red_channel_client_pipe_add_after(rcc, &mark_item->base, item);
-
-    if (red_channel_client_blocked(rcc)) {
-        red_channel_client_receive(rcc);
-        red_channel_client_send(rcc);
-    }
-    red_channel_client_push(rcc);
-
-    while(item_in_pipe &&
-          (timeout == -1 || spice_get_monotonic_time_ns() < end_time)) {
-        usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
-        red_channel_client_receive(rcc);
-        red_channel_client_send(rcc);
-        red_channel_client_push(rcc);
-    }
-
-    if (item_in_pipe) {
-        // still on the queue, make sure won't overwrite the stack variable
-        mark_item->item_in_pipe = NULL;
-        spice_warning("timeout");
-        return FALSE;
-    } else {
-        return red_channel_client_wait_outgoing_item(rcc,
-                                                     timeout == -1 ? -1 : end_time - spice_get_monotonic_time_ns());
-    }
-}
-
 int red_channel_wait_all_sent(RedChannel *channel,
                               int64_t timeout)
 {
@@ -2424,15 +866,6 @@  int red_channel_wait_all_sent(RedChannel *channel,
     }
 }
 
-void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc)
-{
-    if (red_channel_client_blocked(rcc) || rcc->pipe_size > 0) {
-        red_channel_client_disconnect(rcc);
-    } else {
-        spice_assert(red_channel_client_no_item_being_sent(rcc));
-    }
-}
-
 RedsState* red_channel_get_server(RedChannel *channel)
 {
     return channel->reds;
diff --git a/server/red-channel.h b/server/red-channel.h
index 5b38f57..ce7f732 100644
--- a/server/red-channel.h
+++ b/server/red-channel.h
@@ -33,6 +33,7 @@ 
 #include "reds-stream.h"
 #include "stat.h"
 #include "red-pipe-item.h"
+#include "red-channel-client.h"
 
 #define MAX_SEND_BUFS 1000
 #define CLIENT_ACK_WINDOW 20
@@ -129,7 +130,6 @@  typedef struct OutgoingHandler {
 /* Red Channel interface */
 
 typedef struct RedChannel RedChannel;
-typedef struct RedChannelClient RedChannelClient;
 typedef struct RedClient RedClient;
 typedef struct MainChannelClient MainChannelClient;
 
@@ -231,62 +231,6 @@  typedef struct RedChannelClientConnectivityMonitor {
     SpiceTimer *timer;
 } RedChannelClientConnectivityMonitor;
 
-struct RedChannelClient {
-    RedChannel *channel;
-    RedClient  *client;
-    RedsStream *stream;
-    int dummy;
-    int dummy_connected;
-
-    uint32_t refs;
-
-    struct {
-        uint32_t generation;
-        uint32_t client_generation;
-        uint32_t messages_window;
-        uint32_t client_window;
-    } ack_data;
-
-    struct {
-        SpiceMarshaller *marshaller;
-        SpiceDataHeaderOpaque header;
-        uint32_t size;
-        RedPipeItem *item;
-        int blocked;
-        uint64_t serial;
-        uint64_t last_sent_serial;
-
-        struct {
-            SpiceMarshaller *marshaller;
-            uint8_t *header_data;
-            RedPipeItem *item;
-        } main;
-
-        struct {
-            SpiceMarshaller *marshaller;
-        } urgent;
-    } send_data;
-
-    OutgoingHandler outgoing;
-    IncomingHandler incoming;
-    int during_send;
-    int id; // debugging purposes
-    Ring pipe;
-    uint32_t pipe_size;
-
-    RedChannelCapabilities remote_caps;
-    int is_mini_header;
-    int destroying;
-
-    int wait_migrate_data;
-    int wait_migrate_flush_mark;
-
-    RedChannelClientLatencyMonitor latency_monitor;
-    RedChannelClientConnectivityMonitor connectivity_monitor;
-};
-
-#define RED_CHANNEL_CLIENT(Client) ((RedChannelClient *)(Client))
-
 struct RedChannel {
     uint32_t type;
     uint32_t id;
@@ -339,17 +283,6 @@  struct RedChannel {
 
 #define RED_CHANNEL(Channel) ((RedChannel *)(Channel))
 
-/*
- * When an error occurs over a channel, we treat it as a warning
- * for spice-server and shutdown the channel.
- */
-#define spice_channel_client_error(rcc, format, ...)                                     \
-    do {                                                                                 \
-        spice_warning("rcc %p type %u id %u: " format, rcc,                              \
-                    rcc->channel->type, rcc->channel->id, ## __VA_ARGS__);               \
-        red_channel_client_shutdown(rcc);                                                \
-    } while (0)
-
 /* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
  * explicitly destroy the channel */
 RedChannel *red_channel_create(int size,
@@ -372,6 +305,11 @@  RedChannel *red_channel_create_parser(int size,
                                       channel_handle_parsed_proc handle_parsed,
                                       const ChannelCbs *channel_cbs,
                                       uint32_t migration_flags);
+void red_channel_ref(RedChannel *channel);
+void red_channel_unref(RedChannel *channel);
+void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc);
+void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc);
+
 void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat);
 
 void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data);
@@ -379,26 +317,13 @@  void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *clien
 void red_channel_set_common_cap(RedChannel *channel, uint32_t cap);
 void red_channel_set_cap(RedChannel *channel, uint32_t cap);
 
-RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client,
-                                            RedsStream *stream,
-                                            int monitor_latency,
-                                            int num_common_caps, uint32_t *common_caps,
-                                            int num_caps, uint32_t *caps);
 // TODO: tmp, for channels that don't use RedChannel yet (e.g., snd channel), but
 // do use the client callbacks. So the channel clients are not connected (the channel doesn't
 // have list of them, but they do have a link to the channel, and the client has a list of them)
 RedChannel *red_channel_create_dummy(int size, RedsState *reds, uint32_t type, uint32_t id);
-RedChannelClient *red_channel_client_create_dummy(int size,
-                                                  RedChannel *channel,
-                                                  RedClient  *client,
-                                                  int num_common_caps, uint32_t *common_caps,
-                                                  int num_caps, uint32_t *caps);
 
 int red_channel_is_connected(RedChannel *channel);
-int red_channel_client_is_connected(RedChannelClient *rcc);
 
-void red_channel_client_default_migrate(RedChannelClient *rcc);
-int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc);
 /* seamless migration is supported for only one client. This routine
  * checks if the only channel client associated with channel is
  * waiting for migration data */
@@ -411,61 +336,15 @@  int red_channel_is_waiting_for_migrate_data(RedChannel *channel);
  * from red_client_destroy.
  */
 
-void red_channel_client_destroy(RedChannelClient *rcc);
 void red_channel_destroy(RedChannel *channel);
-void red_channel_client_ref(RedChannelClient *rcc);
-void red_channel_client_unref(RedChannelClient *rcc);
-
-int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap);
-int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap);
 
 /* return true if all the channel clients support the cap */
 int red_channel_test_remote_common_cap(RedChannel *channel, uint32_t cap);
 int red_channel_test_remote_cap(RedChannel *channel, uint32_t cap);
 
-/* shutdown is the only safe thing to do out of the client/channel
- * thread. It will not touch the rings, just shutdown the socket.
- * It should be followed by some way to gurantee a disconnection. */
-void red_channel_client_shutdown(RedChannelClient *rcc);
-
 /* should be called when a new channel is ready to send messages */
 void red_channel_init_outgoing_messages_window(RedChannel *channel);
 
-/* handles general channel msgs from the client */
-int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
-                                      uint16_t type, void *message);
-
-/* when preparing send_data: should call init and then use marshaller */
-void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item);
-
-uint64_t red_channel_client_get_message_serial(RedChannelClient *channel);
-void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t);
-
-/* When sending a msg. Should first call red_channel_client_begin_send_message.
- * It will first send the pending urgent data, if there is any, and then
- * the rest of the data.
- */
-void red_channel_client_begin_send_message(RedChannelClient *rcc);
-
-/*
- * Stores the current send data, and switches to urgent send data.
- * When it begins the actual send, it will send first the urgent data
- * and afterward the rest of the data.
- * Should be called only if during the marshalling of on message,
- * the need to send another message, before, rises.
- * Important: the serial of the non-urgent sent data, will be succeeded.
- * return: the urgent send data marshaller
- */
-SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc);
-
-/* returns -1 if we don't have an estimation */
-int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc);
-
-/*
- * Checks periodically if the connection is still alive
- */
-void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms);
-
 // TODO: add back the channel_pipe_add functionality - by adding reference counting
 // to the RedPipeItem.
 
@@ -475,23 +354,10 @@  int red_channel_pipes_new_add_push(RedChannel *channel, new_pipe_item_t creator,
 void red_channel_pipes_new_add(RedChannel *channel, new_pipe_item_t creator, void *data);
 void red_channel_pipes_new_add_tail(RedChannel *channel, new_pipe_item_t creator, void *data);
 
-void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item);
-void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item);
-void red_channel_client_pipe_add_after(RedChannelClient *rcc, RedPipeItem *item, RedPipeItem *pos);
-int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item);
-void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item);
-void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item);
-/* for types that use this routine -> the pipe item should be freed */
-void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type);
 void red_channel_pipes_add_type(RedChannel *channel, int pipe_item_type);
 
-void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type);
 void red_channel_pipes_add_empty_msg(RedChannel *channel, int msg_type);
 
-void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc);
-void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window);
-void red_channel_client_push_set_ack(RedChannelClient *rcc);
-
 int red_channel_get_first_socket(RedChannel *channel);
 
 /* return TRUE if all of the connected clients to this channel are blocked */
@@ -500,24 +366,13 @@  int red_channel_all_blocked(RedChannel *channel);
 /* return TRUE if any of the connected clients to this channel are blocked */
 int red_channel_any_blocked(RedChannel *channel);
 
-int red_channel_client_blocked(RedChannelClient *rcc);
-
-/* helper for channels that have complex logic that can possibly ready a send */
-int red_channel_client_send_message_pending(RedChannelClient *rcc);
-
 int red_channel_no_item_being_sent(RedChannel *channel);
-int red_channel_client_no_item_being_sent(RedChannelClient *rcc);
 
 // TODO: unstaticed for display/cursor channels. they do some specific pushes not through
 // adding elements or on events. but not sure if this is actually required (only result
 // should be that they ""try"" a little harder, but if the event system is correct it
 // should not make any difference.
 void red_channel_push(RedChannel *channel);
-void red_channel_client_push(RedChannelClient *rcc);
-// TODO: again - what is the context exactly? this happens in channel disconnect. but our
-// current red_channel_shutdown also closes the socket - is there a socket to close?
-// are we reading from an fd here? arghh
-void red_channel_client_pipe_clear(RedChannelClient *rcc);
 // Again, used in various places outside of event handler context (or in other event handler
 // contexts):
 //  flush_display_commands/flush_cursor_commands
@@ -526,23 +381,10 @@  void red_channel_client_pipe_clear(RedChannelClient *rcc);
 //  red_wait_pipe_item_sent
 //  handle_channel_events - this is the only one that was used before, and was in red-channel.c
 void red_channel_receive(RedChannel *channel);
-void red_channel_client_receive(RedChannelClient *rcc);
 // For red_worker
 void red_channel_send(RedChannel *channel);
-void red_channel_client_send(RedChannelClient *rcc);
 // For red_worker
 void red_channel_disconnect(RedChannel *channel);
-void red_channel_client_disconnect(RedChannelClient *rcc);
-
-/* accessors for RedChannelClient */
-/* Note: the valid times to call red_channel_get_marshaller are just during send_item callback. */
-SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc);
-RedsStream *red_channel_client_get_stream(RedChannelClient *rcc);
-RedClient *red_channel_client_get_client(RedChannelClient *rcc);
-
-/* Note that the header is valid only between red_channel_reset_send_data and
- * red_channel_begin_send_message.*/
-void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list);
 
 /* return the sum of all the rcc pipe size */
 uint32_t red_channel_max_pipe_size(RedChannel *channel);
@@ -596,6 +438,11 @@  RedClient *red_client_ref(RedClient *client);
  */
 RedClient *red_client_unref(RedClient *client);
 
+/* client->lock should be locked */
+void red_client_add_channel(RedClient *client, RedChannelClient *rcc);
+void red_client_remove_channel(RedChannelClient *rcc);
+RedChannelClient *red_client_get_channel(RedClient *client, int type, int id);
+
 MainChannelClient *red_client_get_main(RedClient *client);
 // main should be set once before all the other channels are created
 void red_client_set_main(RedClient *client, MainChannelClient *mcc);
@@ -611,7 +458,7 @@  void red_client_semi_seamless_migrate_complete(RedClient *client); /* dst side *
 int red_client_during_migrate_at_target(RedClient *client);
 
 void red_client_migrate(RedClient *client);
-
+gboolean red_client_seamless_migration_done_for_channel(RedClient *client);
 /*
  * blocking functions.
  *
@@ -620,13 +467,9 @@  void red_client_migrate(RedClient *client);
  * Return: TRUE if waiting succeeded. FALSE if timeout expired.
  */
 
-int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc,
-                                           RedPipeItem *item,
-                                           int64_t timeout);
-int red_channel_client_wait_outgoing_item(RedChannelClient *rcc,
-                                          int64_t timeout);
 int red_channel_wait_all_sent(RedChannel *channel,
                               int64_t timeout);
-void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc);
+
+#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro
 
 #endif
diff --git a/server/red-qxl.c b/server/red-qxl.c
index 2a4fa52..e517b41 100644
--- a/server/red-qxl.c
+++ b/server/red-qxl.c
@@ -102,12 +102,13 @@  static void red_qxl_disconnect_display_peer(RedChannelClient *rcc)
 {
     RedWorkerMessageDisplayDisconnect payload;
     Dispatcher *dispatcher;
+    RedChannel *channel = red_channel_client_get_channel(rcc);
 
-    if (!rcc->channel) {
+    if (!channel) {
         return;
     }
 
-    dispatcher = (Dispatcher *)rcc->channel->data;
+    dispatcher = (Dispatcher *)channel->data;
 
     spice_printerr("");
     payload.rcc = rcc;
@@ -123,11 +124,12 @@  static void red_qxl_display_migrate(RedChannelClient *rcc)
 {
     RedWorkerMessageDisplayMigrate payload;
     Dispatcher *dispatcher;
-    if (!rcc->channel) {
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    if (!channel) {
         return;
     }
-    dispatcher = (Dispatcher *)rcc->channel->data;
-    spice_printerr("channel type %u id %u", rcc->channel->type, rcc->channel->id);
+    dispatcher = (Dispatcher *)channel->data;
+    spice_printerr("channel type %u id %u", channel->type, channel->id);
     payload.rcc = rcc;
     dispatcher_send_message(dispatcher,
                             RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
@@ -162,12 +164,13 @@  static void red_qxl_disconnect_cursor_peer(RedChannelClient *rcc)
 {
     RedWorkerMessageCursorDisconnect payload;
     Dispatcher *dispatcher;
+    RedChannel *channel = red_channel_client_get_channel(rcc);
 
-    if (!rcc->channel) {
+    if (!channel) {
         return;
     }
 
-    dispatcher = (Dispatcher *)rcc->channel->data;
+    dispatcher = (Dispatcher *)channel->data;
     spice_printerr("");
     payload.rcc = rcc;
 
@@ -180,12 +183,13 @@  static void red_qxl_cursor_migrate(RedChannelClient *rcc)
 {
     RedWorkerMessageCursorMigrate payload;
     Dispatcher *dispatcher;
+    RedChannel *channel = red_channel_client_get_channel(rcc);
 
-    if (!rcc->channel) {
+    if (!channel) {
         return;
     }
-    dispatcher = (Dispatcher *)rcc->channel->data;
-    spice_printerr("channel type %u id %u", rcc->channel->type, rcc->channel->id);
+    dispatcher = (Dispatcher *)channel->data;
+    spice_printerr("channel type %u id %u", channel->type, channel->id);
     payload.rcc = rcc;
     dispatcher_send_message(dispatcher,
                             RED_WORKER_MESSAGE_CURSOR_MIGRATE,
diff --git a/server/red-worker.c b/server/red-worker.c
index 34a8bff..590412b 100644
--- a/server/red-worker.c
+++ b/server/red-worker.c
@@ -101,7 +101,8 @@  static int display_is_connected(RedWorker *worker)
 
 static uint8_t *common_alloc_recv_buf(RedChannelClient *rcc, uint16_t type, uint32_t size)
 {
-    CommonGraphicsChannel *common = SPICE_CONTAINEROF(rcc->channel, CommonGraphicsChannel, base);
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    CommonGraphicsChannel *common = SPICE_CONTAINEROF(channel, CommonGraphicsChannel, base);
 
     /* SPICE_MSGC_MIGRATE_DATA is the only client message whose size is dynamic */
     if (type == SPICE_MSGC_MIGRATE_DATA) {
diff --git a/server/reds.c b/server/reds.c
index 6f88649..90358e4 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -1042,12 +1042,14 @@  void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t
 {
     RedCharDevice *dev_state = RED_CHAR_DEVICE(reds->agent_dev);
     RedChannelClient *rcc;
+    RedClient *client;
 
     if (!reds->vdagent) {
         return;
     }
     spice_assert(reds->vdagent->st && reds->vdagent->st == dev_state);
     rcc = main_channel_client_get_base(mcc);
+    client = red_channel_client_get_client(rcc);
     reds->agent_dev->priv->client_agent_started = TRUE;
     /*
      * Note that in older releases, send_tokens were set to ~0 on both client
@@ -1056,11 +1058,11 @@  void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t
      * and vice versa, the sending from the server to the client won't have
      * flow control, but will have no other problem.
      */
-    if (!red_char_device_client_exists(dev_state, rcc->client)) {
+    if (!red_char_device_client_exists(dev_state, client)) {
         int client_added;
 
         client_added = red_char_device_client_add(dev_state,
-                                                  rcc->client,
+                                                  client,
                                                   TRUE, /* flow control */
                                                   REDS_VDI_PORT_NUM_RECEIVE_BUFFS,
                                                   REDS_AGENT_WINDOW_SIZE,
@@ -1074,7 +1076,7 @@  void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t
         }
     } else {
         red_char_device_send_to_client_tokens_set(dev_state,
-                                                  rcc->client,
+                                                  client,
                                                   num_tokens);
     }
 
@@ -1086,12 +1088,13 @@  void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t
 
 void reds_on_main_agent_tokens(RedsState *reds, MainChannelClient *mcc, uint32_t num_tokens)
 {
+    RedClient *client = red_channel_client_get_client(main_channel_client_get_base(mcc));
     if (!reds->vdagent) {
         return;
     }
     spice_assert(reds->vdagent->st);
     red_char_device_send_to_client_tokens_add(reds->vdagent->st,
-                                                main_channel_client_get_base(mcc)->client,
+                                                client,
                                                 num_tokens);
 }
 
@@ -1112,7 +1115,7 @@  uint8_t *reds_get_agent_data_buffer(RedsState *reds, MainChannelClient *mcc, siz
     }
 
     spice_assert(dev->priv->recv_from_client_buf == NULL);
-    client = main_channel_client_get_base(mcc)->client;
+    client = red_channel_client_get_client(main_channel_client_get_base(mcc));
     dev->priv->recv_from_client_buf = red_char_device_write_buffer_get(RED_CHAR_DEVICE(dev),
                                                                        client,
                                                                        size + sizeof(VDIChunkHeader));
@@ -1483,9 +1486,9 @@  int reds_handle_migrate_data(RedsState *reds, MainChannelClient *mcc,
     } else {
         spice_debug("agent was not attached on the source host");
         if (reds->vdagent) {
+            RedClient *client = red_channel_client_get_client(main_channel_client_get_base(mcc));
             /* red_char_device_client_remove disables waiting for migration data */
-            red_char_device_client_remove(RED_CHAR_DEVICE(agent_dev),
-                                          main_channel_client_get_base(mcc)->client);
+            red_char_device_client_remove(RED_CHAR_DEVICE(agent_dev), client);
             main_channel_push_agent_connected(reds->main_channel);
         }
     }
@@ -1929,10 +1932,11 @@  int reds_on_migrate_dst_set_seamless(RedsState *reds, MainChannelClient *mcc, ui
         reds->dst_do_seamless_migrate = FALSE;
     } else {
         RedChannelClient *rcc = main_channel_client_get_base(mcc);
+        RedClient *client = red_channel_client_get_client(rcc);
 
-        red_client_set_migration_seamless(rcc->client);
+        red_client_set_migration_seamless(client);
         /* linking all the channels that have been connected before migration handshake */
-        reds->dst_do_seamless_migrate = reds_link_mig_target_channels(reds, rcc->client);
+        reds->dst_do_seamless_migrate = reds_link_mig_target_channels(reds, client);
     }
     return reds->dst_do_seamless_migrate;
 }
diff --git a/server/smartcard.c b/server/smartcard.c
index b69ac17..fb05d9f 100644
--- a/server/smartcard.c
+++ b/server/smartcard.c
@@ -30,7 +30,7 @@ 
 
 #include "reds.h"
 #include "char-device.h"
-#include "red-channel.h"
+#include "red-channel-client-private.h"
 #include "smartcard.h"
 #include "migration-protocol.h"
 
diff --git a/server/sound.c b/server/sound.c
index 8335101..279c927 100644
--- a/server/sound.c
+++ b/server/sound.c
@@ -34,6 +34,7 @@ 
 #include "main-channel.h"
 #include "reds.h"
 #include "red-qxl.h"
+#include "red-channel-client-private.h"
 #include "sound.h"
 #include <common/snd_codec.h>
 #include "demarshallers.h"
@@ -212,14 +213,16 @@  static void snd_disconnect_channel(SndChannel *channel)
 {
     SndWorker *worker;
     RedsState *reds;
+    RedChannel *red_channel;
 
     if (!channel || !channel->stream) {
         spice_debug("not connected");
         return;
     }
+    red_channel = red_channel_client_get_channel(channel->channel_client);
     reds = snd_channel_get_server(channel);
     spice_debug("SndChannel=%p rcc=%p type=%d",
-                 channel, channel->channel_client, channel->channel_client->channel->type);
+                 channel, channel->channel_client, red_channel->type);
     worker = channel->worker;
     channel->cleanup(channel);
     red_channel_client_disconnect(worker->connection->channel_client);
@@ -422,12 +425,13 @@  static int snd_record_handle_message(SndChannel *channel, size_t size, uint32_t
 static void snd_receive(SndChannel *channel)
 {
     SpiceDataHeaderOpaque *header;
+    IncomingHandler *incoming = red_channel_client_get_incoming_handler(channel->channel_client);
 
     if (!channel) {
         return;
     }
 
-    header = &channel->channel_client->incoming.header;
+    header = &incoming->header;
 
     for (;;) {
         ssize_t n;
diff --git a/server/spicevmc.c b/server/spicevmc.c
index 953d193..9e34251 100644
--- a/server/spicevmc.c
+++ b/server/spicevmc.c
@@ -27,6 +27,9 @@ 
 #include <string.h>
 #include <netinet/in.h> // IPPROTO_TCP
 #include <netinet/tcp.h> // TCP_NODELAY
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
 
 #include <common/generated_server_marshallers.h>
 
@@ -34,9 +37,8 @@ 
 #include "red-channel.h"
 #include "reds.h"
 #include "migration-protocol.h"
-#ifdef USE_LZ4
-#include <lz4.h>
-#endif
+/* FIXME: only included for sizeof RedChannelClient */
+#include "red-channel-client-private.h"
 
 /* todo: add flow control. i.e.,
  * (a) limit the tokens available for the client
@@ -211,14 +213,15 @@  static void spicevmc_chardev_send_msg_to_client(RedPipeItem *msg,
 {
     SpiceVmcState *state = opaque;
 
-    spice_assert(state->rcc->client == client);
+    spice_assert(red_channel_client_get_client(state->rcc) == client);
     red_pipe_item_ref(msg);
     red_channel_client_pipe_add_push(state->rcc, msg);
 }
 
 static SpiceVmcState *spicevmc_red_channel_client_get_state(RedChannelClient *rcc)
 {
-    return SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel);
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    return SPICE_CONTAINEROF(channel, SpiceVmcState, channel);
 }
 
 static void spicevmc_port_send_init(RedChannelClient *rcc)
@@ -254,7 +257,8 @@  static void spicevmc_char_dev_remove_client(RedClient *client, void *opaque)
     SpiceVmcState *state = opaque;
 
     spice_printerr("vmc state %p, client %p", state, client);
-    spice_assert(state->rcc && state->rcc->client == client);
+    spice_assert(state->rcc &&
+                 red_channel_client_get_client(state->rcc) == client);
 
     red_channel_client_shutdown(state->rcc);
 }
@@ -263,8 +267,9 @@  static int spicevmc_red_channel_client_config_socket(RedChannelClient *rcc)
 {
     int delay_val = 1;
     RedsStream *stream = red_channel_client_get_stream(rcc);
+    RedChannel *channel = red_channel_client_get_channel(rcc);
 
-    if (rcc->channel->type == SPICE_CHANNEL_USBREDIR) {
+    if (channel->type == SPICE_CHANNEL_USBREDIR) {
         if (setsockopt(stream->socket, IPPROTO_TCP, TCP_NODELAY,
                 &delay_val, sizeof(delay_val)) != 0) {
             if (errno != ENOTSUP && errno != ENOPROTOOPT) {
@@ -281,6 +286,7 @@  static void spicevmc_red_channel_client_on_disconnect(RedChannelClient *rcc)
 {
     SpiceVmcState *state;
     SpiceCharDeviceInterface *sif;
+    RedClient *client = red_channel_client_get_client(rcc);
 
     if (!rcc) {
         return;
@@ -292,17 +298,17 @@  static void spicevmc_red_channel_client_on_disconnect(RedChannelClient *rcc)
     red_char_device_write_buffer_release(state->chardev, &state->recv_from_client_buf);
 
     if (state->chardev) {
-        if (red_char_device_client_exists(state->chardev, rcc->client)) {
-            red_char_device_client_remove(state->chardev, rcc->client);
+        if (red_char_device_client_exists(state->chardev, client)) {
+            red_char_device_client_remove(state->chardev, client);
         } else {
             spice_printerr("client %p have already been removed from char dev %p",
-                           rcc->client, state->chardev);
+                           client, state->chardev);
         }
     }
 
     /* Don't destroy the rcc if it is already being destroyed, as then
        red_client_destroy/red_channel_client_destroy will already do this! */
-    if (!rcc->destroying)
+    if (!red_channel_client_is_destroying(rcc))
         red_channel_client_destroy(rcc);
 
     state->rcc = NULL;
@@ -422,6 +428,7 @@  static uint8_t *spicevmc_red_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
                                                        uint32_t size)
 {
     SpiceVmcState *state;
+    RedClient *client = red_channel_client_get_client(rcc);
 
     state = spicevmc_red_channel_client_get_state(rcc);
 
@@ -430,7 +437,7 @@  static uint8_t *spicevmc_red_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
         assert(!state->recv_from_client_buf);
 
         state->recv_from_client_buf = red_char_device_write_buffer_get(state->chardev,
-                                                                       rcc->client,
+                                                                         client,
                                                                        size);
         if (!state->recv_from_client_buf) {
             spice_error("failed to allocate write buffer");
diff --git a/server/stream.c b/server/stream.c
index e070202..acbd6c6 100644
--- a/server/stream.c
+++ b/server/stream.c
@@ -634,7 +634,7 @@  static uint64_t get_initial_bit_rate(DisplayChannelClient *dcc, Stream *stream)
         MainChannelClient *mcc;
         uint64_t net_test_bit_rate;
 
-        mcc = red_client_get_main(RED_CHANNEL_CLIENT(dcc)->client);
+        mcc = red_client_get_main(red_channel_client_get_client(RED_CHANNEL_CLIENT(dcc)));
         net_test_bit_rate = main_channel_client_is_network_info_initialized(mcc) ?
                                 main_channel_client_get_bitrate_per_sec(mcc) :
                                 0;
@@ -666,7 +666,8 @@  static uint32_t get_roundtrip_ms(void *opaque)
 
     roundtrip = red_channel_client_get_roundtrip_ms(RED_CHANNEL_CLIENT(agent->dcc));
     if (roundtrip < 0) {
-        MainChannelClient *mcc = red_client_get_main(RED_CHANNEL_CLIENT(agent->dcc)->client);
+        MainChannelClient *mcc =
+            red_client_get_main(red_channel_client_get_client(RED_CHANNEL_CLIENT(agent->dcc)));
 
         /*
          * the main channel client roundtrip might not have been
@@ -690,7 +691,9 @@  static void update_client_playback_delay(void *opaque, uint32_t delay_ms)
 {
     StreamAgent *agent = opaque;
     DisplayChannelClient *dcc = agent->dcc;
-    RedsState *reds = red_channel_get_server(RED_CHANNEL_CLIENT(dcc)->channel);
+    RedChannel *channel = red_channel_client_get_channel(RED_CHANNEL_CLIENT(dcc));
+    RedClient *client = red_channel_client_get_client(RED_CHANNEL_CLIENT(dcc));
+    RedsState *reds = red_channel_get_server(channel);
 
     dcc_update_streams_max_latency(dcc, agent);
 
@@ -700,7 +703,7 @@  static void update_client_playback_delay(void *opaque, uint32_t delay_ms)
     }
     spice_debug("resetting client latency: %u", dcc_get_max_stream_latency(agent->dcc));
     main_dispatcher_set_mm_time_latency(reds_get_main_dispatcher(reds),
-                                        RED_CHANNEL_CLIENT(agent->dcc)->client,
+                                        client,
                                         dcc_get_max_stream_latency(agent->dcc));
 }
 

Comments

> 
> Reduce direct access to RedChannelClient, and get ready to convert to
> GObject.

This patch should be mechanical but I'm having some problems reviewing.
I have to do some magic about new files, the diff is terrible, not
clear if we miss some other patches in the meantime.
The most scary thing however is that red-channel-client-private.h is
included everywhere. It's not private at all!

Frediano

> ---
>  server/Makefile.am                  |    3 +
>  server/cursor-channel.c             |    2 +
>  server/dcc-private.h                |    1 +
>  server/dcc.h                        |    2 +-
>  server/display-channel.c            |    3 +-
>  server/inputs-channel-client.c      |    1 +
>  server/inputs-channel.c             |   15 +-
>  server/main-channel-client.c        |   23 +-
>  server/main-channel.c               |   39 +-
>  server/red-channel-client-private.h |   78 ++
>  server/red-channel-client.c         | 1626 +++++++++++++++++++++++++++++++
>  server/red-channel-client.h         |  177 ++++
>  server/red-channel.c                | 1835
>  +++--------------------------------
>  server/red-channel.h                |  185 +---
>  server/red-qxl.c                    |   24 +-
>  server/red-worker.c                 |    3 +-
>  server/reds.c                       |   22 +-
>  server/smartcard.c                  |    2 +-
>  server/sound.c                      |    8 +-
>  server/spicevmc.c                   |   31 +-
>  server/stream.c                     |   11 +-
>  21 files changed, 2146 insertions(+), 1945 deletions(-)
>  create mode 100644 server/red-channel-client-private.h
>  create mode 100644 server/red-channel-client.c
>  create mode 100644 server/red-channel-client.h
> 
> diff --git a/server/Makefile.am b/server/Makefile.am
> index 24e6e21..a249046 100644
> --- a/server/Makefile.am
> +++ b/server/Makefile.am
> @@ -95,6 +95,9 @@ libserver_la_SOURCES =				\
>  	mjpeg-encoder.c				\
>  	red-channel.c				\
>  	red-channel.h				\
> +	red-channel-client.c			\
> +	red-channel-client.h			\
> +	red-channel-client-private.h		\
>  	red-common.h				\
>  	dispatcher.c				\
>  	dispatcher.h				\
> diff --git a/server/cursor-channel.c b/server/cursor-channel.c
> index dac929a..83ce61b 100644
> --- a/server/cursor-channel.c
> +++ b/server/cursor-channel.c
> @@ -21,7 +21,9 @@
>  
>  #include <glib.h>
>  #include <common/generated_server_marshallers.h>
> +
>  #include "cursor-channel.h"
> +#include "red-channel-client-private.h"
>  #include "cache-item.h"
>  

....
On Fri, 2016-08-12 at 03:55 -0400, Frediano Ziglio wrote:
> > 
> > 
> > Reduce direct access to RedChannelClient, and get ready to convert
> > to
> > GObject.
> 
> This patch should be mechanical but I'm having some problems
> reviewing.
> I have to do some magic about new files, the diff is terrible, not
> clear if we miss some other patches in the meantime.
> The most scary thing however is that red-channel-client-private.h is
> included everywhere. It's not private at all!
> 


working on splitting this and the next commit up into a bit more
manageable pieces. It's difficult for me to review as well, and I
apparently wrote it :/



> Frediano
> 
> > 
> > ---
> >  server/Makefile.am                  |    3 +
> >  server/cursor-channel.c             |    2 +
> >  server/dcc-private.h                |    1 +
> >  server/dcc.h                        |    2 +-
> >  server/display-channel.c            |    3 +-
> >  server/inputs-channel-client.c      |    1 +
> >  server/inputs-channel.c             |   15 +-
> >  server/main-channel-client.c        |   23 +-
> >  server/main-channel.c               |   39 +-
> >  server/red-channel-client-private.h |   78 ++
> >  server/red-channel-client.c         | 1626
> > +++++++++++++++++++++++++++++++
> >  server/red-channel-client.h         |  177 ++++
> >  server/red-channel.c                | 1835
> >  +++--------------------------------
> >  server/red-channel.h                |  185 +---
> >  server/red-qxl.c                    |   24 +-
> >  server/red-worker.c                 |    3 +-
> >  server/reds.c                       |   22 +-
> >  server/smartcard.c                  |    2 +-
> >  server/sound.c                      |    8 +-
> >  server/spicevmc.c                   |   31 +-
> >  server/stream.c                     |   11 +-
> >  21 files changed, 2146 insertions(+), 1945 deletions(-)
> >  create mode 100644 server/red-channel-client-private.h
> >  create mode 100644 server/red-channel-client.c
> >  create mode 100644 server/red-channel-client.h
> > 
> > diff --git a/server/Makefile.am b/server/Makefile.am
> > index 24e6e21..a249046 100644
> > --- a/server/Makefile.am
> > +++ b/server/Makefile.am
> > @@ -95,6 +95,9 @@ libserver_la_SOURCES =				
> > \
> >  	mjpeg-encoder.c				\
> >  	red-channel.c				\
> >  	red-channel.h				\
> > +	red-channel-client.c			\
> > +	red-channel-client.h			\
> > +	red-channel-client-private.h		\
> >  	red-common.h				\
> >  	dispatcher.c				\
> >  	dispatcher.h				\
> > diff --git a/server/cursor-channel.c b/server/cursor-channel.c
> > index dac929a..83ce61b 100644
> > --- a/server/cursor-channel.c
> > +++ b/server/cursor-channel.c
> > @@ -21,7 +21,9 @@
> >  
> >  #include <glib.h>
> >  #include <common/generated_server_marshallers.h>
> > +
> >  #include "cursor-channel.h"
> > +#include "red-channel-client-private.h"
> >  #include "cache-item.h"
> >  
> 
> ....