add transport/sock/line_proto/etc code compiles new-transport
authorTero Marttila <terom@fixme.fi>
Tue, 28 Apr 2009 22:08:59 +0300
branchnew-transport
changeset 156 6534a4ac957b
parent 155 c59d3eaff0fb
child 157 1e5674d0eec4
add transport/sock/line_proto/etc code compiles
src/CMakeLists.txt
src/error.c
src/error.h
src/irc_client.c
src/irc_conn.c
src/irc_net.h
src/irc_net_connect.c
src/line_proto.c
src/line_proto.h
src/log.c
src/log.h
src/sock_gnutls.c
src/sock_tcp.c
src/transport.c
src/transport.h
src/transport_fd.c
src/transport_fd.h
src/transport_internal.h
--- a/src/CMakeLists.txt	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/CMakeLists.txt	Tue Apr 28 22:08:59 2009 +0300
@@ -11,7 +11,7 @@
 
 # define our source code modules
 set (CORE_SOURCES error.c log.c str.c)
-set (IO_SOURCES transport.c transport_fd.c sock.c sock_tcp.c sock_gnutls.c)
+set (IO_SOURCES transport.c transport_fd.c sock.c sock_tcp.c sock_gnutls.c line_proto.c)
 set (IRC_SOURCES irc_line.c irc_conn.c irc_net.c irc_chan.c chain.c irc_cmd.c irc_proto.c irc_client.c irc_user.c irc_queue.c irc_net_connect.c)
 set (LUA_SOURCES nexus_lua.c lua_objs.c lua_config.c lua_irc.c lua_func.c lua_type.c)
 set (CONSOLE_SOURCES console.c lua_console.c)
--- a/src/error.c	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/error.c	Tue Apr 28 22:08:59 2009 +0300
@@ -90,10 +90,17 @@
     {   ERR_LUA_ERR,                        "lua: error handling error",                ERR_EXTRA_STR       },
     {   ERR_LUA_FILE,                       "lua: error loading file",                  ERR_EXTRA_STR       },
     {   _ERR_INVALID,                       NULL,                                       0                   }
+
 }, _pcre_error_desc[] = {
     {   ERR_PCRE_COMPILE,                   "pcre_compile",                             ERR_EXTRA_STR       },
     {   ERR_PCRE_EXEC,                      "pcre_exec",                                ERR_EXTRA_STR       },
     {   _ERR_INVALID,                       NULL,                                       0                   }
+}, _general_error_desc[] = {
+    {   ERR_MISC,                           "miscellaneous error",                      ERR_EXTRA_STR       },
+    {   ERR_CMD_OPT,                        "invalid command line option",              ERR_EXTRA_STR       },
+    {   ERR_DUP_NAME,                       "duplicate name",                           ERR_EXTRA_STR       },
+    {   ERR_EOF,                            "EOF",                                      ERR_EXTRA_NONE      },
+    {   _ERR_INVALID,                       NULL,                                       0                   }
 };
 
 /**
--- a/src/error.h	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/error.h	Tue Apr 28 22:08:59 2009 +0300
@@ -72,6 +72,7 @@
     _ERR_LIBEVENT   = 0x000500,
     ERR_EVENT_NEW,
     ERR_EVENT_ADD,
+    ERR_EVENT_DEL,
 
     /** Evsql errors */
     _ERR_EVSQL      = 0x000600,
@@ -131,10 +132,11 @@
 
     /** General errors */
     _ERR_GENERAL    = 0xffff00,
-    ERR_CMD_OPT,
+    ERR_MISC,               ///< general error
+    ERR_CMD_OPT,            ///< invalid commandline option
     ERR_UNKNOWN,
-    ERR_DUP_NAME,
-    ERR_EOF,
+    ERR_DUP_NAME,           ///< duplicate name
+    ERR_EOF,                ///< end of file
 
 };
 
--- a/src/irc_client.c	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/irc_client.c	Tue Apr 28 22:08:59 2009 +0300
@@ -58,8 +58,8 @@
     };
 
     // combine _net_info and defaults to get net_info
-    if (_net_info->raw_sock)
-        RETURN_SET_ERROR_STR(err, ERR_IRC_NET_INFO, "raw_sock");
+    if (_net_info->transport)
+        RETURN_SET_ERROR_STR(err, ERR_IRC_NET_INFO, "transport");
 
     if ((net_info.network = _net_info->network) == NULL)
         RETURN_SET_ERROR_STR(err, ERR_IRC_NET_INFO, "network");
--- a/src/irc_conn.c	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/irc_conn.c	Tue Apr 28 22:08:59 2009 +0300
@@ -292,7 +292,7 @@
 {
     // the line_proto
     if (conn->lp)
-        line_proto_release(conn->lp);
+        line_proto_destroy(conn->lp);
 
     // the queue
     if (conn->out_queue)
--- a/src/irc_net.h	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/irc_net.h	Tue Apr 28 22:08:59 2009 +0300
@@ -35,8 +35,8 @@
     /** Protocol registration info (nickname etc) */
     struct irc_conn_register_info register_info;
 
-    /** Raw socket to use, mainly for testing purposes */
-    struct sock_stream *raw_sock;
+    /** Alternatively, raw transport to use, mainly for testing purposes */
+    transport_t *transport;
 };
 
 /**
--- a/src/irc_net_connect.c	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/irc_net_connect.c	Tue Apr 28 22:08:59 2009 +0300
@@ -1,5 +1,6 @@
 
 #include "irc_net_internal.h"
+#include "sock_tcp.h"
 #include "log.h"
 
 #include <time.h>
@@ -25,18 +26,18 @@
 }
 
 /**
- * We have succesfully established a connection to our server with the given sock, so create the irc_conn and bind it
- * to us.
+ * We have succesfully established a connection to our server with the given transport, so create the irc_conn and bind
+ * it to us.
  *
- * If this fails, this will clean up any partial state, including sock.
+ * If this fails, this will clean up any partial state, including \a transport.
  */
-static err_t irc_net_connected (struct irc_net *net, struct sock_stream *sock, struct error_info *err)
+static err_t irc_net_connected (struct irc_net *net, transport_t *transport, struct error_info *err)
 {
     // mark state
     net->connecting = false;
 
     // create the irc connection state
-    if (irc_conn_create(&net->conn, sock, &irc_net_conn_callbacks, net, err))
+    if (irc_conn_create(&net->conn, transport, &irc_net_conn_callbacks, net, err))
         goto error;
 
     // add our command handlers
@@ -55,8 +56,8 @@
 
 error:
     if (!net->conn) {
-        // cleanup sock ourselves
-        sock_stream_release(sock);
+        // cleanup transport ourselves
+        transport_destroy(transport);
 
     } else {
         // cleanup the partial stuff
@@ -72,26 +73,36 @@
  * Our sock_*_connect_async callback. If the connect ended up failing, then try and reconnect later. Otherwise, do
  * irc_net_connected().
  */
-static void irc_net_connect_cb (struct sock_stream *sock, struct error_info *conn_err, void *arg)
+static void irc_net_on_connect (transport_t *transport, void *arg)
 {
     struct irc_net *net = arg;
-    struct error_info err;
+    error_t err;
     
-    if (conn_err) {
-        // attempt reconnect later
-        log_err_info(conn_err, "connect failed");
-        
-        if (irc_net_connect(net, false, &err))
-            log_err_info(&err, "unable to reconnect");
+    // yay
+    if (irc_net_connected(net, transport, &err))
+        log_err_info(&err, "irc_net_connected");
+}
 
-    } else {
-        // yay
-        if (irc_net_connected(net, sock, &err))
-            log_err_info(&err, "irc_net_connected");
+static void irc_net_on_connect_error (transport_t *transport, const error_t *conn_err, void *arg)
+{
+    struct irc_net *net = arg;
+    error_t err;
 
-    }
+    // clean up
+    transport_destroy(transport);
+
+    // attempt reconnect later
+    log_err_info(conn_err, "connect failed");
+    
+    if (irc_net_connect(net, false, &err))
+        log_err_info(&err, "unable to reconnect");
 }
 
+static const struct transport_callbacks irc_net_transport_callbacks = {
+    .on_connect = irc_net_on_connect,
+    .on_error   = irc_net_on_connect_error,
+};
+
 /**
  * The low-level connect() implementation, connects based on irc_net::info, calling irc_net_connected/irc_net_reconnect
  * later if this succeeds.
@@ -99,20 +110,24 @@
 static err_t irc_net_do_connect (struct irc_net *net, struct error_info *err)
 {
     struct irc_net_info *info = &net->info;
-    struct sock_stream *sock = NULL;
+    struct transport_info transport_info = { &irc_net_transport_callbacks, net, TRANSPORT_READ | TRANSPORT_WRITE };
+    transport_t *transport = NULL;
 
     // sanity check
     assert(!net->connecting && !net->connected);
 
     // connect based on what's known
-    if (info->raw_sock) {
-        log_debug("connected using raw socket: %p", info->raw_sock);
+    if (info->transport) {
+        log_debug("connected using raw transport: %p", info->transport);
 
-        // direct sock_stream connection
-        sock = info->raw_sock;
+        // direct transport connection
+        transport = info->transport;
 
-        // then create the conn right away
-        if (irc_net_connected(net, sock, err))
+        // invalidate it from info since it will get destroyed later
+        info->transport = NULL;
+
+        // then create the transport right away
+        if (irc_net_connected(net, transport, err))
             goto error;
 
     } else if (info->ssl_cred) {
@@ -123,20 +138,23 @@
         log_debug("connecting to [%s]:%s using SSL", info->hostname, info->service);
 
         // connect
-        if (sock_ssl_connect_async(&sock, info->hostname, info->service, info->ssl_cred, &irc_net_connect_cb, net, err))
+        if (sock_ssl_connect(&transport_info, &transport, info->hostname, info->service, info->ssl_cred, err))
             goto error;
 
         net->connecting = true;
 
-    } else {
+    } else if (info->hostname || info->service) {
         log_debug("connecting to [%s]:%s", info->hostname, info->service);
             
         // begin async connect
-        if (sock_tcp_connect_async(&sock, info->hostname, info->service, &irc_net_connect_cb, net, err))
+        if (sock_tcp_connect(&transport_info, &transport, info->hostname, info->service, err))
             goto error;
         
         net->connecting = true;
 
+    } else {
+        RETURN_SET_ERROR_STR(err, ERR_MISC, "no connection info specified");
+
     }
 
     return SUCCESS;
@@ -161,9 +179,6 @@
         log_err_info(&err, "unable to reconnect");
 }
 
-// XXX: to get the ev_base
-#include "sock_internal.h"
-
 /**
  * Schedule a reconnection attempt in IRC_NET_RECONNECT_INTERVAL.
  */
@@ -204,6 +219,10 @@
     return ERROR_CODE(err);
 }
 
+// XXX: to get the ev_base
+#include "sock_internal.h"
+
+
 err_t irc_net_connect_init (struct irc_net *net, struct error_info *err)
 {
     // look up the ev_base 
--- a/src/line_proto.c	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/line_proto.c	Tue Apr 28 22:08:59 2009 +0300
@@ -10,8 +10,8 @@
  * Our state
  */
 struct line_proto {
-    /* The sock_stream we read/write with */
-    struct sock_stream *sock;
+    /* The transport we read/write with */
+    transport_t *transport;
 
     /* The incoming/outgoing line buffer */
     char *in_buf, *out_buf;
@@ -36,9 +36,6 @@
     void *cb_arg;
 };
 
-// function prototypes
-static err_t line_proto_schedule_events (struct line_proto *lp, short what);
-
 /**
  * An error occured which we could not recover from; the line_proto should now be considered corrupt.
  *
@@ -54,74 +51,54 @@
 }
 
 /**
- * Our sock_stream on_read handler
+ * Our transport_callbacks::on_read handler
  */
-static void line_proto_on_read (struct sock_stream *sock, void *arg)
+static void line_proto_on_read (transport_t *transport, void *arg)
 {
     struct line_proto *lp = arg;
     char *line;
 
-    (void) sock;
+    (void) transport;
 
     // sanity-check
     assert(lp->tail_offset < lp->buf_len);
     
     do {
         // attempt to read a line
-        if (line_proto_recv(lp, &line)) {
+        if (line_proto_recv(lp, &line))
             // faaail
             return line_proto_set_error(lp);
-        }
 
         // got a line?
         if (line)
             lp->callbacks.on_line(line, lp->cb_arg);
 
     } while (line);
-
-    // reschedule
-    if (line_proto_schedule_events(lp, EV_READ))
-        line_proto_set_error(lp);
 }
 
 /*
  * Signal for write
  */
-static void line_proto_on_write (struct sock_stream *sock, void *arg)
+static void line_proto_on_write (transport_t *transport, void *arg)
 {
     struct line_proto *lp = arg;
     int ret;
 
-    (void) sock;
+    (void) transport;
 
     // just flush
-    if ((ret = line_proto_flush(lp)) < 0) {
-        // faaaail
-        SET_ERROR(&lp->err, -ret);
-
+    if ((ret = line_proto_flush(lp)) < 0)
+        // faaail
         return line_proto_set_error(lp);
-    }
 }
 
-/*
- * Schedule our sock_stream callback
- */
-static err_t line_proto_schedule_events (struct line_proto *lp, short what) 
-{
-    // just use sock_stream's interface
-    if (sock_stream_event_enable(lp->sock, what))
-        RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
-
-    return SUCCESS;
-}
-
-struct sock_stream_callbacks line_proto_sock_stream_callbacks = {
+static const struct transport_callbacks line_proto_transport_callbacks = {
     .on_read    = &line_proto_on_read,
     .on_write   = &line_proto_on_write,
 };
 
-err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
-        const struct line_proto_callbacks *callbacks, void *cb_arg, struct error_info *err)
+err_t line_proto_create (struct line_proto **lp_ptr, transport_t *transport, size_t buf_size,
+        const struct line_proto_callbacks *callbacks, void *cb_arg, error_t *err)
 {
     struct line_proto *lp;
 
@@ -134,17 +111,16 @@
         JUMP_SET_ERROR(err, ERR_CALLOC);
 
     // store
-    lp->sock = sock;
+    lp->transport = transport;
     lp->buf_len = buf_size;
     lp->callbacks = *callbacks;
     lp->cb_arg = cb_arg;
 
-    // initialize event-based stuff
-    if (
-            sock_stream_event_init(sock, &line_proto_sock_stream_callbacks, lp)
-        ||  line_proto_schedule_events(lp, EV_READ)
-    )
-        JUMP_SET_ERROR_INFO(err, &lp->err);
+    // setup the transport
+    transport_set_callbacks(transport, &line_proto_transport_callbacks, lp);
+    
+    if ((ERROR_CODE(err) = transport_events(transport, TRANSPORT_READ | TRANSPORT_WRITE)))
+        goto error;
 
     // return
     *lp_ptr = lp;
@@ -154,7 +130,7 @@
 error:
     // cleanup the lp
     if (lp)
-        line_proto_release(lp);
+        line_proto_destroy(lp);
 
     return ERROR_CODE(err);
 }
@@ -237,9 +213,8 @@
             return ERR_LINE_TOO_LONG;
         
         // otherwise, read more data
-        if ((ret = sock_stream_read(lp->sock, lp->in_buf + recv_offset, lp->buf_len - recv_offset)) < 0)
-            // store and return NULL on errors
-            RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+        if ((ret = transport_read(lp->transport, lp->in_buf + recv_offset, lp->buf_len - recv_offset, &lp->err)) < 0)
+            return ERROR_CODE(&lp->err);
 
         // EAGAIN?
         if (ret == 0) {
@@ -271,11 +246,8 @@
         return -ERR_LINE_TOO_LONG;
     
     // try and write the line
-    if ((ret = sock_stream_write(lp->sock, line, len)) < 0) {
-        SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
-
+    if ((ret = transport_write(lp->transport, line, len, &lp->err)) < 0)
         return -ERROR_CODE(&lp->err);
-    }
 
     // length of the sent data
     ret_len = ret;
@@ -294,11 +266,7 @@
         // update offset
         lp->out_offset = trailing;
         
-        // register for EV_WRITE
-        if (line_proto_schedule_events(lp, EV_READ | EV_WRITE))
-            return -ERROR_CODE(&lp->err);
-
-        // buffered...
+        // buffered... transport should invoke on_write itself
         return 1;
 
     } else {
@@ -314,11 +282,8 @@
     size_t ret_len;
 
     // try and write the line
-    if ((ret = sock_stream_write(lp->sock, lp->out_buf, lp->out_offset)) < 0) {
-        SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
-
+    if ((ret = transport_write(lp->transport, lp->out_buf, lp->out_offset, &lp->err)) < 0)
         return -ERROR_CODE(&lp->err);
-    }
 
     ret_len = ret;
 
@@ -340,10 +305,6 @@
         lp->out_offset = remaining;
     }
 
-    // register for EV_WRITE
-    if (line_proto_schedule_events(lp, EV_READ | EV_WRITE))
-        return -ERROR_CODE(&lp->err);
-
     // ok
     return 1;
 }
@@ -354,15 +315,15 @@
     return &lp->err;
 }
 
-void line_proto_release (struct line_proto *lp)
+void line_proto_destroy (struct line_proto *lp)
 {
     // free buffers
     free(lp->in_buf);
     free(lp->out_buf);
 
     // socket?
-    if (lp->sock)
-        sock_stream_release(lp->sock);
+    if (lp->transport)
+        transport_destroy(lp->transport);
 
     // free the state itself
     free(lp);
--- a/src/line_proto.h	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/line_proto.h	Tue Apr 28 22:08:59 2009 +0300
@@ -38,10 +38,10 @@
  * @param err error information is returned via this pointer
  */
 err_t line_proto_create (struct line_proto **lp_ptr, transport_t *transport, size_t buf_size,
-        const struct line_proto_callbacks *callbacks, void *cb_arg, struct error_info *err);
+        const struct line_proto_callbacks *callbacks, void *cb_arg, error_t *err);
 
 /**
- * Runs the socket recv() into our internal buffer. If a full line was received, a pointer to our internal bufffer is
+ * Runs transport_read() with our internal buffer. If a full line was received, a pointer to our internal bufffer is
  * returned via *line_ptr, and we return SUCCESS. If we don't yet have a full line, and receiving more would block,
  * NULL is returned via *line_ptr instead. Otherwise, nonzero error return code.
  *
@@ -71,10 +71,10 @@
 const struct error_info* line_proto_error (struct line_proto *lp);
 
 /**
- * Release any allocated buffers, and the underlying sock_stream.
+ * Destroy any buffers and the underlying transport.
  *
  * This does not close the connection cleanly, and is intended for use to abort after errors.
  */
-void line_proto_release (struct line_proto *lp);
+void line_proto_destroy (struct line_proto *lp);
 
 #endif /* LINE_PROTO_H */
--- a/src/log.c	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/log.c	Tue Apr 28 22:08:59 2009 +0300
@@ -124,7 +124,7 @@
     va_end(vargs);
 }
 
-void _log_err (enum log_level level, struct error_info *err, const char *func, const char *format, ...)
+void _log_err (enum log_level level, const error_t *err, const char *func, const char *format, ...)
 {
     va_list vargs;
 
--- a/src/log.h	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/log.h	Tue Apr 28 22:08:59 2009 +0300
@@ -76,7 +76,7 @@
 /**
  * Log a message with the given level, appending the formatted error message
  */
-void _log_err (enum log_level level, struct error_info *err, const char *func, const char *format, ...)
+void _log_err (enum log_level level, const error_t *err, const char *func, const char *format, ...)
     __attribute__ ((format (printf, 4, 5)));
 
 /**
--- a/src/sock_gnutls.c	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/sock_gnutls.c	Tue Apr 28 22:08:59 2009 +0300
@@ -25,12 +25,12 @@
     switch ((ret = gnutls_record_get_direction(sock->session))) {
         case 0: 
             // read more data
-            mask = EV_READ;
+            mask = TRANSPORT_READ;
             break;
         
         case 1:
             // write buffer full
-            mask = EV_WRITE;
+            mask = TRANSPORT_WRITE;
             break;
         
         default:
@@ -39,7 +39,7 @@
     }
     
     // do the enabling
-    if ((ERROR_CODE(err) = transport_fd_enable(SOCK_GNUTLS_FD(sock), EV_READ)))
+    if ((ERROR_CODE(err) = transport_fd_enable(SOCK_GNUTLS_FD(sock), mask)))
         return ERROR_CODE(err);
     
 
--- a/src/sock_tcp.c	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/sock_tcp.c	Tue Apr 28 22:08:59 2009 +0300
@@ -16,7 +16,6 @@
 
 
 
-
 /**
  * Our transport_methods
  */
@@ -35,6 +34,7 @@
     .methods                = {
         .read               = transport_fd_methods_read,
         .write              = transport_fd_methods_write,
+        .events             = transport_fd_methods_events,
         .destroy            = _sock_tcp_destroy,
     },
 };
@@ -138,16 +138,35 @@
 }
 
 /**
- * Our async connect operation has completed, clean up and call the user callback. The given \a err should be NULL for
- * successful completion, or the error for failures.
+ * Our async connect operation has completed, clean up, set up state for event-based operation with user callbacks, and
+ * invoke transport_connected().
+ *
+ * The given \a err should be NULL for successful completion, or the error for failures.
  */
 static void sock_tcp_connect_done (struct sock_tcp *sock, struct error_info *err)
 {
     // cleanup
     sock_tcp_connect_cleanup(sock);
+
+    if (err)
+        // passthrough errors
+        goto error;
     
-    // ok, run callback
-    transport_connected(SOCK_TCP_TRANSPORT(sock), err, false);
+    // install the transport_invoke callback handler
+    if ((ERROR_CODE(err) = transport_fd_setup(SOCK_TCP_FD(sock), transport_fd_callback_user, NULL)))
+        goto error;
+
+    // enable read unless masked out
+    if (SOCK_TCP_TRANSPORT(sock)->info.ev_mask & TRANSPORT_READ) {
+        if ((ERROR_CODE(err) = transport_fd_enable(SOCK_TCP_FD(sock), TRANSPORT_READ)))
+            goto error;
+    }
+
+    // ok, no error
+
+error:                
+    // pass on to transport
+    transport_connected(SOCK_TCP_TRANSPORT(sock), IS_ERROR(err) ? err : NULL, false);
 }
 
 /**
@@ -212,7 +231,7 @@
             goto error;
     
         // enable for write
-        if ((ERROR_CODE(err) = transport_fd_enable(SOCK_TCP_FD(sock), EV_WRITE)))
+        if ((ERROR_CODE(err) = transport_fd_enable(SOCK_TCP_FD(sock), TRANSPORT_WRITE)))
             goto error;
 
     } else {
--- a/src/transport.c	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/transport.c	Tue Apr 28 22:08:59 2009 +0300
@@ -2,6 +2,9 @@
 
 #include <assert.h>
 
+/*
+ * Internal API
+ */
 void transport_init (transport_t *transport, const struct transport_type *type, const struct transport_info *info)
 {
     // not already bound
@@ -61,3 +64,71 @@
     // invoke callback
     transport->info.cb_tbl->on_error(transport, err, transport->info.cb_arg);
 }
+
+/*
+ * Public API
+ */
+int transport_read (transport_t *transport, void *buf, size_t len, error_t *err)
+{
+    // not readable
+    if (!transport->type->methods.read)
+        return -1;
+
+    // proxy off to method handler
+    if (transport->type->methods.read(transport, buf, &len, err))
+        return -ERROR_CODE(err);
+    
+    // return updated bytes-read len
+    return len;
+}
+
+int transport_write (transport_t *transport, const void *buf, size_t len, error_t *err)
+{
+    // XXX: not writeable
+    if (!transport->type->methods.write)
+        return -1;
+
+    // proxy off to method handler
+    if (transport->type->methods.write(transport, buf, &len, err))
+        return -ERROR_CODE(err);
+
+    // return updated bytes-written len
+    return len;
+}
+
+err_t transport_events (transport_t *transport, short mask)
+{
+    error_t err;
+
+    // notify transport
+    if (transport->type->methods.events) {
+        if (transport->type->methods.events(transport, mask, &err))
+            goto error;
+    }
+
+    // update the event mask
+    transport->info.ev_mask = mask;
+
+    // ok
+    return SUCCESS;
+
+error:
+    return ERROR_CODE(&err);
+}
+
+void transport_set_callbacks (transport_t *transport, const struct transport_callbacks *cb_tbl, void *cb_arg)
+{
+    transport->info.cb_tbl = cb_tbl;
+    transport->info.cb_arg = cb_arg;
+}
+
+void transport_destroy (transport_t *transport)
+{
+    // destroy the transport-specific stuff
+    if (transport->type->methods.destroy)
+        transport->type->methods.destroy(transport);
+
+    // then the transport itself
+    free(transport);
+}
+
--- a/src/transport.h	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/transport.h	Tue Apr 28 22:08:59 2009 +0300
@@ -10,17 +10,54 @@
 #include "error.h"
 
 /**
- * Opaque transport state
+ * Opaque transport state handle.
+ *
+ * Transports are reliable byte streams, connected with some endpoint over some medium. Common implementations are
+ * e.g. TCP, SSL or fifo transports (using the OS file/socket API).
+ *
+ * Transports can be connected or unconnected. For synchronous opens (e.g. fifo_open_read()), the transport returned
+ * will already be connected, meaning that the transport_callbacks::on_connect callback is unused. For async connects
+ * such as sock_tcp_connect()/sock_ssl_connect(), the transport returned is *not* connected, and you must wait for
+ * transport_callbacks::on_connect to be called before being able to send/recieve data on the transport.
+ *
+ * Once you have an opened transport, sending and receiving data is simple - just call transport_read()/transport_write().
+ * These implement unbuffered I/O, so they may do partial reads/writes. In terms of the system read/write calls, the
+ * main difference is in the error return codes. On EOF, instead of returning zero, they return ERR_EOF (or
+ * ERR_WRITE_EOF for transport_write, for whoever knows what that means...). This means that when the underlying
+ * transport is unable to fufill the request due to lack of data/buffer space, these can return zero to signifiy s
+ * something simliar to EAGAIN.
+ *
+ * The transport API also implements non-blocking/event-based operation (usually on top of libevent), although at a
+ * slightly different level than the normal select/poll API. Instead of the user asking the transport to notify for
+ * read/write after transport_read/transport_write return zero, the transport will take care of this itself. 
+ * 
+ * Specifically, the user can supply a mask of events they are currently interested in. By default, this should be the
+ * full TRANSPORT_READ | TRANSPORT_WRITE, as the transport will take care of managing events by itself. If you wish to
+ * e.g. throttle read/write, you may set a different event mask using transport_events(), which will prevent the
+ * relevant callback from being triggered.
+ *
+ * For reads, the transport maintains a persistent read event, and will always call on_read when data is available on
+ * the socket (i.e. normal select() semantics). If masked out using transport_events(), there should be no event
+ * activity on the transport (i.e. the fd read event is removed).
+ *
+ * For writes, the transport maintains a write event that is disabled by default. If on_write returns zero, it will
+ * become enabled *once*, and consequently trigger transport_callbacks::on_write *once*, after which you must call
+ * transport_write() to possibly enable it again. If masked out using transport_events(), transport_write() will not
+ * enable the write event, and any pending write event is cancelled. If masked back in using transport_events(), the
+ * write event will *not* be registered, so if you have pending data, do a transport_write() after enabling
+ * TRANSPORT_WRITE.
  */
 struct transport;
 
 /**
- * Because it's so annoying seeing "struct" everywhere
+ * @see transport
  */
 typedef struct transport transport_t;
 
 /**
- * Callbacks for structs
+ * User callbacks for transports
+ *
+ * @see transport
  */
 struct transport_callbacks {
     /**
@@ -41,12 +78,16 @@
     /**
      * An asynchronous error has occured. This is only called for errors that occur while being called directly from
      * the underlying event loop, and never from inside an API function.
+     *
+     * You must call transport_destroy to release the transport.
      */
     void (*on_error) (transport_t *transport, const error_t *err, void *arg);
 };
 
 /**
  * Bitmask of available events
+ *
+ * @see transport
  */
 enum transport_event {
     TRANSPORT_READ  = 0x01,
@@ -55,6 +96,8 @@
 
 /**
  * User info required to build a transport
+ *
+ * @see transport
  */
 struct transport_info {
     /** The callbacks table */
@@ -101,9 +144,14 @@
 int transport_write (transport_t *transport, const void *buf, size_t len, error_t *err);
 
 /**
- * Change the mask of enabled events
+ * Change the mask of enabled events.
  */
-void transport_events (transport_t *transport, short mask);
+err_t transport_events (transport_t *transport, short mask);
+
+/**
+ * Install a new set of callback handlers, replacing the old ones.
+ */
+void transport_set_callbacks (transport_t *transport, const struct transport_callbacks *cb_tbl, void *cb_arg);
 
 /**
  * Close and destroy the transport immediately, severing any established connection rudely.
--- a/src/transport_fd.c	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/transport_fd.c	Tue Apr 28 22:08:59 2009 +0300
@@ -74,6 +74,11 @@
         // EAGAIN -> zero bytes
         *len = 0;
 
+        if (transport->info.ev_mask & TRANSPORT_WRITE)
+            // enable the write event
+            if ((ERROR_CODE(err) = transport_fd_enable(fd, TRANSPORT_WRITE)))
+                return ERROR_CODE(err);
+
     } else {
         // normal -> bytes read
         *len = ret;
@@ -82,9 +87,26 @@
     return SUCCESS;
 }
 
+err_t transport_fd_methods_events (transport_t *transport, short mask, error_t *err)
+{
+    struct transport_fd *fd = transport_check(transport, &transport_fd_type);
+    
+    short _mask = 0;
+
+    // enable read as requested
+    if (mask & TRANSPORT_READ)
+        _mask |= TRANSPORT_READ;
+    
+    // enable write if requested and it's currently enabled
+    if ((mask & TRANSPORT_WRITE) && event_pending(fd->ev_write, EV_WRITE, NULL))
+        _mask |= TRANSPORT_WRITE;
+
+    // set
+    return (ERROR_CODE(err) = transport_fd_events(fd, mask));
+}
+
 void _transport_fd_destroy (transport_t *transport)
 {
-
     struct transport_fd *fd = transport_check(transport, &transport_fd_type);
 
     transport_fd_destroy(fd);
@@ -93,10 +115,22 @@
 const struct transport_methods transport_fd_methods = {
     .read       = transport_fd_methods_read,
     .write      = transport_fd_methods_write,
+    .events     = transport_fd_methods_events,
     .destroy    = _transport_fd_destroy
 };
 
 /**
+ * Dummy callbacks
+ */
+void transport_fd_callback_user (struct transport_fd *fd, short what, void *arg)
+{
+    (void) arg;
+    
+    // proxy
+    transport_invoke(TRANSPORT_FD_BASE(fd), what);
+}
+
+/**
  * Function implementations
  */
 void transport_fd_init (struct transport_fd *fd, struct event_base *ev_base, int _fd)
@@ -139,7 +173,7 @@
     assert(!fd->ev_read && !fd->ev_write);
 
     // create new events
-    if ((fd->ev_read = event_new(fd->ev_base, fd->fd, EV_READ, transport_fd_on_event, fd)) == NULL)
+    if ((fd->ev_read = event_new(fd->ev_base, fd->fd, EV_READ | EV_PERSIST, transport_fd_on_event, fd)) == NULL)
         goto err_event_add;
 
     if ((fd->ev_write = event_new(fd->ev_base, fd->fd, EV_WRITE, transport_fd_on_event, fd)) == NULL)
@@ -166,22 +200,59 @@
     
     // install the event handlers?
     if (!fd->ev_read || !fd->ev_write)
-        transport_fd_install(fd);
+        return transport_fd_install(fd);
+    else
+        return SUCCESS;
+}
+
+err_t transport_fd_enable (struct transport_fd *fd, short mask)
+{
+    // just add the appropriate events
+    if (mask & TRANSPORT_READ && event_add(fd->ev_read, NULL))
+        return ERR_EVENT_ADD;
+ 
+    if (mask & TRANSPORT_WRITE && event_add(fd->ev_write, NULL))
+        return ERR_EVENT_ADD;
+    
+
+    return SUCCESS;
+}
+
+err_t transport_fd_disable (struct transport_fd *fd, short mask)
+{
+    if (mask & TRANSPORT_READ && event_del(fd->ev_read))
+        return ERR_EVENT_DEL;
+
+    if (mask & TRANSPORT_WRITE && event_del(fd->ev_write))
+        return ERR_EVENT_DEL;
 
 
     return SUCCESS;
 }
 
-err_t transport_fd_enable (struct transport_fd *fd, short mask)
+err_t transport_fd_events (struct transport_fd *fd, short mask)
 {
-    // just add the appropraite events
-    if (mask & EV_READ && event_add(fd->ev_read, NULL))
-        return ERR_EVENT_ADD;
- 
-    if (mask & EV_WRITE && event_add(fd->ev_write, NULL))
-        return ERR_EVENT_ADD;
+    err_t err;
     
+    // enable/disable read
+    if (mask & TRANSPORT_READ)
+        err = event_add(fd->ev_read, NULL);
+    else
+        err = event_del(fd->ev_read);
 
+    if (err)
+        return err;
+
+    // enable/disable write
+    if (mask & TRANSPORT_WRITE)
+        err = event_add(fd->ev_write, NULL);
+    else
+        err = event_del(fd->ev_write);
+
+    if (err)
+        return err;
+
+    // ok
     return SUCCESS;
 }
 
@@ -228,16 +299,8 @@
 
 void transport_fd_invoke (struct transport_fd *fd, short what)
 {
-    short _what = 0;
-
-    if (what & EV_READ)
-        _what |= TRANSPORT_READ;
-
-    if (what & EV_WRITE)
-        _what |= TRANSPORT_WRITE;
-    
     // invoke
-    transport_invoke(TRANSPORT_FD_BASE(fd), _what);
+    transport_invoke(TRANSPORT_FD_BASE(fd), what);
 }
 
 err_t transport_fd_close (struct transport_fd *fd)
--- a/src/transport_fd.h	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/transport_fd.h	Tue Apr 28 22:08:59 2009 +0300
@@ -19,7 +19,7 @@
 /**
  * Low-level callback
  */
-typedef void (*transport_fd_callback_func) (struct transport_fd *tp_fd, short what, void *arg);
+typedef void (*transport_fd_callback_func) (struct transport_fd *fd, short what, void *arg);
 
 /**
  * The fd-based transport implementation
@@ -61,9 +61,21 @@
 err_t transport_fd_methods_read (transport_t *transport, void *buf, size_t *len, error_t *err);
 
 /**
- * Implementation of transport_methods::write
+ * Implementation of transport_methods::write.
+ *
+ * If this gets EAGAIN, it will automatically enable the write event, unless masked out.
  */
-err_t transport_fd_methods_write (transport_t *transport, const void *buf, size_t *len, struct error_info *err);
+err_t transport_fd_methods_write (transport_t *transport, const void *buf, size_t *len, error_t *err);
+
+/**
+ * Implementation of transport_methods::events.
+ *
+ * For TRANSPORT_READ, this will simply apply enable/disable as given.
+ *
+ * For TRANSPORT_WRITE, the write event will only be enabled if given in the mask, *and* the ev_write event is currently
+ * active (via transport_fd_methods_write()); otherwise, the write event will not be enabled.
+ */
+err_t transport_fd_methods_events (transport_t *transport, short mask, error_t *err);
 
 /**
  * The transport_methods struct
@@ -71,6 +83,16 @@
 extern const struct transport_methods transport_fd_methods;
 
 /**
+ * A transport_fd_callback_func that simply invokes the transport_callback user functions.
+ *
+ * Register with a NULL cb_arg.
+ */
+void transport_fd_callback_user (struct transport_fd *fd, short what, void *arg);
+
+
+
+
+/**
  * Initialize the transport_fd to use the given, connected fd, or TRANSPORT_FD_INVALID if we don't yet have an fd.
  *
  * It is an error to call this if the transport_fd already has an fd set
@@ -94,11 +116,21 @@
 err_t transport_fd_setup (struct transport_fd *fd, transport_fd_callback_func cb_func, void *cb_arg);
 
 /**
- * Enable the callbacks for the specified events, any of { EV_WRITE, EV_READ }.
+ * Enable the specified events, any of { EV_WRITE, EV_READ }.
  */
 err_t transport_fd_enable (struct transport_fd *fd, short mask);
 
 /**
+ * Disable the specifid events, any of { EV_WRITE, EV_READ }.
+ */
+err_t transport_fd_disable (struct transport_fd *fd, short mask);
+
+/**
+ * Set the enable/disable state of our events to the given mask of { EV_WRITE, EV_READ }.
+ */
+err_t transport_fd_events (struct transport_fd *fd, short mask);
+
+/**
  * Remove any old event callback present, so it will not be called anymore.
  *
  * It is perfectly safe to call this without any callbacks installed.
--- a/src/transport_internal.h	Tue Apr 28 20:27:45 2009 +0300
+++ b/src/transport_internal.h	Tue Apr 28 22:08:59 2009 +0300
@@ -11,7 +11,9 @@
 #include <stdbool.h>
 
 /**
- * Method table for implementation stuff
+ * Method table for implementation stuff.
+ *
+ * Note that it is the transport's resposibility to implement the behaviour described in transport.h
  */
 struct transport_methods {
     /** For transport_read() */
@@ -20,6 +22,13 @@
     /** For transport_write() */
     err_t (*write) (transport_t *transport, const void *buf, size_t *len, error_t *err);
 
+    /**
+     * The mask of event flags will be set to the given mask if this method is succesfully.
+     *
+     * The old mask is still available in transport::info::ev_mask.
+     */
+    err_t (*events) (transport_t *transport, short mask, error_t *err);
+
     /** 
      * Release the transport's internal state, but not the transport itself.
      *
@@ -57,7 +66,7 @@
 
     /** User info */
     struct transport_info info;
-
+    
     /** Are we connected? */
     bool connected;
 };