[RFC,spice-vdagent,05/18] add VDAgentConnection

Submitted by Jakub Janku on Aug. 14, 2018, 6:53 p.m.

Details

Message ID 20180814185352.6080-6-jjanku@redhat.com
State New
Headers show
Series "GLib integration" ( rev: 1 ) in Spice

Not browsing as part of any series.

Commit Message

Jakub Janku Aug. 14, 2018, 6:53 p.m.
Add a set of helper functions built around GIO that can be used to
easily write messages to and read from the given FD.

Since VDAgentConnection uses GIO,
it integrates well with GMainLoop.

Read messages must begin with a header of a fixed size.
Message body size can vary.

User of VDAgentConnection is notified
through callbacks about the following events:
- message header read
- message body read
- I/O error

A new VDAgentConnection can be constructed using
vdagent_connection_new() based on a GIOStream.

A new GIOStream can be obtained using
vdagent_file_open() or vdagent_socket_connect().

vdagent_connection_destroy() destroyes the connection.
However, due to the asynchronous nature of used GIO functions,
this does NOT close the underlying FD immediately.
---
 Makefile.am              |   2 +
 src/vdagent-connection.c | 301 +++++++++++++++++++++++++++++++++++++++
 src/vdagent-connection.h | 103 ++++++++++++++
 3 files changed, 406 insertions(+)
 create mode 100644 src/vdagent-connection.c
 create mode 100644 src/vdagent-connection.h

Patch hide | download patch | download mbox

diff --git a/Makefile.am b/Makefile.am
index fa54bbc..b291b19 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -7,6 +7,8 @@  sbin_PROGRAMS = src/spice-vdagentd
 common_sources =				\
 	src/udscs.c				\
 	src/udscs.h				\
+	src/vdagent-connection.c		\
+	src/vdagent-connection.h		\
 	src/vdagentd-proto-strings.h		\
 	src/vdagentd-proto.h			\
 	$(NULL)
diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c
new file mode 100644
index 0000000..0eb2ec9
--- /dev/null
+++ b/src/vdagent-connection.c
@@ -0,0 +1,301 @@ 
+/*  vdagent-connection.c
+
+    Copyright 2018 Red Hat, Inc.
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <syslog.h>
+#include <fcntl.h>
+#include <glib/gstdio.h>
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+#include <gio/gunixsocketaddress.h>
+
+#include "vdagent-connection.h"
+
+struct VDAgentConnection {
+    GIOStream              *io_stream;
+    gboolean                opening;
+    GCancellable           *cancellable;
+
+    GQueue                 *write_queue;
+    GMainLoop              *flush_loop;
+
+    VDAgentConnReadCb       read_cb;
+    gpointer                read_buff;
+    gpointer                header_buff;
+    gsize                   header_size;
+    VDAgentConnHeaderReadCb header_read_cb;
+
+    VDAgentConnErrorCb      error_cb;
+
+    GCredentials           *credentials;
+
+    gpointer                user_data;
+};
+
+static void request_message_write(VDAgentConnection *conn);
+static void request_message_read(VDAgentConnection *conn);
+
+GIOStream *vdagent_file_open(const gchar *path)
+{
+    gint fd;
+
+    fd = g_open(path, O_RDWR);
+    if (fd == -1) {
+        syslog(LOG_ERR, "%s: %m", __func__);
+        return NULL;
+    }
+
+    return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE),
+                                  g_unix_output_stream_new(fd, TRUE));
+}
+
+GIOStream *vdagent_socket_connect(const gchar *address)
+{
+    GSocketConnection *socket_conn;
+    GSocketClient *client;
+    GSocketConnectable *connectable;
+    GError *err = NULL;
+
+    connectable = G_SOCKET_CONNECTABLE(g_unix_socket_address_new(address));
+    client = g_object_new(G_TYPE_SOCKET_CLIENT,
+                          "family", G_SOCKET_FAMILY_UNIX,
+                          "type", G_SOCKET_TYPE_STREAM,
+                          NULL);
+
+    socket_conn = g_socket_client_connect(client, connectable, NULL, &err);
+    g_object_unref(client);
+    g_object_unref(connectable);
+    if (err) {
+        syslog(LOG_ERR, "%s: %s", __func__, err->message);
+        g_error_free(err);
+    }
+    return G_IO_STREAM(socket_conn);
+}
+
+VDAgentConnection *vdagent_connection_new(
+    GIOStream              *io_stream,
+    gboolean                wait_on_opening,
+    gsize                   header_size,
+    VDAgentConnHeaderReadCb header_read_cb,
+    VDAgentConnReadCb       read_cb,
+    VDAgentConnErrorCb      error_cb,
+    gpointer                user_data)
+{
+    VDAgentConnection *conn;
+
+    conn = g_new(VDAgentConnection, 1);
+    conn->io_stream = io_stream;
+    conn->cancellable = g_cancellable_new();
+    conn->opening = wait_on_opening;
+    conn->write_queue = g_queue_new();
+    conn->flush_loop = NULL;
+    conn->read_cb = read_cb;
+    conn->read_buff = NULL;
+    conn->header_buff = g_malloc(header_size);
+    conn->header_size = header_size;
+    conn->header_read_cb = header_read_cb;
+    conn->error_cb = error_cb;
+    conn->credentials = NULL;
+    conn->user_data = user_data;
+
+    request_message_read(conn);
+
+    return conn;
+}
+
+static gboolean connection_has_pending(VDAgentConnection *conn)
+{
+    GInputStream *in = g_io_stream_get_input_stream(conn->io_stream);
+    GOutputStream *out = g_io_stream_get_output_stream(conn->io_stream);
+
+    return g_input_stream_has_pending(in) || g_output_stream_has_pending(out);
+}
+
+/* Free up all resources used by VDAgentConnection
+ * once all I/O operations have finished. */
+static void connection_finalize(VDAgentConnection *conn)
+{
+    g_object_unref(conn->cancellable);
+    g_queue_free_full(conn->write_queue, (GDestroyNotify)g_bytes_unref);
+    g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
+    g_clear_object(&conn->credentials);
+    g_free(conn->header_buff);
+    g_free(conn->read_buff);
+    g_object_unref(conn->io_stream);
+    g_free(conn);
+}
+
+void vdagent_connection_destroy(VDAgentConnection *conn)
+{
+    /* If there's a pending I/O operation on either of the streams, cancel it,
+     * connection_finalize() will be invoked in the next GMainLoop iteration(s). */
+    if (connection_has_pending(conn))
+        g_cancellable_cancel(conn->cancellable);
+    else
+        connection_finalize(conn);
+}
+
+static void handle_io_error(VDAgentConnection *conn, GError *err)
+{
+    if (g_cancellable_is_cancelled(conn->cancellable)) {
+        if (!connection_has_pending(conn))
+            connection_finalize(conn);
+    } else {
+        syslog(LOG_ERR, "vdagent-connection: I/O error: %s", err->message);
+        conn->error_cb(conn->user_data);
+    }
+    g_error_free(err);
+}
+
+GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn)
+{
+    GSocket *socket;
+    GError *err = NULL;
+
+    g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL);
+
+    if (conn->credentials)
+        return conn->credentials;
+
+    socket = g_socket_connection_get_socket(G_SOCKET_CONNECTION(conn->io_stream));
+    conn->credentials = g_socket_get_credentials(socket, &err);
+    if (err) {
+        syslog(LOG_ERR, "%s: %s", __func__, err->message);
+        g_error_free(err);
+    }
+    return conn->credentials;
+}
+
+static void message_write_cb(GObject      *source_object,
+                             GAsyncResult *res,
+                             gpointer      user_data)
+{
+    VDAgentConnection *conn = user_data;
+    GOutputStream *out = G_OUTPUT_STREAM(source_object);
+    GError *err = NULL;
+
+    g_output_stream_write_all_finish(out, res, NULL, &err);
+    g_bytes_unref(g_queue_pop_head(conn->write_queue));
+
+    if (err)
+        return handle_io_error(conn, err);
+
+    conn->opening = FALSE;
+
+    if (g_queue_is_empty(conn->write_queue))
+        g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
+    else
+        request_message_write(conn);
+}
+
+static void request_message_write(VDAgentConnection *conn)
+{
+    GBytes *msg;
+    GOutputStream *out;
+
+    msg = g_queue_peek_head(conn->write_queue);
+    out = g_io_stream_get_output_stream(conn->io_stream);
+
+    g_output_stream_write_all_async(out,
+        g_bytes_get_data(msg, NULL), g_bytes_get_size(msg),
+        G_PRIORITY_DEFAULT, conn->cancellable, message_write_cb, conn);
+}
+
+void vdagent_connection_write(VDAgentConnection *conn,
+                              gpointer           data,
+                              gsize              size)
+{
+    g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size));
+
+    if (g_queue_get_length(conn->write_queue) == 1)
+        request_message_write(conn);
+}
+
+void vdagent_connection_flush(VDAgentConnection *conn)
+{
+    GMainLoop *loop;
+    /* TODO: allow multiple flush calls at once? */
+    g_return_if_fail(conn->flush_loop == NULL);
+
+    if (g_queue_is_empty(conn->write_queue))
+        return;
+
+    loop = conn->flush_loop = g_main_loop_new(NULL, FALSE);
+    /* When using GTK+, this should be wrapped with
+     * gdk_threads_leave() and gdk_threads_enter(),
+     * but since flush is used in virtio-port.c only
+     * let's leave it as it is for now. */
+    g_main_loop_run(loop);
+    g_main_loop_unref(loop);
+}
+
+static void message_read_cb(GObject      *source_object,
+                            GAsyncResult *res,
+                            gpointer      user_data)
+{
+    VDAgentConnection *conn = user_data;
+    GInputStream *in = G_INPUT_STREAM(source_object);
+    GError *err = NULL;
+    gsize bytes_read, data_size;
+
+    g_input_stream_read_all_finish(in, res, &bytes_read, &err);
+    if (err)
+        return handle_io_error(conn, err);
+    if (bytes_read == 0) {
+        /* see virtio-port.c for the rationale behind this */
+        if (conn->opening) {
+            g_usleep(10000);
+            request_message_read(conn);
+        } else {
+            conn->error_cb(conn->user_data);
+        }
+        return;
+    }
+    conn->opening = FALSE;
+
+    if (conn->read_buff == NULL) {
+        /* we've read the message header, now let's read its body */
+        if (!conn->header_read_cb(conn->header_buff, &data_size, conn->user_data))
+            return;
+        if (data_size > 0) {
+            conn->read_buff = g_malloc(data_size);
+            /* TODO: if allocation fails, we could try g_input_stream_skip()
+             * and hope that the message wasn't crucial for proper functiong.
+             * An example might be when a user tries to copy large clipboard.
+             * Not sure whether it's worth implementing.
+             * Other stuff might just as well fall apart
+             * when the system is running out of memory? */
+            g_input_stream_read_all_async(in, conn->read_buff, data_size,
+                G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
+            return;
+        }
+    }
+
+    if (!conn->read_cb(conn->header_buff, conn->read_buff, conn->user_data))
+        return;
+    g_clear_pointer(&conn->read_buff, g_free);
+    request_message_read(conn);
+}
+
+static void request_message_read(VDAgentConnection *conn)
+{
+    GInputStream *in;
+    in = g_io_stream_get_input_stream(conn->io_stream);
+
+    g_input_stream_read_all_async(in, conn->header_buff, conn->header_size,
+        G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
+}
diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h
new file mode 100644
index 0000000..6fc0081
--- /dev/null
+++ b/src/vdagent-connection.h
@@ -0,0 +1,103 @@ 
+/*  vdagent-connection.h
+
+    Copyright 2018 Red Hat, Inc.
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __VDAGENT_CONNECTION_H
+#define __VDAGENT_CONNECTION_H
+
+#include <glib.h>
+#include <gio/gio.h>
+
+typedef struct VDAgentConnection VDAgentConnection;
+
+/* Called when a message header has been read.
+ *
+ * If the handler wishes to continue reading,
+ * it must set @body_size to the size of message's body and return TRUE.
+ * Once @body_size bytes are read, VDAgentConnReadCb() is invoked.
+ *
+ * Otherwise the handler should return FALSE
+ * and call vdagent_connection_destroy().
+ *
+ * @header_buff is owned by VDAgentConnection and must not be freed. */
+typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff,
+                                            gsize   *body_size,
+                                            gpointer user_data);
+
+/* Called when a full message has been read.
+ *
+ * If the handler wished to continue reading, it must return TRUE,
+ * otherwise FALSE and call vdagent_connection_destroy().
+ *
+ * @header, @data are owned by VDAgentConnection and must not be freed. */
+typedef gboolean (*VDAgentConnReadCb)(gpointer header,
+                                      gpointer data,
+                                      gpointer user_data);
+
+/* Called when an error occured during read or wirte.
+ * The handler is expected to call vdagent_connection_destroy(). */
+typedef void (*VDAgentConnErrorCb)(gpointer user_data);
+
+/* Open a file in @path for read and write.
+ * Returns a GIOStream to the given file or NULL on error. */
+GIOStream *vdagent_file_open(const gchar *path);
+
+/* Create a socket and initiate a new connection to the socket on @address.
+ * Returns a GIOStream corresponding to the new connection or NULL on error. */
+GIOStream *vdagent_socket_connect(const gchar *address);
+
+/* Create new VDAgentConnection and start reading incoming messages.
+ *
+ * If @wait_on_opening is set to TRUE, EOF won't be treated as an error
+ * until the first message is successfully read or written to the @io_stream.
+ *
+ * @user_data will be passed to the supplied callbacks. */
+VDAgentConnection *vdagent_connection_new(
+    GIOStream              *io_stream,
+    gboolean                wait_on_opening,
+    gsize                   header_size,
+    VDAgentConnHeaderReadCb header_read_cb,
+    VDAgentConnReadCb       read_cb,
+    VDAgentConnErrorCb      error_cb,
+    gpointer                user_data);
+
+/* Free up all resources associated with the VDAgentConnection.
+ *
+ * This operation can be asynchronous. */
+void vdagent_connection_destroy(VDAgentConnection *conn);
+
+/* Append a message to write queue.
+ *
+ * VDAgentConnection takes ownership of the @data
+ * and frees it once the message is flushed. */
+void vdagent_connection_write(VDAgentConnection *conn,
+                              gpointer           data,
+                              gsize              size);
+
+/* Waits until all queued messages get written to the output stream.
+ *
+ * Note: other GSources can be triggered during this call */
+void vdagent_connection_flush(VDAgentConnection *conn);
+
+/* Returns the credentials of the foreign process connected to the socket.
+ *
+ * It is an error to call this function with a VDAgentConnection
+ * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */
+GCredentials *vdagent_connection_get_peer_credentials(
+    VDAgentConnection *conn);
+
+#endif

Comments

Hi,

On Tue, Aug 14, 2018 at 08:53:39PM +0200, Jakub Janků wrote:
> Add a set of helper functions built around GIO that can be used to
> easily write messages to and read from the given FD.
> 
> Since VDAgentConnection uses GIO,
> it integrates well with GMainLoop.
> 
> Read messages must begin with a header of a fixed size.
> Message body size can vary.
> 
> User of VDAgentConnection is notified
> through callbacks about the following events:
> - message header read
> - message body read
> - I/O error
> 
> A new VDAgentConnection can be constructed using
> vdagent_connection_new() based on a GIOStream.
> 
> A new GIOStream can be obtained using
> vdagent_file_open() or vdagent_socket_connect().
> 
> vdagent_connection_destroy() destroyes the connection.
> However, due to the asynchronous nature of used GIO functions,
> this does NOT close the underlying FD immediately.

Yep, commented about it on 00/18 but I take that making this a
GObject might help. Not giving a full review here, just small
note after looking at the patch and the follow up ones.

> ---
>  Makefile.am              |   2 +
>  src/vdagent-connection.c | 301 +++++++++++++++++++++++++++++++++++++++
>  src/vdagent-connection.h | 103 ++++++++++++++
>  3 files changed, 406 insertions(+)
>  create mode 100644 src/vdagent-connection.c
>  create mode 100644 src/vdagent-connection.h
> 
> diff --git a/Makefile.am b/Makefile.am
> index fa54bbc..b291b19 100644
> --- a/Makefile.am
> +++ b/Makefile.am
> @@ -7,6 +7,8 @@ sbin_PROGRAMS = src/spice-vdagentd
>  common_sources =				\
>  	src/udscs.c				\
>  	src/udscs.h				\
> +	src/vdagent-connection.c		\
> +	src/vdagent-connection.h		\
>  	src/vdagentd-proto-strings.h		\
>  	src/vdagentd-proto.h			\
>  	$(NULL)
> diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c
> new file mode 100644
> index 0000000..0eb2ec9
> --- /dev/null
> +++ b/src/vdagent-connection.c
> @@ -0,0 +1,301 @@
> +/*  vdagent-connection.c
> +
> +    Copyright 2018 Red Hat, Inc.
> +
> +    This program is free software: you can redistribute it and/or modify
> +    it under the terms of the GNU General Public License as published by
> +    the Free Software Foundation, either version 3 of the License, or
> +    (at your option) any later version.
> +
> +    This program is distributed in the hope that it will be useful,
> +    but WITHOUT ANY WARRANTY; without even the implied warranty of
> +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +    GNU General Public License for more details.
> +
> +    You should have received a copy of the GNU General Public License
> +    along with this program.  If not, see <http://www.gnu.org/licenses/>.
> +*/
> +
> +#include <syslog.h>
> +#include <fcntl.h>
> +#include <glib/gstdio.h>
> +#include <gio/gunixinputstream.h>
> +#include <gio/gunixoutputstream.h>
> +#include <gio/gunixsocketaddress.h>
> +
> +#include "vdagent-connection.h"
> +
> +struct VDAgentConnection {
> +    GIOStream              *io_stream;
> +    gboolean                opening;
> +    GCancellable           *cancellable;
> +
> +    GQueue                 *write_queue;
> +    GMainLoop              *flush_loop;
> +
> +    VDAgentConnReadCb       read_cb;
> +    gpointer                read_buff;
> +    gpointer                header_buff;
> +    gsize                   header_size;
> +    VDAgentConnHeaderReadCb header_read_cb;
> +
> +    VDAgentConnErrorCb      error_cb;
> +
> +    GCredentials           *credentials;
> +
> +    gpointer                user_data;
> +};
> +
> +static void request_message_write(VDAgentConnection *conn);
> +static void request_message_read(VDAgentConnection *conn);
> +
> +GIOStream *vdagent_file_open(const gchar *path)
> +{
> +    gint fd;
> +
> +    fd = g_open(path, O_RDWR);
> +    if (fd == -1) {
> +        syslog(LOG_ERR, "%s: %m", __func__);
> +        return NULL;
> +    }
> +
> +    return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE),
> +                                  g_unix_output_stream_new(fd, TRUE));
> +}
> +
> +GIOStream *vdagent_socket_connect(const gchar *address)
> +{
> +    GSocketConnection *socket_conn;
> +    GSocketClient *client;
> +    GSocketConnectable *connectable;
> +    GError *err = NULL;
> +
> +    connectable = G_SOCKET_CONNECTABLE(g_unix_socket_address_new(address));
> +    client = g_object_new(G_TYPE_SOCKET_CLIENT,
> +                          "family", G_SOCKET_FAMILY_UNIX,
> +                          "type", G_SOCKET_TYPE_STREAM,
> +                          NULL);
> +
> +    socket_conn = g_socket_client_connect(client, connectable, NULL, &err);
> +    g_object_unref(client);
> +    g_object_unref(connectable);
> +    if (err) {
> +        syslog(LOG_ERR, "%s: %s", __func__, err->message);
> +        g_error_free(err);
> +    }
> +    return G_IO_STREAM(socket_conn);
> +}

Not convinced that this API is really needed? The function can be
kept but to be used internally by vdagent_connection_new()
itself, I guess. I don't see other components using GIOStream
unless to pass it to vdagent_connection_new ().

Might make sense to make socket's address as VDAgentConnection's
property which should be set on g_object_new () too.

Reviewed-by: Victor Toso <victortoso@redhat.com>

Victor

> +
> +VDAgentConnection *vdagent_connection_new(
> +    GIOStream              *io_stream,
> +    gboolean                wait_on_opening,
> +    gsize                   header_size,
> +    VDAgentConnHeaderReadCb header_read_cb,
> +    VDAgentConnReadCb       read_cb,
> +    VDAgentConnErrorCb      error_cb,
> +    gpointer                user_data)
> +{
> +    VDAgentConnection *conn;
> +
> +    conn = g_new(VDAgentConnection, 1);
> +    conn->io_stream = io_stream;
> +    conn->cancellable = g_cancellable_new();
> +    conn->opening = wait_on_opening;
> +    conn->write_queue = g_queue_new();
> +    conn->flush_loop = NULL;
> +    conn->read_cb = read_cb;
> +    conn->read_buff = NULL;
> +    conn->header_buff = g_malloc(header_size);
> +    conn->header_size = header_size;
> +    conn->header_read_cb = header_read_cb;
> +    conn->error_cb = error_cb;
> +    conn->credentials = NULL;
> +    conn->user_data = user_data;
> +
> +    request_message_read(conn);
> +
> +    return conn;
> +}
> +
> +static gboolean connection_has_pending(VDAgentConnection *conn)
> +{
> +    GInputStream *in = g_io_stream_get_input_stream(conn->io_stream);
> +    GOutputStream *out = g_io_stream_get_output_stream(conn->io_stream);
> +
> +    return g_input_stream_has_pending(in) || g_output_stream_has_pending(out);
> +}
> +
> +/* Free up all resources used by VDAgentConnection
> + * once all I/O operations have finished. */
> +static void connection_finalize(VDAgentConnection *conn)
> +{
> +    g_object_unref(conn->cancellable);
> +    g_queue_free_full(conn->write_queue, (GDestroyNotify)g_bytes_unref);
> +    g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
> +    g_clear_object(&conn->credentials);
> +    g_free(conn->header_buff);
> +    g_free(conn->read_buff);
> +    g_object_unref(conn->io_stream);
> +    g_free(conn);
> +}
> +
> +void vdagent_connection_destroy(VDAgentConnection *conn)
> +{
> +    /* If there's a pending I/O operation on either of the streams, cancel it,
> +     * connection_finalize() will be invoked in the next GMainLoop iteration(s). */
> +    if (connection_has_pending(conn))
> +        g_cancellable_cancel(conn->cancellable);
> +    else
> +        connection_finalize(conn);
> +}
> +
> +static void handle_io_error(VDAgentConnection *conn, GError *err)
> +{
> +    if (g_cancellable_is_cancelled(conn->cancellable)) {
> +        if (!connection_has_pending(conn))
> +            connection_finalize(conn);
> +    } else {
> +        syslog(LOG_ERR, "vdagent-connection: I/O error: %s", err->message);
> +        conn->error_cb(conn->user_data);
> +    }
> +    g_error_free(err);
> +}
> +
> +GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn)
> +{
> +    GSocket *socket;
> +    GError *err = NULL;
> +
> +    g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL);
> +
> +    if (conn->credentials)
> +        return conn->credentials;
> +
> +    socket = g_socket_connection_get_socket(G_SOCKET_CONNECTION(conn->io_stream));
> +    conn->credentials = g_socket_get_credentials(socket, &err);
> +    if (err) {
> +        syslog(LOG_ERR, "%s: %s", __func__, err->message);
> +        g_error_free(err);
> +    }
> +    return conn->credentials;
> +}
> +
> +static void message_write_cb(GObject      *source_object,
> +                             GAsyncResult *res,
> +                             gpointer      user_data)
> +{
> +    VDAgentConnection *conn = user_data;
> +    GOutputStream *out = G_OUTPUT_STREAM(source_object);
> +    GError *err = NULL;
> +
> +    g_output_stream_write_all_finish(out, res, NULL, &err);
> +    g_bytes_unref(g_queue_pop_head(conn->write_queue));
> +
> +    if (err)
> +        return handle_io_error(conn, err);
> +
> +    conn->opening = FALSE;
> +
> +    if (g_queue_is_empty(conn->write_queue))
> +        g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
> +    else
> +        request_message_write(conn);
> +}
> +
> +static void request_message_write(VDAgentConnection *conn)
> +{
> +    GBytes *msg;
> +    GOutputStream *out;
> +
> +    msg = g_queue_peek_head(conn->write_queue);
> +    out = g_io_stream_get_output_stream(conn->io_stream);
> +
> +    g_output_stream_write_all_async(out,
> +        g_bytes_get_data(msg, NULL), g_bytes_get_size(msg),
> +        G_PRIORITY_DEFAULT, conn->cancellable, message_write_cb, conn);
> +}
> +
> +void vdagent_connection_write(VDAgentConnection *conn,
> +                              gpointer           data,
> +                              gsize              size)
> +{
> +    g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size));
> +
> +    if (g_queue_get_length(conn->write_queue) == 1)
> +        request_message_write(conn);
> +}
> +
> +void vdagent_connection_flush(VDAgentConnection *conn)
> +{
> +    GMainLoop *loop;
> +    /* TODO: allow multiple flush calls at once? */
> +    g_return_if_fail(conn->flush_loop == NULL);
> +
> +    if (g_queue_is_empty(conn->write_queue))
> +        return;
> +
> +    loop = conn->flush_loop = g_main_loop_new(NULL, FALSE);
> +    /* When using GTK+, this should be wrapped with
> +     * gdk_threads_leave() and gdk_threads_enter(),
> +     * but since flush is used in virtio-port.c only
> +     * let's leave it as it is for now. */
> +    g_main_loop_run(loop);
> +    g_main_loop_unref(loop);
> +}
> +
> +static void message_read_cb(GObject      *source_object,
> +                            GAsyncResult *res,
> +                            gpointer      user_data)
> +{
> +    VDAgentConnection *conn = user_data;
> +    GInputStream *in = G_INPUT_STREAM(source_object);
> +    GError *err = NULL;
> +    gsize bytes_read, data_size;
> +
> +    g_input_stream_read_all_finish(in, res, &bytes_read, &err);
> +    if (err)
> +        return handle_io_error(conn, err);
> +    if (bytes_read == 0) {
> +        /* see virtio-port.c for the rationale behind this */
> +        if (conn->opening) {
> +            g_usleep(10000);
> +            request_message_read(conn);
> +        } else {
> +            conn->error_cb(conn->user_data);
> +        }
> +        return;
> +    }
> +    conn->opening = FALSE;
> +
> +    if (conn->read_buff == NULL) {
> +        /* we've read the message header, now let's read its body */
> +        if (!conn->header_read_cb(conn->header_buff, &data_size, conn->user_data))
> +            return;
> +        if (data_size > 0) {
> +            conn->read_buff = g_malloc(data_size);
> +            /* TODO: if allocation fails, we could try g_input_stream_skip()
> +             * and hope that the message wasn't crucial for proper functiong.
> +             * An example might be when a user tries to copy large clipboard.
> +             * Not sure whether it's worth implementing.
> +             * Other stuff might just as well fall apart
> +             * when the system is running out of memory? */
> +            g_input_stream_read_all_async(in, conn->read_buff, data_size,
> +                G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
> +            return;
> +        }
> +    }
> +
> +    if (!conn->read_cb(conn->header_buff, conn->read_buff, conn->user_data))
> +        return;
> +    g_clear_pointer(&conn->read_buff, g_free);
> +    request_message_read(conn);
> +}
> +
> +static void request_message_read(VDAgentConnection *conn)
> +{
> +    GInputStream *in;
> +    in = g_io_stream_get_input_stream(conn->io_stream);
> +
> +    g_input_stream_read_all_async(in, conn->header_buff, conn->header_size,
> +        G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
> +}
> diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h
> new file mode 100644
> index 0000000..6fc0081
> --- /dev/null
> +++ b/src/vdagent-connection.h
> @@ -0,0 +1,103 @@
> +/*  vdagent-connection.h
> +
> +    Copyright 2018 Red Hat, Inc.
> +
> +    This program is free software: you can redistribute it and/or modify
> +    it under the terms of the GNU General Public License as published by
> +    the Free Software Foundation, either version 3 of the License, or
> +    (at your option) any later version.
> +
> +    This program is distributed in the hope that it will be useful,
> +    but WITHOUT ANY WARRANTY; without even the implied warranty of
> +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +    GNU General Public License for more details.
> +
> +    You should have received a copy of the GNU General Public License
> +    along with this program.  If not, see <http://www.gnu.org/licenses/>.
> +*/
> +
> +#ifndef __VDAGENT_CONNECTION_H
> +#define __VDAGENT_CONNECTION_H
> +
> +#include <glib.h>
> +#include <gio/gio.h>
> +
> +typedef struct VDAgentConnection VDAgentConnection;
> +
> +/* Called when a message header has been read.
> + *
> + * If the handler wishes to continue reading,
> + * it must set @body_size to the size of message's body and return TRUE.
> + * Once @body_size bytes are read, VDAgentConnReadCb() is invoked.
> + *
> + * Otherwise the handler should return FALSE
> + * and call vdagent_connection_destroy().
> + *
> + * @header_buff is owned by VDAgentConnection and must not be freed. */
> +typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff,
> +                                            gsize   *body_size,
> +                                            gpointer user_data);
> +
> +/* Called when a full message has been read.
> + *
> + * If the handler wished to continue reading, it must return TRUE,
> + * otherwise FALSE and call vdagent_connection_destroy().
> + *
> + * @header, @data are owned by VDAgentConnection and must not be freed. */
> +typedef gboolean (*VDAgentConnReadCb)(gpointer header,
> +                                      gpointer data,
> +                                      gpointer user_data);
> +
> +/* Called when an error occured during read or wirte.
> + * The handler is expected to call vdagent_connection_destroy(). */
> +typedef void (*VDAgentConnErrorCb)(gpointer user_data);
> +
> +/* Open a file in @path for read and write.
> + * Returns a GIOStream to the given file or NULL on error. */
> +GIOStream *vdagent_file_open(const gchar *path);
> +
> +/* Create a socket and initiate a new connection to the socket on @address.
> + * Returns a GIOStream corresponding to the new connection or NULL on error. */
> +GIOStream *vdagent_socket_connect(const gchar *address);
> +
> +/* Create new VDAgentConnection and start reading incoming messages.
> + *
> + * If @wait_on_opening is set to TRUE, EOF won't be treated as an error
> + * until the first message is successfully read or written to the @io_stream.
> + *
> + * @user_data will be passed to the supplied callbacks. */
> +VDAgentConnection *vdagent_connection_new(
> +    GIOStream              *io_stream,
> +    gboolean                wait_on_opening,
> +    gsize                   header_size,
> +    VDAgentConnHeaderReadCb header_read_cb,
> +    VDAgentConnReadCb       read_cb,
> +    VDAgentConnErrorCb      error_cb,
> +    gpointer                user_data);
> +
> +/* Free up all resources associated with the VDAgentConnection.
> + *
> + * This operation can be asynchronous. */
> +void vdagent_connection_destroy(VDAgentConnection *conn);
> +
> +/* Append a message to write queue.
> + *
> + * VDAgentConnection takes ownership of the @data
> + * and frees it once the message is flushed. */
> +void vdagent_connection_write(VDAgentConnection *conn,
> +                              gpointer           data,
> +                              gsize              size);
> +
> +/* Waits until all queued messages get written to the output stream.
> + *
> + * Note: other GSources can be triggered during this call */
> +void vdagent_connection_flush(VDAgentConnection *conn);
> +
> +/* Returns the credentials of the foreign process connected to the socket.
> + *
> + * It is an error to call this function with a VDAgentConnection
> + * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */
> +GCredentials *vdagent_connection_get_peer_credentials(
> +    VDAgentConnection *conn);
> +
> +#endif
> -- 
> 2.17.1
> 
> _______________________________________________
> Spice-devel mailing list
> Spice-devel@lists.freedesktop.org
> https://lists.freedesktop.org/mailman/listinfo/spice-devel
Hey,

On Tue, Aug 28, 2018 at 10:04 AM Victor Toso <victortoso@redhat.com> wrote:
>
> Hi,
>
> On Tue, Aug 14, 2018 at 08:53:39PM +0200, Jakub Janků wrote:
> > Add a set of helper functions built around GIO that can be used to
> > easily write messages to and read from the given FD.
> >
> > Since VDAgentConnection uses GIO,
> > it integrates well with GMainLoop.
> >
> > Read messages must begin with a header of a fixed size.
> > Message body size can vary.
> >
> > User of VDAgentConnection is notified
> > through callbacks about the following events:
> > - message header read
> > - message body read
> > - I/O error
> >
> > A new VDAgentConnection can be constructed using
> > vdagent_connection_new() based on a GIOStream.
> >
> > A new GIOStream can be obtained using
> > vdagent_file_open() or vdagent_socket_connect().
> >
> > vdagent_connection_destroy() destroyes the connection.
> > However, due to the asynchronous nature of used GIO functions,
> > this does NOT close the underlying FD immediately.
>
> Yep, commented about it on 00/18 but I take that making this a
> GObject might help. Not giving a full review here, just small
> note after looking at the patch and the follow up ones.
>
> > ---
> >  Makefile.am              |   2 +
> >  src/vdagent-connection.c | 301 +++++++++++++++++++++++++++++++++++++++
> >  src/vdagent-connection.h | 103 ++++++++++++++
> >  3 files changed, 406 insertions(+)
> >  create mode 100644 src/vdagent-connection.c
> >  create mode 100644 src/vdagent-connection.h
> >
> > diff --git a/Makefile.am b/Makefile.am
> > index fa54bbc..b291b19 100644
> > --- a/Makefile.am
> > +++ b/Makefile.am
> > @@ -7,6 +7,8 @@ sbin_PROGRAMS = src/spice-vdagentd
> >  common_sources =                             \
> >       src/udscs.c                             \
> >       src/udscs.h                             \
> > +     src/vdagent-connection.c                \
> > +     src/vdagent-connection.h                \
> >       src/vdagentd-proto-strings.h            \
> >       src/vdagentd-proto.h                    \
> >       $(NULL)
> > diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c
> > new file mode 100644
> > index 0000000..0eb2ec9
> > --- /dev/null
> > +++ b/src/vdagent-connection.c
> > @@ -0,0 +1,301 @@
> > +/*  vdagent-connection.c
> > +
> > +    Copyright 2018 Red Hat, Inc.
> > +
> > +    This program is free software: you can redistribute it and/or modify
> > +    it under the terms of the GNU General Public License as published by
> > +    the Free Software Foundation, either version 3 of the License, or
> > +    (at your option) any later version.
> > +
> > +    This program is distributed in the hope that it will be useful,
> > +    but WITHOUT ANY WARRANTY; without even the implied warranty of
> > +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> > +    GNU General Public License for more details.
> > +
> > +    You should have received a copy of the GNU General Public License
> > +    along with this program.  If not, see <http://www.gnu.org/licenses/>.
> > +*/
> > +
> > +#include <syslog.h>
> > +#include <fcntl.h>
> > +#include <glib/gstdio.h>
> > +#include <gio/gunixinputstream.h>
> > +#include <gio/gunixoutputstream.h>
> > +#include <gio/gunixsocketaddress.h>
> > +
> > +#include "vdagent-connection.h"
> > +
> > +struct VDAgentConnection {
> > +    GIOStream              *io_stream;
> > +    gboolean                opening;
> > +    GCancellable           *cancellable;
> > +
> > +    GQueue                 *write_queue;
> > +    GMainLoop              *flush_loop;
> > +
> > +    VDAgentConnReadCb       read_cb;
> > +    gpointer                read_buff;
> > +    gpointer                header_buff;
> > +    gsize                   header_size;
> > +    VDAgentConnHeaderReadCb header_read_cb;
> > +
> > +    VDAgentConnErrorCb      error_cb;
> > +
> > +    GCredentials           *credentials;
> > +
> > +    gpointer                user_data;
> > +};
> > +
> > +static void request_message_write(VDAgentConnection *conn);
> > +static void request_message_read(VDAgentConnection *conn);
> > +
> > +GIOStream *vdagent_file_open(const gchar *path)
> > +{
> > +    gint fd;
> > +
> > +    fd = g_open(path, O_RDWR);
> > +    if (fd == -1) {
> > +        syslog(LOG_ERR, "%s: %m", __func__);
> > +        return NULL;
> > +    }
> > +
> > +    return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE),
> > +                                  g_unix_output_stream_new(fd, TRUE));
> > +}
> > +
> > +GIOStream *vdagent_socket_connect(const gchar *address)
> > +{
> > +    GSocketConnection *socket_conn;
> > +    GSocketClient *client;
> > +    GSocketConnectable *connectable;
> > +    GError *err = NULL;
> > +
> > +    connectable = G_SOCKET_CONNECTABLE(g_unix_socket_address_new(address));
> > +    client = g_object_new(G_TYPE_SOCKET_CLIENT,
> > +                          "family", G_SOCKET_FAMILY_UNIX,
> > +                          "type", G_SOCKET_TYPE_STREAM,
> > +                          NULL);
> > +
> > +    socket_conn = g_socket_client_connect(client, connectable, NULL, &err);
> > +    g_object_unref(client);
> > +    g_object_unref(connectable);
> > +    if (err) {
> > +        syslog(LOG_ERR, "%s: %s", __func__, err->message);
> > +        g_error_free(err);
> > +    }
> > +    return G_IO_STREAM(socket_conn);
> > +}
>
> Not convinced that this API is really needed? The function can be
> kept but to be used internally by vdagent_connection_new()
> itself, I guess. I don't see other components using GIOStream
> unless to pass it to vdagent_connection_new ().

The rationale behind this was actually purely cosmetic.
We could have something like vdagent_connection_new_file() and
vdagent_connection_new_socket(),
but both functions would need to have 7 arguments and we would need to
initialize all the members of VDAgentConnection struct in both of the
functions.
So having just one common vdagent_connection_new() function seemed
cleaner to me.
>
> Might make sense to make socket's address as VDAgentConnection's
> property which should be set on g_object_new () too.
>
> Reviewed-by: Victor Toso <victortoso@redhat.com>
>
> Victor
>
> > +
> > +VDAgentConnection *vdagent_connection_new(
> > +    GIOStream              *io_stream,
> > +    gboolean                wait_on_opening,
> > +    gsize                   header_size,
> > +    VDAgentConnHeaderReadCb header_read_cb,
> > +    VDAgentConnReadCb       read_cb,
> > +    VDAgentConnErrorCb      error_cb,
> > +    gpointer                user_data)
> > +{
> > +    VDAgentConnection *conn;
> > +
> > +    conn = g_new(VDAgentConnection, 1);
> > +    conn->io_stream = io_stream;
> > +    conn->cancellable = g_cancellable_new();
> > +    conn->opening = wait_on_opening;
> > +    conn->write_queue = g_queue_new();
> > +    conn->flush_loop = NULL;
> > +    conn->read_cb = read_cb;
> > +    conn->read_buff = NULL;
> > +    conn->header_buff = g_malloc(header_size);
> > +    conn->header_size = header_size;
> > +    conn->header_read_cb = header_read_cb;
> > +    conn->error_cb = error_cb;
> > +    conn->credentials = NULL;
> > +    conn->user_data = user_data;
> > +
> > +    request_message_read(conn);
> > +
> > +    return conn;
> > +}
> > +
> > +static gboolean connection_has_pending(VDAgentConnection *conn)
> > +{
> > +    GInputStream *in = g_io_stream_get_input_stream(conn->io_stream);
> > +    GOutputStream *out = g_io_stream_get_output_stream(conn->io_stream);
> > +
> > +    return g_input_stream_has_pending(in) || g_output_stream_has_pending(out);
> > +}
> > +
> > +/* Free up all resources used by VDAgentConnection
> > + * once all I/O operations have finished. */
> > +static void connection_finalize(VDAgentConnection *conn)
> > +{
> > +    g_object_unref(conn->cancellable);
> > +    g_queue_free_full(conn->write_queue, (GDestroyNotify)g_bytes_unref);
> > +    g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
> > +    g_clear_object(&conn->credentials);
> > +    g_free(conn->header_buff);
> > +    g_free(conn->read_buff);
> > +    g_object_unref(conn->io_stream);
> > +    g_free(conn);
> > +}
> > +
> > +void vdagent_connection_destroy(VDAgentConnection *conn)
> > +{
> > +    /* If there's a pending I/O operation on either of the streams, cancel it,
> > +     * connection_finalize() will be invoked in the next GMainLoop iteration(s). */
> > +    if (connection_has_pending(conn))
> > +        g_cancellable_cancel(conn->cancellable);
> > +    else
> > +        connection_finalize(conn);
> > +}
> > +
> > +static void handle_io_error(VDAgentConnection *conn, GError *err)
> > +{
> > +    if (g_cancellable_is_cancelled(conn->cancellable)) {
> > +        if (!connection_has_pending(conn))
> > +            connection_finalize(conn);
> > +    } else {
> > +        syslog(LOG_ERR, "vdagent-connection: I/O error: %s", err->message);
> > +        conn->error_cb(conn->user_data);
> > +    }
> > +    g_error_free(err);
> > +}
> > +
> > +GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn)
> > +{
> > +    GSocket *socket;
> > +    GError *err = NULL;
> > +
> > +    g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL);
> > +
> > +    if (conn->credentials)
> > +        return conn->credentials;
> > +
> > +    socket = g_socket_connection_get_socket(G_SOCKET_CONNECTION(conn->io_stream));
> > +    conn->credentials = g_socket_get_credentials(socket, &err);
> > +    if (err) {
> > +        syslog(LOG_ERR, "%s: %s", __func__, err->message);
> > +        g_error_free(err);
> > +    }
> > +    return conn->credentials;
> > +}
> > +
> > +static void message_write_cb(GObject      *source_object,
> > +                             GAsyncResult *res,
> > +                             gpointer      user_data)
> > +{
> > +    VDAgentConnection *conn = user_data;
> > +    GOutputStream *out = G_OUTPUT_STREAM(source_object);
> > +    GError *err = NULL;
> > +
> > +    g_output_stream_write_all_finish(out, res, NULL, &err);
> > +    g_bytes_unref(g_queue_pop_head(conn->write_queue));
> > +
> > +    if (err)
> > +        return handle_io_error(conn, err);
> > +
> > +    conn->opening = FALSE;
> > +
> > +    if (g_queue_is_empty(conn->write_queue))
> > +        g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
> > +    else
> > +        request_message_write(conn);
> > +}
> > +
> > +static void request_message_write(VDAgentConnection *conn)
> > +{
> > +    GBytes *msg;
> > +    GOutputStream *out;
> > +
> > +    msg = g_queue_peek_head(conn->write_queue);
> > +    out = g_io_stream_get_output_stream(conn->io_stream);
> > +
> > +    g_output_stream_write_all_async(out,
> > +        g_bytes_get_data(msg, NULL), g_bytes_get_size(msg),
> > +        G_PRIORITY_DEFAULT, conn->cancellable, message_write_cb, conn);
> > +}
> > +
> > +void vdagent_connection_write(VDAgentConnection *conn,
> > +                              gpointer           data,
> > +                              gsize              size)
> > +{
> > +    g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size));
> > +
> > +    if (g_queue_get_length(conn->write_queue) == 1)
> > +        request_message_write(conn);
> > +}
> > +
> > +void vdagent_connection_flush(VDAgentConnection *conn)
> > +{
> > +    GMainLoop *loop;
> > +    /* TODO: allow multiple flush calls at once? */
> > +    g_return_if_fail(conn->flush_loop == NULL);
> > +
> > +    if (g_queue_is_empty(conn->write_queue))
> > +        return;
> > +
> > +    loop = conn->flush_loop = g_main_loop_new(NULL, FALSE);
> > +    /* When using GTK+, this should be wrapped with
> > +     * gdk_threads_leave() and gdk_threads_enter(),
> > +     * but since flush is used in virtio-port.c only
> > +     * let's leave it as it is for now. */
> > +    g_main_loop_run(loop);
> > +    g_main_loop_unref(loop);
> > +}
> > +
> > +static void message_read_cb(GObject      *source_object,
> > +                            GAsyncResult *res,
> > +                            gpointer      user_data)
> > +{
> > +    VDAgentConnection *conn = user_data;
> > +    GInputStream *in = G_INPUT_STREAM(source_object);
> > +    GError *err = NULL;
> > +    gsize bytes_read, data_size;
> > +
> > +    g_input_stream_read_all_finish(in, res, &bytes_read, &err);
> > +    if (err)
> > +        return handle_io_error(conn, err);
> > +    if (bytes_read == 0) {
> > +        /* see virtio-port.c for the rationale behind this */
> > +        if (conn->opening) {
> > +            g_usleep(10000);
> > +            request_message_read(conn);
> > +        } else {
> > +            conn->error_cb(conn->user_data);
> > +        }
> > +        return;
> > +    }
> > +    conn->opening = FALSE;
> > +
> > +    if (conn->read_buff == NULL) {
> > +        /* we've read the message header, now let's read its body */
> > +        if (!conn->header_read_cb(conn->header_buff, &data_size, conn->user_data))
> > +            return;
> > +        if (data_size > 0) {
> > +            conn->read_buff = g_malloc(data_size);
> > +            /* TODO: if allocation fails, we could try g_input_stream_skip()
> > +             * and hope that the message wasn't crucial for proper functiong.
> > +             * An example might be when a user tries to copy large clipboard.
> > +             * Not sure whether it's worth implementing.
> > +             * Other stuff might just as well fall apart
> > +             * when the system is running out of memory? */
> > +            g_input_stream_read_all_async(in, conn->read_buff, data_size,
> > +                G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
> > +            return;
> > +        }
> > +    }
> > +
> > +    if (!conn->read_cb(conn->header_buff, conn->read_buff, conn->user_data))
> > +        return;
> > +    g_clear_pointer(&conn->read_buff, g_free);
> > +    request_message_read(conn);
> > +}
> > +
> > +static void request_message_read(VDAgentConnection *conn)
> > +{
> > +    GInputStream *in;
> > +    in = g_io_stream_get_input_stream(conn->io_stream);
> > +
> > +    g_input_stream_read_all_async(in, conn->header_buff, conn->header_size,
> > +        G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
> > +}
> > diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h
> > new file mode 100644
> > index 0000000..6fc0081
> > --- /dev/null
> > +++ b/src/vdagent-connection.h
> > @@ -0,0 +1,103 @@
> > +/*  vdagent-connection.h
> > +
> > +    Copyright 2018 Red Hat, Inc.
> > +
> > +    This program is free software: you can redistribute it and/or modify
> > +    it under the terms of the GNU General Public License as published by
> > +    the Free Software Foundation, either version 3 of the License, or
> > +    (at your option) any later version.
> > +
> > +    This program is distributed in the hope that it will be useful,
> > +    but WITHOUT ANY WARRANTY; without even the implied warranty of
> > +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> > +    GNU General Public License for more details.
> > +
> > +    You should have received a copy of the GNU General Public License
> > +    along with this program.  If not, see <http://www.gnu.org/licenses/>.
> > +*/
> > +
> > +#ifndef __VDAGENT_CONNECTION_H
> > +#define __VDAGENT_CONNECTION_H
> > +
> > +#include <glib.h>
> > +#include <gio/gio.h>
> > +
> > +typedef struct VDAgentConnection VDAgentConnection;
> > +
> > +/* Called when a message header has been read.
> > + *
> > + * If the handler wishes to continue reading,
> > + * it must set @body_size to the size of message's body and return TRUE.
> > + * Once @body_size bytes are read, VDAgentConnReadCb() is invoked.
> > + *
> > + * Otherwise the handler should return FALSE
> > + * and call vdagent_connection_destroy().
> > + *
> > + * @header_buff is owned by VDAgentConnection and must not be freed. */
> > +typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff,
> > +                                            gsize   *body_size,
> > +                                            gpointer user_data);
> > +
> > +/* Called when a full message has been read.
> > + *
> > + * If the handler wished to continue reading, it must return TRUE,
> > + * otherwise FALSE and call vdagent_connection_destroy().
> > + *
> > + * @header, @data are owned by VDAgentConnection and must not be freed. */
> > +typedef gboolean (*VDAgentConnReadCb)(gpointer header,
> > +                                      gpointer data,
> > +                                      gpointer user_data);
> > +
> > +/* Called when an error occured during read or wirte.
> > + * The handler is expected to call vdagent_connection_destroy(). */
> > +typedef void (*VDAgentConnErrorCb)(gpointer user_data);
> > +
> > +/* Open a file in @path for read and write.
> > + * Returns a GIOStream to the given file or NULL on error. */
> > +GIOStream *vdagent_file_open(const gchar *path);
> > +
> > +/* Create a socket and initiate a new connection to the socket on @address.
> > + * Returns a GIOStream corresponding to the new connection or NULL on error. */
> > +GIOStream *vdagent_socket_connect(const gchar *address);
> > +
> > +/* Create new VDAgentConnection and start reading incoming messages.
> > + *
> > + * If @wait_on_opening is set to TRUE, EOF won't be treated as an error
> > + * until the first message is successfully read or written to the @io_stream.
> > + *
> > + * @user_data will be passed to the supplied callbacks. */
> > +VDAgentConnection *vdagent_connection_new(
> > +    GIOStream              *io_stream,
> > +    gboolean                wait_on_opening,
> > +    gsize                   header_size,
> > +    VDAgentConnHeaderReadCb header_read_cb,
> > +    VDAgentConnReadCb       read_cb,
> > +    VDAgentConnErrorCb      error_cb,
> > +    gpointer                user_data);
> > +
> > +/* Free up all resources associated with the VDAgentConnection.
> > + *
> > + * This operation can be asynchronous. */
> > +void vdagent_connection_destroy(VDAgentConnection *conn);
> > +
> > +/* Append a message to write queue.
> > + *
> > + * VDAgentConnection takes ownership of the @data
> > + * and frees it once the message is flushed. */
> > +void vdagent_connection_write(VDAgentConnection *conn,
> > +                              gpointer           data,
> > +                              gsize              size);
> > +
> > +/* Waits until all queued messages get written to the output stream.
> > + *
> > + * Note: other GSources can be triggered during this call */
> > +void vdagent_connection_flush(VDAgentConnection *conn);
> > +
> > +/* Returns the credentials of the foreign process connected to the socket.
> > + *
> > + * It is an error to call this function with a VDAgentConnection
> > + * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */
> > +GCredentials *vdagent_connection_get_peer_credentials(
> > +    VDAgentConnection *conn);
> > +
> > +#endif
> > --
> > 2.17.1
> >
> > _______________________________________________
> > Spice-devel mailing list
> > Spice-devel@lists.freedesktop.org
> > https://lists.freedesktop.org/mailman/listinfo/spice-devel