most of the new transport/sock code compiles, but things are still missing new-transport
authorTero Marttila <terom@fixme.fi>
Tue, 28 Apr 2009 20:27:45 +0300
branchnew-transport
changeset 155 c59d3eaff0fb
parent 154 f4472119de3b
child 156 6534a4ac957b
most of the new transport/sock code compiles, but things are still missing
src/CMakeLists.txt
src/error.h
src/fifo.h
src/irc_conn.c
src/irc_conn.h
src/irc_queue.c
src/irc_queue.h
src/line_proto.h
src/signals.c
src/sock.c
src/sock.h
src/sock_gnutls.c
src/sock_gnutls.h
src/sock_internal.h
src/sock_ssl.h
src/sock_tcp.c
src/sock_tcp.h
src/sock_tcp_internal.h
src/transport.c
src/transport.h
src/transport_fd.c
src/transport_fd.h
src/transport_internal.h
--- a/src/CMakeLists.txt	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/CMakeLists.txt	Tue Apr 28 20:27:45 2009 +0300
@@ -11,13 +11,13 @@
 
 # define our source code modules
 set (CORE_SOURCES error.c log.c str.c)
-set (SOCK_SOURCES sock.c sock_fd.c sock_tcp.c sock_gnutls.c sock_test.c sock_fifo.c line_proto.c)
+set (IO_SOURCES transport.c transport_fd.c sock.c sock_tcp.c sock_gnutls.c)
 set (IRC_SOURCES irc_line.c irc_conn.c irc_net.c irc_chan.c chain.c irc_cmd.c irc_proto.c irc_client.c irc_user.c irc_queue.c irc_net_connect.c)
 set (LUA_SOURCES nexus_lua.c lua_objs.c lua_config.c lua_irc.c lua_func.c lua_type.c)
 set (CONSOLE_SOURCES console.c lua_console.c)
 
-set (NEXUS_SOURCES nexus.c ${CORE_SOURCES} ${SOCK_SOURCES} ${IRC_SOURCES} ${LUA_SOURCES} ${CONSOLE_SOURCES} signals.c module.c config.c)
-set (TEST_SOURCES test.c ${CORE_SOURCES} ${SOCK_SOURCES} ${IRC_SOURCES})
+set (NEXUS_SOURCES nexus.c ${CORE_SOURCES} ${IO_SOURCES} ${IRC_SOURCES} ${LUA_SOURCES} ${CONSOLE_SOURCES} signals.c module.c config.c)
+set (TEST_SOURCES test.c ${CORE_SOURCES} ${IO_SOURCES} ${IRC_SOURCES})
 set (IRC_LOG_SOURCES modules/irc_log.c)
 set (LOGWATCH_SOURCES modules/logwatch.c modules/logwatch_source.c modules/logwatch_filter.c modules/logwatch_chan.c)
 
--- a/src/error.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/error.h	Tue Apr 28 20:27:45 2009 +0300
@@ -52,8 +52,18 @@
     ERR_GETADDRINFO,
     ERR_GETADDRINFO_EMPTY, 
     
-    /** @see sock_error_code*/
+    /** socket/IO errors */
     _ERR_SOCK       = 0x000300,
+    ERR_SOCKET,         ///< socket(2) failed
+    ERR_CONNECT,        ///< connect(2) error - either direct or async
+    ERR_READ,           ///< read(2) error - will probably show up as an ERR_WRITE as well
+    ERR_READ_EOF,       ///< EOF on read(2)
+    ERR_WRITE,          ///< write(2) error - data was unsent, will probably show up as an ERR_READ as well
+    ERR_WRITE_EOF,      ///< write(2) gave EOF - zero bytes written
+    ERR_FCNTL,          ///< fcntl(2) failed
+    ERR_CLOSE,          ///< close(2) failed, some written data was probably not sent
+    ERR_GETSOCKOPT,     ///< getsockopt(2) failed
+    ERR_OPEN,           ///< open(2) failed
 
     /** @see sock_gnutls_error_code */
     _ERR_GNUTLS     = 0x000400,
@@ -116,11 +126,15 @@
     /** str errors */
     _ERR_STR        = 0x000f00,
 
+    /** Transport errors */
+    _ERR_TRANSPORT  = 0x001000,
+
     /** General errors */
     _ERR_GENERAL    = 0xffff00,
     ERR_CMD_OPT,
     ERR_UNKNOWN,
     ERR_DUP_NAME,
+    ERR_EOF,
 
 };
 
--- a/src/fifo.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/fifo.h	Tue Apr 28 20:27:45 2009 +0300
@@ -8,11 +8,11 @@
  *
  * The transport will be ready for use right away, transport_callbacks::on_connect will never be called.
  *
- * @param transport the unconnected transport.
+ * @param transport_ptr returned transport
  * @param path the path to the filesystem fifo object
  * @param err returned error info
  */
-err_t fifo_open_read (transport_t *transport, const char *path, error_t *err);
+err_t fifo_open_read (transport_t **transport_ptr, const char *path, error_t *err);
 
 
 #endif
--- a/src/irc_conn.c	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/irc_conn.c	Tue Apr 28 20:27:45 2009 +0300
@@ -239,8 +239,13 @@
     .on_error       = &irc_conn_on_error,
 };
 
-err_t irc_conn_create (struct irc_conn **conn_ptr, struct sock_stream *sock, const struct irc_conn_callbacks *callbacks, 
-        void *cb_arg, struct error_info *err)
+// XXX: ugly hack to get at an event_base
+#include "sock_internal.h"
+
+struct event_base **ev_base_ptr = &_sock_stream_ctx.ev_base;
+
+err_t irc_conn_create (struct irc_conn **conn_ptr, transport_t *transport, const struct irc_conn_callbacks *callbacks, 
+        void *cb_arg, error_t *err)
 {
     struct irc_conn *conn;
 
@@ -264,11 +269,11 @@
         goto error;
 
     // create the line_proto, with our on_line handler
-    if (line_proto_create(&conn->lp, sock, IRC_LINE_MAX * 1.5, &irc_conn_lp_callbacks, conn, err))
+    if (line_proto_create(&conn->lp, transport, IRC_LINE_MAX * 1.5, &irc_conn_lp_callbacks, conn, err))
         goto error;
 
     // create the outgoing line queue
-    if (irc_queue_create(&conn->out_queue, conn->lp, err))
+    if (irc_queue_create(&conn->out_queue, *ev_base_ptr, conn->lp, err))
         goto error;
 
     // ok
--- a/src/irc_conn.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/irc_conn.h	Tue Apr 28 20:27:45 2009 +0300
@@ -12,7 +12,7 @@
 struct irc_conn;
 
 #include "error.h"
-#include "sock.h"
+#include "transport.h"
 #include "line_proto.h"
 #include "irc_queue.h"
 #include "irc_line.h"
@@ -173,13 +173,13 @@
  * via *err.
  *
  * @param conn_ptr returned new irc_conn structure
- * @param sock the socket connected to the IRC server
+ * @param transport connected transport
  * @param callbacks the high-level status callbacks, required
  * @param cb_arg opqaue context argument for callbacks
  * @param err returned error info
  */
-err_t irc_conn_create (struct irc_conn **conn_ptr, struct sock_stream *sock, const struct irc_conn_callbacks *callbacks, 
-        void *cb_arg, struct error_info *err);
+err_t irc_conn_create (struct irc_conn **conn_ptr, transport_t *transport, const struct irc_conn_callbacks *callbacks, 
+        void *cb_arg, error_t *err);
 
 /**
  * Destroy the irc_conn state, terminating any connection and releasing all resources.
--- a/src/irc_queue.c	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/irc_queue.c	Tue Apr 28 20:27:45 2009 +0300
@@ -1,9 +1,6 @@
 #include "irc_queue.h"
 #include "log.h"
 
-// XXX: for ev_base
-#include "sock_internal.h"
-
 #include <stdlib.h>
 #include <string.h>
 #include <time.h>
@@ -128,7 +125,7 @@
     }
 }
 
-err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err)
+err_t irc_queue_create (struct irc_queue **queue_ptr, struct event_base *ev_base, struct line_proto *lp, struct error_info *err)
 {
     struct irc_queue *queue;
 
@@ -137,8 +134,7 @@
         return SET_ERROR(err, ERR_CALLOC);
 
     // create the timer event
-    // XXX: using the sock module ev_base
-    if ((queue->ev = evtimer_new(_sock_stream_ctx.ev_base, &irc_queue_timer, queue)) == NULL)
+    if ((queue->ev = evtimer_new(ev_base, &irc_queue_timer, queue)) == NULL)
         JUMP_SET_ERROR(err, ERR_EVENT_NEW);
 
     // initialize
--- a/src/irc_queue.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/irc_queue.h	Tue Apr 28 20:27:45 2009 +0300
@@ -69,7 +69,7 @@
 /**
  * Create a new irc_queue for use with the given line_proto
  */
-err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err);
+err_t irc_queue_create (struct irc_queue **queue_ptr, struct event_base *ev_base, struct line_proto *lp, struct error_info *err);
 
 /**
  * Process a line, either sending it directly, or enqueueing it, based on the timer state.
--- a/src/line_proto.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/line_proto.h	Tue Apr 28 20:27:45 2009 +0300
@@ -6,7 +6,7 @@
  *
  * Support for protocols that send/receive lines
  */
-#include "sock.h"
+#include "transport.h"
 #include "error.h"
 
 /**
@@ -30,16 +30,14 @@
  *
  * The incoming lines are buffered in a buffer of \a buf_size bytes. This imposes a maximum limit on the line length.
  *
- * Note that the given callbacks struct is copied.
- *
  * @param lp_ptr a pointer to the new line_proto will be returned via this pointer
- * @param sock the sock_stream to use
+ * @param transport the connected transport to use
  * @param buf_size the incoming/outgoing buffer size, should be enough to hold the biggest possible line
  * @param callbacks the callbacks to use, a copy is stored
  * @param cb_arg the read_cb callback argument
  * @param err error information is returned via this pointer
  */
-err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
+err_t line_proto_create (struct line_proto **lp_ptr, transport_t *transport, size_t buf_size,
         const struct line_proto_callbacks *callbacks, void *cb_arg, struct error_info *err);
 
 /**
--- a/src/signals.c	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/signals.c	Tue Apr 28 20:27:45 2009 +0300
@@ -1,8 +1,9 @@
-#define _GNU_SOURCE
 
 #include "signals.h"
 #include "log.h"
 
+#define _GNU_SOURCE
+
 #include <string.h>
 #include <signal.h>
 #include <stdlib.h>
@@ -29,7 +30,7 @@
 
     (void) event;
 
-    log_info("caught %s: exiting", strsignal(signal));
+    log_info("caught %s: exiting", /* strsignal(signal) */ "xxx");
     
     if (event_base_loopexit(signals->ev_base, NULL))
         FATAL("event_base_loopexit");
--- a/src/sock.c	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/sock.c	Tue Apr 28 20:27:45 2009 +0300
@@ -20,98 +20,3 @@
     return SUCCESS;
 }
 
-void sock_stream_init (struct sock_stream *sock, struct sock_stream_type *type, sock_stream_connect_cb cb_func, void *cb_arg)
-{
-    // be strict
-    assert(sock->type == NULL);
-
-    // store 
-    sock->type = type;
-    sock->conn_cb_func = cb_func;
-    sock->conn_cb_arg = cb_arg;
-}
-
-int sock_stream_read (struct sock_stream *sock, void *buf, size_t len)
-{
-    struct error_info *err = SOCK_ERR(sock);
-
-    // XXX: not readable
-    if (!sock->type->methods.read)
-        return -1;
-
-    // proxy off to method handler
-    if (sock->type->methods.read(sock, buf, &len, err))
-        return -ERROR_CODE(err);
-    
-    // return updated bytes-read len
-    return len;
-}
-
-int sock_stream_write (struct sock_stream *sock, const void *buf, size_t len)
-{
-    struct error_info *err = SOCK_ERR(sock);
-
-    // XXX: not writeable
-    if (!sock->type->methods.write)
-        return -1;
-
-    // proxy off to method handler
-    if (sock->type->methods.write(sock, buf, &len, err))
-        return -ERROR_CODE(err);
-
-    // return updated bytes-written len
-    return len;
-}
-
-err_t sock_stream_event_init (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg)
-{
-    // store
-    sock->cb_info = callbacks;
-    sock->cb_arg = arg;
-
-    // run method
-    return sock->type->methods.event_init(sock);
-}
-
-err_t sock_stream_event_enable (struct sock_stream *sock, short mask)
-{
-    // run method
-    return sock->type->methods.event_enable(sock, mask);
-}
-
-const struct error_info* sock_stream_error (struct sock_stream *sock)
-{
-    // return pointer
-    return SOCK_ERR(sock);
-}
-
-void sock_stream_invoke_callbacks (struct sock_stream *sock, short what)
-{
-    if (what & EV_READ && sock->cb_info->on_read)
-        sock->cb_info->on_read(sock, sock->cb_arg);
-
-    if (what & EV_WRITE && sock->cb_info->on_write)
-        sock->cb_info->on_read(sock, sock->cb_arg);
-}
-
-void sock_stream_invoke_conn_cb (struct sock_stream *sock, struct error_info *err, bool direct)
-{
-    if (!direct && sock->type->methods._conn_cb) {
-        // invoke indirectly
-        sock->type->methods._conn_cb(sock, err);
-
-    } else {
-        sock_stream_connect_cb cb_func = sock->conn_cb_func;
-
-        // invoke directly
-        sock->conn_cb_func = NULL;
-        cb_func(sock, err, sock->conn_cb_arg);
-        sock->conn_cb_arg = NULL;
-    }
-}
-
-void sock_stream_release (struct sock_stream *sock)
-{
-    sock->type->methods.release(sock);
-}
-
--- a/src/sock.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/sock.h	Tue Apr 28 20:27:45 2009 +0300
@@ -11,23 +11,6 @@
 #include <event2/event.h>
 
 /**
- * Socket function error codes
- */
-enum sock_error_code {
-    _ERR_SOCK_BEGIN = _ERR_SOCK,
-    ERR_SOCKET,         ///< socket(2) failed
-    ERR_CONNECT,        ///< connect(2) error - either direct or async
-    ERR_READ,           ///< read(2) error - will probably show up as an ERR_WRITE as well
-    ERR_READ_EOF,       ///< EOF on read(2)
-    ERR_WRITE,          ///< write(2) error - data was unsent, will probably show up as an ERR_READ as well
-    ERR_WRITE_EOF,      ///< write(2) gave EOF - zero bytes written
-    ERR_FCNTL,          ///< fcntl(2) failed
-    ERR_CLOSE,          ///< close(2) failed, some written data was probably not sent
-    ERR_GETSOCKOPT,     ///< getsockopt(2) failed
-    ERR_OPEN,           ///< open(2) failed
-};
-
-/**
  * Initialize the socket module's global state. Call this before calling any other sock_* functions.
  *
  * The given \a ev_base is the libevent base to use for nonblocking operation.
--- a/src/sock_gnutls.c	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/sock_gnutls.c	Tue Apr 28 20:27:45 2009 +0300
@@ -13,23 +13,24 @@
 #include <assert.h>
 
 /**
- * Register for events based on the session's gnutls_record_get_direction().
+ * Enable the TCP events based on the session's gnutls_record_get_direction().
  */
-static err_t sock_gnutls_ev_enable (struct sock_gnutls *sock, struct error_info *err)
+static err_t sock_gnutls_ev_enable (struct sock_gnutls *sock, error_t *err)
 {
     int ret;
+    short mask;
 
     // gnutls_record_get_direction tells us what I/O operation gnutls would have required for the last
     // operation, so we can use that to determine what events to register
     switch ((ret = gnutls_record_get_direction(sock->session))) {
         case 0: 
             // read more data
-            sock_fd_enable_events(SOCK_GNUTLS_FD(sock), EV_READ); 
+            mask = EV_READ;
             break;
         
         case 1:
             // write buffer full
-            sock_fd_enable_events(SOCK_GNUTLS_FD(sock), EV_WRITE);
+            mask = EV_WRITE;
             break;
         
         default:
@@ -37,7 +38,11 @@
             RETURN_SET_ERROR_EXTRA(err, ERR_GNUTLS_RECORD_GET_DIRECTION, ret);
     }
     
-    // ok... wait
+    // do the enabling
+    if ((ERROR_CODE(err) = transport_fd_enable(SOCK_GNUTLS_FD(sock), EV_READ)))
+        return ERROR_CODE(err);
+    
+
     return SUCCESS;
 }
 
@@ -72,7 +77,7 @@
  *
  * Based on the GnuTLS examples/ex-rfc2818.c
  */
-static err_t sock_gnutls_verify (struct sock_gnutls *sock, struct error_info *err)
+static err_t sock_gnutls_verify (struct sock_gnutls *sock, error_t *err)
 {
     unsigned int status;
     const gnutls_datum_t *cert_list;
@@ -138,7 +143,7 @@
  *
  * @return >0 for finished handshake, 0 for handshake-in-progress, -err_t for errors.
  */
-static int sock_gnutls_handshake (struct sock_gnutls *sock, struct error_info *err)
+static int sock_gnutls_handshake (struct sock_gnutls *sock, error_t *err)
 {
     int ret;
 
@@ -177,60 +182,61 @@
 }
 
 /**
- * Our SOCK_STREAM event handler. Drive the handshake if that's current, otherwise, invoke user callbacks.
- *
- * XXX: this is ugly. This sock_stream-level separation doesn't really work that well.
+ * Our transport_fd event handler. Drive the handshake if that's current, otherwise, invoke user callbacks.
  */
-static void sock_gnutls_event_handler (int fd, short what, void *arg)
+static void sock_gnutls_on_event (struct transport_fd *fd, short what, void *arg)
 {
     struct sock_gnutls *sock = arg;
-    struct error_info err;
+    error_t err;
 
     (void) fd;
+
+    // XXX: timeouts
     (void) what;
 
     // are we in the handshake cycle?
     if (sock->handshake) {
         RESET_ERROR(&err);
 
+        // perform the next handshake step
         if (sock_gnutls_handshake(sock, &err) == 0) {
-            // wait for the next handshake step
+            // handshake continues
         
-        } else if (SOCK_GNUTLS_BASE(sock)->conn_cb_func) {
+        } else if (SOCK_GNUTLS_TRANSPORT(sock)->connected) {
             // the async connect process has now completed, either succesfully or with an error
             // invoke the user connect callback directly with appropriate error
-            sock_stream_invoke_conn_cb(SOCK_GNUTLS_BASE(sock), ERROR_CODE(&err) ? &err : NULL, true);
+            transport_connected(SOCK_GNUTLS_TRANSPORT(sock), ERROR_CODE(&err) ? &err : NULL, true);
 
         } else {
-            // re-handshake completed, so continue with the sock_stream_callbacks, so the user can call sock_gnutls_read/write
-
             if (ERROR_CODE(&err))
-                // XXX: bad, since we can't report this directly... we need to let the user call _read/write, and get
-                // the error from there
-                log_warn_err(&err, "sock_gnutls_handshake failed");
-            
-            // continue where we left off
-            sock_stream_invoke_callbacks(SOCK_GNUTLS_BASE(sock), sock->ev_mask);
+                // the re-handshake failed, so this transport is dead
+                transport_error(SOCK_GNUTLS_TRANSPORT(sock), &err);
+        
+            else
+                // re-handshake completed, so continue with the transport_callbacks
+                transport_fd_invoke(SOCK_GNUTLS_FD(sock), what);
         }
 
     } else {
         // normal sock_stream operation
-        // gnutls might be able to proceed now, so ask user to try what didn't work before now, using the mask given to
-        // event_enable().
-        sock_stream_invoke_callbacks(SOCK_GNUTLS_BASE(sock), sock->ev_mask);
+        // gnutls might be able to proceed now, so invoke user callbacks
+        transport_fd_invoke(SOCK_GNUTLS_FD(sock), what);
     }
 }
 
-static err_t sock_gnutls_read (struct sock_stream *base_sock, void *buf, size_t *len, struct error_info *err)
+static err_t sock_gnutls_read (transport_t *transport, void *buf, size_t *len, error_t *err)
 {
-    struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
+    struct sock_gnutls *sock = transport_check(transport, &sock_gnutls_type);
     int ret;
     
     // read gnutls record
-    ret = gnutls_record_recv(sock->session, buf, *len);
+    do {
+        ret = gnutls_record_recv(sock->session, buf, *len);
+
+    } while (ret == GNUTLS_E_INTERRUPTED);
     
     // errors
-    // XXX: E_INTERRUPTED, E_REHANDSHAKE?
+    // XXX: E_REHANDSHAKE?
     if (ret < 0 && ret != GNUTLS_E_AGAIN)
         RETURN_SET_ERROR_EXTRA(err, ERR_GNUTLS_RECORD_RECV, ret);
     
@@ -238,7 +244,7 @@
         return SET_ERROR(err, ERR_READ_EOF);
 
 
-    // eagain?
+    // EAGAIN?
     if (ret < 0) {
         *len = 0;
 
@@ -251,14 +257,17 @@
     return SUCCESS;
 }
 
-static err_t sock_gnutls_write (struct sock_stream *base_sock, const void *buf, size_t *len, struct error_info *err)
+static err_t sock_gnutls_write (transport_t *transport, const void *buf, size_t *len, error_t *err)
 {
-    struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
+    struct sock_gnutls *sock = transport_check(transport, &sock_gnutls_type);
     int ret;
  
     // read gnutls record
-    ret = gnutls_record_send(sock->session, buf, *len);
-    
+    do {
+        ret = gnutls_record_send(sock->session, buf, *len);
+   
+    } while (ret == GNUTLS_E_INTERRUPTED);
+
     // errors
     if (ret < 0 && ret != GNUTLS_E_AGAIN)
         RETURN_SET_ERROR_EXTRA(err, ERR_GNUTLS_RECORD_RECV, ret);
@@ -279,43 +288,21 @@
     return SUCCESS;
 }
 
-static err_t sock_gnutls_event_init (struct sock_stream *base_sock)
-{
-    struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
-
-    (void) sock;
-
-    // already setup, ok
-    return SUCCESS;
-}
-
-static err_t sock_gnutls_event_enable (struct sock_stream *base_sock, short mask)
+static void _sock_gnutls_destroy (transport_t *transport)
 {
-    struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
-    
-    // store the ev_mask. We don't care about it here, because we assume that event_enable is only called once read or
-    // write, respectively, return zero. This is really the only case we can handle with gnutls.
-    sock->ev_mask = mask;
+    struct sock_gnutls *sock = transport_check(transport, &sock_gnutls_type);
     
-    // then wait for the event
-    return sock_gnutls_ev_enable(sock, SOCK_GNUTLS_ERR(sock));
-}
-
-static void sock_gnutls_release (struct sock_stream *base_sock)
-{
-    struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
-    
-    // DIEEEE
+    // die
     sock_gnutls_destroy(sock);
 }
 
 /**
  * Our sock_tcp-invoked connect handler
  */
-static void sock_gnutls_on_connect (struct sock_stream *base_sock, struct error_info *tcp_err)
+static void sock_gnutls__connected (transport_t *transport, const error_t *tcp_err)
 {
-    struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
-    struct error_info err;
+    struct sock_gnutls *sock = transport_check(transport, &sock_gnutls_type);
+    error_t err;
 
     // trap errors to let the user handle them directly
     if (tcp_err)
@@ -325,7 +312,7 @@
     gnutls_transport_set_ptr(sock->session, (gnutls_transport_ptr_t) (long int) SOCK_GNUTLS_FD(sock)->fd);
 
     // add ourselves as the event handler
-    if ((ERROR_CODE(&err) = sock_fd_init_ev(SOCK_GNUTLS_FD(sock), &sock_gnutls_event_handler, sock)))
+    if ((ERROR_CODE(&err) = transport_fd_setup(SOCK_GNUTLS_FD(sock), sock_gnutls_on_event, sock)))
         goto error;
 
     // start handshake
@@ -338,20 +325,15 @@
 
 error:
     // tell the user
-    SOCK_GNUTLS_BASE(sock)->conn_cb_func(SOCK_GNUTLS_BASE(sock), &err, SOCK_GNUTLS_BASE(sock)->conn_cb_arg);
+    transport_connected(transport, &err, true);
 }
 
-/*
- * Our sock_stream_Type
- */
-struct sock_stream_type sock_gnutls_type = {
+struct transport_type sock_gnutls_type = {
     .methods                = {
-        .read               = &sock_gnutls_read,
-        .write              = &sock_gnutls_write,
-        .event_init         = &sock_gnutls_event_init,
-        .event_enable       = &sock_gnutls_event_enable,
-        .release            = &sock_gnutls_release,
-        ._conn_cb           = &sock_gnutls_on_connect,
+        .read               = sock_gnutls_read,
+        .write              = sock_gnutls_write,
+        .destroy            = _sock_gnutls_destroy,
+        ._connected         = sock_gnutls__connected,
     },
 };
 
@@ -366,7 +348,7 @@
     printf("gnutls: %d: %s", level, msg);
 }
 
-err_t sock_gnutls_global_init (struct error_info *err)
+err_t sock_gnutls_global_init (error_t *err)
 {
     // global init
     if ((ERROR_EXTRA(err) = gnutls_global_init()) < 0)
@@ -395,7 +377,7 @@
 err_t sock_ssl_client_cred_create (struct sock_ssl_client_cred **ctx_cred,
         const char *cafile_path, bool verify,
         const char *cert_path, const char *pkey_path,
-        struct error_info *err
+        error_t *err
 ) {
     struct sock_ssl_client_cred *cred;
 
@@ -453,11 +435,10 @@
         sock_ssl_client_cred_destroy(cred);
 }
 
-err_t sock_ssl_connect_async (struct sock_stream **sock_ptr, 
+err_t sock_ssl_connect (const struct transport_info *info, transport_t **transport_ptr, 
         const char *hostname, const char *service,
         struct sock_ssl_client_cred *cred,
-        sock_stream_connect_cb cb_func, void *cb_arg, 
-        struct error_info *err
+        error_t *err
     )
 {
     struct sock_gnutls *sock = NULL;
@@ -467,7 +448,7 @@
         return SET_ERROR(err, ERR_CALLOC);
 
     // initialize base
-    sock_stream_init(SOCK_GNUTLS_BASE(sock), &sock_gnutls_type, cb_func, cb_arg);
+    transport_init(SOCK_GNUTLS_TRANSPORT(sock), &sock_gnutls_type, info);
 
     if (!cred) {
         // default credentials
@@ -487,6 +468,9 @@
     if ((sock->hostname = strdup(hostname)) == NULL)
         JUMP_SET_ERROR(err, ERR_STRDUP);
 
+    // initialize TCP
+    sock_tcp_init(SOCK_GNUTLS_TCP(sock));
+
     // initialize client session
     if ((ERROR_EXTRA(err) = gnutls_init(&sock->session, GNUTLS_CLIENT)) < 0)
         JUMP_SET_ERROR(err, ERR_GNUTLS_INIT);
@@ -503,11 +487,11 @@
         JUMP_SET_ERROR(err, ERR_GNUTLS_CRED_SET);
 
     // TCP connect
-    if (sock_tcp_connect_async_begin(SOCK_GNUTLS_TCP(sock), hostname, service, err))
+    if (sock_tcp_connect_async(SOCK_GNUTLS_TCP(sock), hostname, service, err))
         goto error;
 
     // done, wait for the connect to complete
-    *sock_ptr = SOCK_GNUTLS_BASE(sock);
+    *transport_ptr = SOCK_GNUTLS_TRANSPORT(sock);
 
     return SUCCESS;
 
@@ -520,18 +504,17 @@
 
 void sock_gnutls_destroy (struct sock_gnutls *sock)
 {
-    // terminate the TCP transport
-    sock_fd_close(SOCK_GNUTLS_FD(sock));
-
     // close the session rudely
     gnutls_deinit(sock->session);
-    
+ 
+    // terminate the TCP transport
+    sock_tcp_destroy(SOCK_GNUTLS_TCP(sock));
+   
     if (sock->cred)
         // drop the cred ref
         sock_ssl_client_cred_put(sock->cred);
 
     // free
     free(sock->hostname);
-    free(sock);
 }
 
--- a/src/sock_gnutls.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/sock_gnutls.h	Tue Apr 28 20:27:45 2009 +0300
@@ -7,8 +7,7 @@
  * A sock_stream implementation using GnuTLS for SSL
  */
 
-#include "sock_internal.h"
-#include "sock_tcp.h"
+#include "sock_tcp_internal.h"
 
 #include <gnutls/gnutls.h>
 
@@ -33,6 +32,11 @@
     ERR_GNUTLS_CERT_SET_X509_KEY_FILE,
 };
 
+/*
+ * Our transport_type
+ */
+extern struct transport_type sock_gnutls_type;
+
 /**
  * GnuTLS credentials for client sockets.
  */
@@ -86,12 +90,7 @@
 /**
  * Cast a sock_gnutls to a sock_stream.
  */
-#define SOCK_GNUTLS_BASE(sock_ptr) SOCK_TCP_BASE(SOCK_GNUTLS_TCP(sock_ptr))
-
-/**
- * Get a pointer to the sock_gnutls's error_info.
- */
-#define SOCK_GNUTLS_ERR(sock_ptr) SOCK_ERR(SOCK_GNUTLS_BASE(sock_ptr))
+#define SOCK_GNUTLS_TRANSPORT(sock_ptr) SOCK_TCP_TRANSPORT(SOCK_GNUTLS_TCP(sock_ptr))
 
 /**
  * Initialize the global gnutls state
--- a/src/sock_internal.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/sock_internal.h	Tue Apr 28 20:27:45 2009 +0300
@@ -7,116 +7,14 @@
  * internal sock_* interface
  */
 #include "sock.h"
-#include <stdbool.h>
-
-/**
- * General state for all sock_stream's
- */
-struct sock_stream_ctx {
-    /** libevent core */
-    struct event_base *ev_base;
-
-};
 
 /**
  * Global sock_stream_ctx used for sock_init() and all sock_stream's
  */
-extern struct sock_stream_ctx _sock_stream_ctx;
-
-/**
- * Socket implementation type methods
- */
-struct sock_stream_methods {
-    /** As read(2) */
-    err_t (*read) (struct sock_stream *sock, void *buf, size_t *len, struct error_info *err);
-
-    /** As write(2) */
-    err_t (*write) (struct sock_stream *sock, const void *buf, size_t *len, struct error_info *err);
-
-    /** Initialize events. cb_info/cb_arg are already updated */
-    err_t (*event_init) (struct sock_stream *sock);
-
-    /** Enable events as specified */
-    err_t (*event_enable) (struct sock_stream *sock, short mask);
-    
-    /** Release all resources and free the sock_stream */
-    void (*release) (struct sock_stream *sock);
-
-    /** The type's connect_cb handler, defaults to just invoke conn_cb_func */
-    void (*_conn_cb) (struct sock_stream *sock, struct error_info *err);
-};
-
-/**
- * The base type struct, which defines the method table.
- */
-struct sock_stream_type {
-    /** Method table */
-    struct sock_stream_methods methods;
-};
-
-/**
- * The base sock_stream type, as used by the sock_stream_* functions.
- *
- * The specific implementations should embed this at the start of their type-specific struct, and then cast around
- * as appropriate.
- */
-struct sock_stream {
-    /** The sock_stream_type for this socket */
-    struct sock_stream_type *type;
-
-    /** Last error info, XXX: remove this */
-    struct error_info err;
+extern struct sock_stream_ctx {
+    /** libevent core */
+    struct event_base *ev_base;
 
-    /** Callbacks */
-    const struct sock_stream_callbacks *cb_info;
-
-    /** Callback arg */
-    void *cb_arg;
-
-    /** Connection callback function */
-    sock_stream_connect_cb conn_cb_func;
-
-    /** Connection callback context argument */
-    void *conn_cb_arg;
-};
-
-/**
- * Convert a `struct sock_stream*` to the given type.
- */
-#define SOCK_FROM_BASE(sock, type) ((type*) sock)
-
-/**
- * Get a pointer to the sock_stream's error_info field.
- */
-#define SOCK_ERR(sock) (&(sock)->err)
-
-/**
- * Initialize a sock_stream with the given sock_stream_type.
- *
- * The sock_stream should be initialized to zero. It is a bug to call this twice.
- *
- * @param sock the new sock_stream
- * @param type the sock_stream_type defining the implementation used
- * @param cb_func the optional connect_async callback function
- * @param cb_arg the optional context argument for cb_func
- */
-void sock_stream_init (struct sock_stream *sock, struct sock_stream_type *type, sock_stream_connect_cb cb_func, void *cb_arg);
-
-/**
- * Invoke the appropriate callbacks for the given EV_* bitmask.
- *
- * @param sock the sock_stream
- * @param what combination of EV_* bits describing what callbacks to invoke
- */
-void sock_stream_invoke_callbacks (struct sock_stream *sock, short what);
-
-/**
- * Invoke the sock_stream_conn_cb callback with the given error param.
- *
- * This invokes the sock_stream_methods::_conn_cb if present and \a direct is not given, otherwise the callback directly
- *
- * @param direct force the conn_cb to be called directly
- */
-void sock_stream_invoke_conn_cb (struct sock_stream *sock, struct error_info *err, bool direct);
+} _sock_stream_ctx;
 
 #endif /* SOCK_INTERNAL_H */
--- a/src/sock_ssl.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/sock_ssl.h	Tue Apr 28 20:27:45 2009 +0300
@@ -4,7 +4,7 @@
 /**
  * @file
  *
- * SSL-specific functionality as related to sock.h
+ * SSL transport implementation.
  */
 #include "sock.h"
 
@@ -36,7 +36,7 @@
 err_t sock_ssl_client_cred_create (struct sock_ssl_client_cred **ctx_cred,
         const char *cafile_path, bool verify,
         const char *cert_path, const char *pkey_path,
-        struct error_info *err
+        error_t *err
 );
 
 /**
@@ -58,19 +58,17 @@
  * or a sock_ssl_client_cred allocated using sock_ssl_client_cred_create(). The contexts are reference-counted, so once
  * a cred context has been released, it will be destroyed once the last connection using it is destroyed.
  * 
- * @param sock_ptr the new sock_stream
+ * @param info the transport setup info
+ * @param transport_ptr returned transport
  * @param hostname the hostname to connect to
  * @param service the TCP service name (i.e. port) to connect to
  * @param cred the SSL client credentials to use, if not NULL
- * @param cb_func the callback for connection/handshake completion
- * @param cb_arg the callback context argument
  * @param err returned error info
  */
-err_t sock_ssl_connect_async (struct sock_stream **sock_ptr, 
+err_t sock_ssl_connect (const struct transport_info *info, transport_t **transport_ptr, 
         const char *hostname, const char *service,
         struct sock_ssl_client_cred *cred,
-        sock_stream_connect_cb cb_func, void *cb_arg, 
-        struct error_info *err
+        error_t *err
     );
 
 
--- a/src/sock_tcp.c	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/sock_tcp.c	Tue Apr 28 20:27:45 2009 +0300
@@ -1,5 +1,6 @@
 
-#include "sock_tcp.h"
+#include "sock_tcp_internal.h"
+#include "sock_internal.h"
 #include "log.h"
 
 #include <stdlib.h>
@@ -7,166 +8,195 @@
 #include <sys/socket.h>
 #include <string.h>
 
-static void sock_tcp_release (struct sock_stream *base_sock)
+/**
+ * Start connecting to the given address in a non-blocking fashion. Returns any errors that immediately crop up,
+ * otherwise eventually calls sock_tcp_connect_done().
+ */
+static err_t sock_tcp_connect_addr (struct sock_tcp *sock, struct addrinfo *addr, error_t *err);
+
+
+
+
+/**
+ * Our transport_methods
+ */
+static void _sock_tcp_destroy (transport_t *transport)
 {
-    struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
+    struct sock_tcp *sock = transport_check(transport, &sock_tcp_type);
     
-    // close and free
-    sock_fd_close(SOCK_TCP_FD(sock));
-    sock_tcp_free(sock);
+    // proxy
+    sock_tcp_destroy(sock);
 }
 
 /*
- * Our sock_stream_type
+ * Our transport_type
  */
-static struct sock_stream_type sock_tcp_type = {
+struct transport_type sock_tcp_type = {
     .methods                = {
-        .read               = &sock_fd_read,
-        .write              = &sock_fd_write,
-        .event_init         = &sock_fd_event_init,
-        .event_enable       = &sock_fd_event_enable,
-        .release            = &sock_tcp_release,
+        .read               = transport_fd_methods_read,
+        .write              = transport_fd_methods_write,
+        .destroy            = _sock_tcp_destroy,
     },
 };
 
-static err_t sock_tcp_alloc (struct sock_tcp **sock_ptr, sock_stream_connect_cb cb_func, void *cb_arg)
-{
-    // alloc
-    if ((*sock_ptr = calloc(1, sizeof(**sock_ptr))) == NULL)
-        return ERR_CALLOC;
-    
-    // initialize base with sock_tcp_type
-    sock_stream_init(SOCK_TCP_BASE(*sock_ptr), &sock_tcp_type, cb_func, cb_arg);
-
-    // init without any fd
-    sock_fd_init(SOCK_TCP_FD(*sock_ptr), -1);
-
-    // done
-    return SUCCESS;
-}
-
-err_t sock_tcp_init_socket (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err)
+/**
+ * Create a new socket() using the given addr's family/socktype/protocol, and update our transport_fd state.
+ */
+static err_t sock_tcp_create_socket (struct sock_tcp *sock, struct addrinfo *addr, error_t *err)
 {
     int fd;
 
-    // call socket
+    // call socket()
     if ((fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol)) < 0)
         RETURN_SET_ERROR_ERRNO(err, ERR_SOCKET);
 
-    // ok
-    sock_fd_init(SOCK_TCP_FD(sock), fd);
+    // ok, update transport_fd
+    transport_fd_set(SOCK_TCP_FD(sock), fd);
+
 
     return SUCCESS;
 }
 
 /**
- * Attempt to connect to the given addrinfo, or the next one, if that fails, etc.
+ * Read the socket's error code, if any.
+ *
+ * The read error number is stored in err->extra on SUCCESS, unless reading the error fails, in which case
+ * err contains the normal error info.
+ *
+ * @return error code on failure
  */
-static err_t sock_tcp_connect_async_continue (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err)
+static err_t sock_tcp_read_error (struct sock_tcp *sock, error_t *err)
 {
-    // no more addresses left?
-    if (!addr)
-        // XXX: rename error
-        return SET_ERROR(err, ERR_GETADDRINFO_EMPTY);
-
-    // try and connect to each one until we find one that works
-    do {
-        // attempt to start connect
-        if (sock_tcp_connect_async_addr(sock, addr, err) == SUCCESS)
-            break;
-
-        // try the next one
-        log_warn("sock_tcp_connect_async_addr(%s): %s", addr->ai_canonname, error_msg(err));
-
-    } while ((addr = addr->ai_next));
-    
-
-    if (addr) {
-        // we succesfully did a sock_tcp_connect_async_addr on valid address
-        return SUCCESS;
-
-    } else {
-        // all of the connect_async_addr's failed, return the last error
-        return ERROR_CODE(err);
-    }
-}
-
-/**
- * Our async connect operation has completed, clean up addrinfos and events, and call the user callback. The given
- * \a err should be NULL for successful completion, or the error for unsuccesfully completion.
- */
-static void sock_tcp_connect_async_done (struct sock_tcp *sock, struct error_info *err)
-{
-    struct sock_stream *sock_base = SOCK_TCP_BASE(sock);
-
-    // free the addrinfo
-    freeaddrinfo(sock->async_res); 
-    sock->async_res = sock->async_cur = NULL;
-
-    // remove our event handler so the user can install their own
-    sock_fd_deinit_ev(SOCK_TCP_FD(sock));
-
-    // ok, run callback
-    sock_stream_invoke_conn_cb(sock_base, err, false);
-}
-
-/**
- * Our start_connect callback
- */
-static void sock_tcp_connect_cb (int fd, short what, void *arg)
-{
-    struct sock_tcp *sock = arg;
     int optval;
     socklen_t optlen;
-    struct error_info err;
-    err_t tmp;
 
-    // XXX: timeouts
-    (void) what;
+    RESET_ERROR(err);
 
     // init params
     optval = 0;
     optlen = sizeof(optval);
 
     // read error code
-    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &optval, &optlen))
-        JUMP_SET_ERROR_ERRNO(&err, ERR_GETSOCKOPT);
+    if (getsockopt(SOCK_TCP_FD(sock)->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen))
+        RETURN_SET_ERROR_ERRNO(err, ERR_GETSOCKOPT);
     
     // sanity-check optlen... not sure if this is sensible
     if (optlen != sizeof(optval))
-        JUMP_SET_ERROR_EXTRA(&err, ERR_GETSOCKOPT, EINVAL);
-        
-    // did the connect complete succesfully or not?
-    if (optval)
-        JUMP_SET_ERROR_EXTRA(&err, ERR_CONNECT, optval);
+        RETURN_SET_ERROR_EXTRA(err, ERR_GETSOCKOPT, EINVAL);
     
-    // done
-    return sock_tcp_connect_async_done(sock, NULL);
+    // then store the system error code
+    ERROR_EXTRA(err) = optval;
+
+    // ok
+    return SUCCESS;
+}
+
+/**
+ * Attempt to connect to the given addrinfo, or the next one, if that fails, etc.
+ *
+ * This does not call transport_connected().
+ */
+static err_t sock_tcp_connect_continue (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err)
+{
+    if (!addr)
+        // no more addresses left to try
+        return SET_ERROR(err, ERR_GETADDRINFO_EMPTY);
+
+    // try and connect to each one until we find one that works
+    do {
+        // attempt to start connect
+        if (sock_tcp_connect_addr(sock, addr, err) == SUCCESS)
+            break;
+
+        // log a warning on the failed connect
+        log_warn("sock_tcp_connect_addr(%s): %s", addr->ai_canonname, error_msg(err));
+
+    } while ((addr = addr->ai_next));
+    
+
+    if (addr)
+        // we succesfully did a sock_tcp_connect_addr on valid address
+        return SUCCESS;
+
+    else
+        // all of the connect_async_addr's failed, return the last error
+        return ERROR_CODE(err);
+    
+}
+
+/**
+ * Cleanup our resolver state and any connect callbacks after a connect
+ */
+static void sock_tcp_connect_cleanup (struct sock_tcp *sock)
+{
+    // free the addrinfo
+    freeaddrinfo(sock->async_res); 
+    sock->async_res = sock->async_cur = NULL;
+    
+    // remove our event handler
+    transport_fd_clear(SOCK_TCP_FD(sock));
+}
+
+/**
+ * Our async connect operation has completed, clean up and call the user callback. The given \a err should be NULL for
+ * successful completion, or the error for failures.
+ */
+static void sock_tcp_connect_done (struct sock_tcp *sock, struct error_info *err)
+{
+    // cleanup
+    sock_tcp_connect_cleanup(sock);
+    
+    // ok, run callback
+    transport_connected(SOCK_TCP_TRANSPORT(sock), err, false);
+}
+
+/**
+ * Our async connect callback
+ */
+static void sock_tcp_on_connect (struct transport_fd *fd, short what, void *arg)
+{
+    struct sock_tcp *sock = arg;
+    struct error_info err;
+    err_t tmp;
+
+    // XXX: timeouts
+    (void) what;
+    
+    // read socket error code
+    if (sock_tcp_read_error(sock, &err))
+        goto error;
+
+    // did the connect fail?
+    if (ERROR_EXTRA(&err))
+        JUMP_SET_ERROR(&err, ERR_CONNECT);
+    
+    // done, success
+    return sock_tcp_connect_done(sock, NULL);
 
 error:
     // close the socket
-    if ((tmp = sock_fd_close(SOCK_TCP_FD(sock))))
+    if ((tmp = transport_fd_close(fd)))
         log_warn("error closing socket after connect error: %s", error_name(tmp));
 
     // log a warning
     log_warn("connect to '%s' failed: %s", sock->async_cur->ai_canonname, error_msg(&err));
 
     // try the next one or fail completely
-    if (sock_tcp_connect_async_continue(sock, sock->async_cur->ai_next, &err))
-        sock_tcp_connect_async_done(sock, &err);
+    if (sock_tcp_connect_continue(sock, sock->async_cur->ai_next, &err))
+        sock_tcp_connect_done(sock, &err);
 }
 
-err_t sock_tcp_connect_async_addr (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err)
+static err_t sock_tcp_connect_addr (struct sock_tcp *sock, struct addrinfo *addr, error_t *err)
 {
     int ret;
     err_t tmp;
 
     // first, create the socket
-    if (sock_tcp_init_socket(sock, addr, err))
+    if (sock_tcp_create_socket(sock, addr, err))
         return ERROR_CODE(err);
 
     // then, set it up as nonblocking
-    if ((ERROR_CODE(err) = sock_fd_set_nonblock(SOCK_TCP_FD(sock), true)))
+    if ((ERROR_CODE(err) = transport_fd_nonblock(SOCK_TCP_FD(sock), true)))
         goto error;
 
     // then, initiate the connect
@@ -174,20 +204,21 @@
         JUMP_SET_ERROR_ERRNO(err, ERR_CONNECT);
     
     if (ret < 0) {
+        // set the "current" address in case it fails and we need to try the next one
+        sock->async_cur = addr;
+
         // ok, connect started, setup our completion callback
-        if ((ERROR_CODE(err) = sock_fd_init_ev(SOCK_TCP_FD(sock), &sock_tcp_connect_cb, sock)))
+        if ((ERROR_CODE(err) = transport_fd_setup(SOCK_TCP_FD(sock), sock_tcp_on_connect, sock)))
             goto error;
     
         // enable for write
-        if ((ERROR_CODE(err) = sock_fd_enable_events(SOCK_TCP_FD(sock), EV_WRITE)))
+        if ((ERROR_CODE(err) = transport_fd_enable(SOCK_TCP_FD(sock), EV_WRITE)))
             goto error;
 
-        // set the "current" address in case it fails and we need to try the next one
-        sock->async_cur = addr;
-
     } else {
         // oops... blocking connect - fail to avoid confusion
         // XXX: come up with a better error name to use
+        // XXX: support non-async connects as well
         JUMP_SET_ERROR_EXTRA(err, ERR_CONNECT, EINPROGRESS);
     }
     
@@ -196,28 +227,40 @@
 
 error:
     // close the stuff we did open
-    if ((tmp = sock_fd_close(SOCK_TCP_FD(sock))))
+    if ((tmp = transport_fd_close(SOCK_TCP_FD(sock))))
         log_warn("error closing socket after connect error: %s", error_name(tmp));
 
     return ERROR_CODE(err);
 }
 
-err_t sock_tcp_connect_async_begin (struct sock_tcp *sock, const char *hostname, const char *service, struct error_info *err)
+/**
+ * External interface
+ */
+void sock_tcp_init (struct sock_tcp *sock)
+{
+    struct event_base *ev_base = _sock_stream_ctx.ev_base;
+
+    // init without any fd
+    transport_fd_init(SOCK_TCP_FD(sock), ev_base, TRANSPORT_FD_INVALID);
+
+}
+
+err_t sock_tcp_connect_async (struct sock_tcp *sock, const char *hostname, const char *service, error_t *err)
 {
     struct addrinfo hints;
     int ret;
 
-    // hints
+    // build hints
     memset(&hints, 0, sizeof(hints));
     hints.ai_family = AF_UNSPEC;
     hints.ai_socktype = SOCK_STREAM;
 
-    // resolve
+    // resolve (blocking)
     if ((ret = getaddrinfo(hostname, service, &hints, &sock->async_res)))
         RETURN_SET_ERROR_EXTRA(err, ERR_GETADDRINFO, ret);
     
-    // start connecting
-    if (sock_tcp_connect_async_continue(sock, sock->async_res, err))
+    // start connecting on the first result
+    if (sock_tcp_connect_continue(sock, sock->async_res, err))
         goto error;
 
     // ok
@@ -225,128 +268,54 @@
 
 error:
     // cleanup
-    if (sock->async_res) {
-        freeaddrinfo(sock->async_res);
-        sock->async_res = NULL;
-    }
-
+    if (sock->async_res)
+        sock_tcp_connect_cleanup(sock);
+    
     return ERROR_CODE(err);
 }
 
-err_t sock_tcp_connect_blocking (struct sock_tcp *sock, const char *hostname, const char *service, struct error_info *err)
+void sock_tcp_destroy (struct sock_tcp *sock)
 {
-    struct addrinfo hints, *res, *r;
-    int ret;
-    
-    // zero error code
-    RESET_ERROR(err);
+    // cleanup our stuff
+    if (sock->async_res)
+        sock_tcp_connect_cleanup(sock);
     
-    // hints
-    memset(&hints, 0, sizeof(hints));
-    hints.ai_family = AF_UNSPEC;
-    hints.ai_socktype = SOCK_STREAM;
-
-    // resolve
-    if ((ret = getaddrinfo(hostname, service, &hints, &res)))
-        RETURN_SET_ERROR_EXTRA(err, ERR_GETADDRINFO, ret);
-
-    // try each result in turn
-    for (r = res; r; r = r->ai_next) {
-        // create the socket
-        if ((SOCK_TCP_FD(sock)->fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) < 0) {
-            // remember error
-            SET_ERROR_ERRNO(err, ERR_SOCKET);
-
-            // skip to next one
-            continue;
-        }
-        
-        // connect to remote address
-        if (connect(SOCK_TCP_FD(sock)->fd, r->ai_addr, r->ai_addrlen)) {
-            // remember error
-            SET_ERROR_ERRNO(err, ERR_CONNECT);
-            
-            // close/invalidate socket
-            sock_fd_close(SOCK_TCP_FD(sock));
-
-            // skip to next one
-            continue;
-        }
-        
-        // valid socket, use this
-        break;
-    }
-    
-    // ensure we got some valid socket, else return last error code
-    if (SOCK_TCP_FD(sock)->fd < 0) {
-        // did we hit some error?
-        if (IS_ERROR(err))
-            // return last error
-            return ERROR_CODE(err);
-        
-        else
-            // no results
-            return SET_ERROR(err, ERR_GETADDRINFO_EMPTY);
-    }
-
-    // ok, done
-    return 0;    
+    // destroy lower level
+    transport_fd_destroy(SOCK_TCP_FD(sock)); 
 }
 
-void sock_tcp_free (struct sock_tcp *sock)
-{
-    // free
-    free(sock);
-}
-
-err_t sock_tcp_connect (struct sock_stream **sock_ptr, const char *host, const char *service, struct error_info *err)
+/**
+ * Public interface
+ */
+err_t sock_tcp_connect (const struct transport_info *info, transport_t **transport_ptr, 
+        const char *host, const char *service, error_t *err)
 {
     struct sock_tcp *sock;
+ 
+    // alloc
+    if ((sock = calloc(1, sizeof(*sock))) == NULL)
+        return ERR_CALLOC;
     
-    // allocate
-    if ((ERROR_CODE(err) = sock_tcp_alloc(&sock, NULL, NULL)))
-        return ERROR_CODE(err);
+    // initialize transport
+    transport_init(SOCK_TCP_TRANSPORT(sock), &sock_tcp_type, info);
     
+    // init our state
+    sock_tcp_init(sock);
+ 
     // connect    
-    if (sock_tcp_connect_blocking(sock, host, service, err))
+    if (sock_tcp_connect_async(sock, host, service, err))
         goto error;
 
     // good
-    *sock_ptr = SOCK_TCP_BASE(sock);
+    *transport_ptr = SOCK_TCP_TRANSPORT(sock);
 
     return 0;
 
 error:
     // cleanup
-    sock_tcp_free(sock);
+    sock_tcp_destroy(sock);
         
     // return error code
     return ERROR_CODE(err);
 }
 
-err_t sock_tcp_connect_async (struct sock_stream **sock_ptr, const char *host, const char *service, 
-        sock_stream_connect_cb cb_func, void *cb_arg, struct error_info *err)
-{
-    struct sock_tcp *sock;
-    
-    // allocate and init
-    if ((ERROR_CODE(err) = sock_tcp_alloc(&sock, cb_func, cb_arg)))
-        return ERROR_CODE(err);
-    
-    // connect    
-    if (sock_tcp_connect_async_begin(sock, host, service, err))
-        goto error;
-
-    // good
-    *sock_ptr = SOCK_TCP_BASE(sock);
-
-    return 0;
-
-error:
-    // cleanup
-    sock_tcp_free(sock);
-        
-    // return error code
-    return ERROR_CODE(err);
-}
-
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/sock_tcp.h	Tue Apr 28 20:27:45 2009 +0300
@@ -0,0 +1,28 @@
+#ifndef SOCK_TCP_H
+#define SOCK_TCP_H
+
+/**
+ * @file
+ *
+ * TCP transport implementation.
+ *
+ * XXX: provide some TCP-specific type/functions?
+ */
+#include "transport.h"
+
+/**
+ * Connect the given transport via TCP to the given host/service. The transport will not be ready for use until the
+ * transport_callbacks::on_connect callback has been invoked.
+ *
+ * XXX: blocking DNS resolution
+ *
+ * @param info the transport setup info
+ * @param transport_ptr returned transport
+ * @param host the hostname to connect to
+ * @param service the service name (i.e. port) to connect to
+ * @param err returned error info
+ */
+err_t sock_tcp_connect (const struct transport_info *info, transport_t **transport_ptr, 
+        const char *host, const char *service, error_t *err);
+
+#endif
--- a/src/sock_tcp_internal.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/sock_tcp_internal.h	Tue Apr 28 20:27:45 2009 +0300
@@ -6,77 +6,60 @@
  *
  * Internal interface of the sock_tcp transport implementation.
  */
-#include "sock_internal.h"
-#include "sock_fd.h"
+#include "sock_tcp.h"
+#include "transport_fd.h"
 #include <netdb.h>
 
 /**
- * Contains the base sock_stream struct, and the file descriptor
+ * Our transport type struct
+ */
+extern struct transport_type sock_tcp_type;
+
+/**
+ * TCP transport state
  */
 struct sock_tcp {
-    /** The base struct for sock_stream_* functions */
-    struct sock_fd base_fd;
+    /** Base fd-based transport state */
+    struct transport_fd base_fd;
     
-    /** The current connect_async resolved address */
+    /** The resolver state for the async connect process */
     struct addrinfo *async_res, *async_cur;
 };
 
 /**
- * Get a sock_fd pointer from a sock_tcp pointer
+ * Get a transport_fd pointer from a sock_tcp pointer
  */
 #define SOCK_TCP_FD(sock_ptr) (&(sock_ptr)->base_fd)
 
 /**
- * Get a sock_base pointer from a sock_tcp pointer
- */
-#define SOCK_TCP_BASE(sock_ptr) SOCK_FD_BASE(SOCK_TCP_FD(sock_ptr))
-
-/**
- * Get the sock_stream.err pointer from a sock_tcp pointer
+ * Get a transport pointer from a sock_tcp pointer
  */
-#define SOCK_TCP_ERR(sock_ptr) SOCK_ERR(SOCK_TCP_BASE(sock_ptr))
-
-/**
- * Initialize a blank sock_tcp by creating a new socket (using the socket() syscall), but doesn't do anything further.
- *
- * This uses the ai_family, ai_socktype and ai_protocol fields from the given addrinfo.
- */
-err_t sock_tcp_init_socket (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err);
+#define SOCK_TCP_TRANSPORT(sock_ptr) TRANSPORT_FD_BASE(SOCK_TCP_FD(sock_ptr))
 
 /**
- * Initiate an async connection operation on the given socket to the given address. Once the connect() completes,
- * either the on_connect or the on_error callback will be called.
- *
- * If, for some weird reason, this ends up doing a blocking connect(), the on_connect callback will be called directly.
- * If the async connect fails, this just returns the error.
- *
- * @param sock the unconnected TCP sockect to connect with
- * @param addr the address to connect to
- * @param err returned error info
+ * Initialize the sock_tcp state
  */
-err_t sock_tcp_connect_async_addr (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err);
+void sock_tcp_init (struct sock_tcp *sock);
 
 /**
- * Attempt to connect asyncronously to the given hostname/service. Once a connection has been established, the
- * on_connect() callback will be called.
+ * Attempt to connect asyncronously to the given hostname/service. Once a connection has been established, this will
+ * call transport_connected(), so you can register transport_methods::_connected if layering on top of TCP.
  *
- * In case of errors, either on_error() will be called, or an error returned - depending on when the error happaned.
+ * In case of errors while starting the async connect process, an error code will be returned. If the connect fails
+ * later on, transport_connected() will be called with the error code.
+ *
+ * The sock must have been initialized using sock_tcp_init().
  *
  * @param sock the unconnected TCP socket to connect with
  * @param hostname the hostname to resolve
  * @param service the service to connect to
- * @param err returned error info
+ * @param err returned error info for immediate errors
  */
-err_t sock_tcp_connect_async_begin (struct sock_tcp *sock, const char *hostname, const char *service, struct error_info *err);
+err_t sock_tcp_connect_async (struct sock_tcp *sock, const char *hostname, const char *service, error_t *err);
 
 /**
- * Initialize a blank sock_tcp by connecting in a blocking fashion.
+ * Destroy the sock_tcp's state, including the transport_fd state.
  */
-err_t sock_tcp_connect_blocking (struct sock_tcp *sock, const char *hostname, const char *service, struct error_info *err);
-
-/**
- * Free a non-connected sock_tcp
- */
-void sock_tcp_free (struct sock_tcp *sock);
+void sock_tcp_destroy (struct sock_tcp *sock);
 
 #endif /* SOCK_TCP_INTERNAL_H */
--- a/src/transport.c	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/transport.c	Tue Apr 28 20:27:45 2009 +0300
@@ -1,14 +1,15 @@
 #include "transport_internal.h"
 
+#include <assert.h>
 
-void transport_bind (transport_t *transport, const struct transport_type *type, const struct transport_info *info)
+void transport_init (transport_t *transport, const struct transport_type *type, const struct transport_info *info)
 {
     // not already bound
-    assert(!transport->type && !transport->info);
+    assert(!transport->type);
 
     // store
     transport->type = type;
-    *transport->info = info;
+    transport->info = *info;
 }
 
 void* transport_check (transport_t *transport, const struct transport_type *type)
@@ -19,3 +20,44 @@
     // ok
     return transport;
 }
+
+void transport_connected (transport_t *transport, const error_t *err, bool direct)
+{
+    // update state
+    transport->connected = true;
+
+    if (direct || !transport->type->methods._connected) {
+        // user callback
+        if (err)
+            // connect failed
+            transport->info.cb_tbl->on_error(transport, err, transport->info.cb_arg);
+        else
+            // connect succesfull
+            transport->info.cb_tbl->on_connect(transport, transport->info.cb_arg);
+
+    } else {
+        // wrapper method
+        transport->type->methods._connected(transport, err);
+    }
+}
+
+void transport_invoke (transport_t *transport, short what)
+{
+    // on_ready
+    if (what & TRANSPORT_READ && transport->info.cb_tbl->on_read)
+        transport->info.cb_tbl->on_read(transport, transport->info.cb_arg);
+
+    // on_write
+    if (what & TRANSPORT_WRITE && transport->info.cb_tbl->on_write)
+        transport->info.cb_tbl->on_write(transport, transport->info.cb_arg);
+
+}
+
+void transport_error (transport_t *transport, const error_t *err)
+{
+    // update state
+    transport->connected = false;
+
+    // invoke callback
+    transport->info.cb_tbl->on_error(transport, err, transport->info.cb_arg);
+}
--- a/src/transport.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/transport.h	Tue Apr 28 20:27:45 2009 +0300
@@ -42,7 +42,15 @@
      * An asynchronous error has occured. This is only called for errors that occur while being called directly from
      * the underlying event loop, and never from inside an API function.
      */
-    void (*on_error) (transport_t *transport, error_t *err, void *arg);
+    void (*on_error) (transport_t *transport, const error_t *err, void *arg);
+};
+
+/**
+ * Bitmask of available events
+ */
+enum transport_event {
+    TRANSPORT_READ  = 0x01,
+    TRANSPORT_WRITE = 0x02,
 };
 
 /**
@@ -54,6 +62,9 @@
 
     /** The callback context argument */
     void *cb_arg;
+
+    /** Initial event mask using transport_event flags */
+    short ev_mask;
 };
 
 /**
@@ -61,7 +72,8 @@
  * the number of bytes read (which will be less than or equal to \a len). If the transport is nonblocking, and there is
  * no data available, this returns zero, and need not be called again until transport_callbacks::on_read is invoked.
  *
- * On errors, this returns the negative error code, and more info via \a err.
+ * On errors, this returns the negative error code, and more info via \a err. Note that as opposed to read(2), EOF is
+ * handled as an error, returning ERR_EOF.
  *
  * @param transport the transport state
  * @param buf the buffer to read the bytes into
@@ -89,6 +101,11 @@
 int transport_write (transport_t *transport, const void *buf, size_t len, error_t *err);
 
 /**
+ * Change the mask of enabled events
+ */
+void transport_events (transport_t *transport, short mask);
+
+/**
  * Close and destroy the transport immediately, severing any established connection rudely.
  *
  * This will release all resources associated with the transport, including the transport itself, which must not be
--- a/src/transport_fd.c	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/transport_fd.c	Tue Apr 28 20:27:45 2009 +0300
@@ -1,5 +1,7 @@
 #include "transport_fd.h"
 
+#include "log.h"
+
 #include <fcntl.h>
 #include <unistd.h>
 #include <assert.h>
@@ -25,7 +27,7 @@
 /**
  * Our transport_methods implementations
  */
-err_t transport_fd_read (transport_t *transport, void *buf, size_t *len, error_t *err)
+err_t transport_fd_methods_read (transport_t *transport, void *buf, size_t *len, error_t *err)
 {
     struct transport_fd *fd = transport_check(transport, &transport_fd_type);
     int ret;
@@ -37,7 +39,7 @@
     
     else if (ret == 0)
         // EOF
-        return SET_ERROR(err, ERR_READ_EOF);
+        return SET_ERROR(err, ERR_EOF);
 
 
     if (ret < 0) {
@@ -53,9 +55,9 @@
     return SUCCESS;
 }
 
-err_t transport_fd_write (transport_t *transport, const void *buf, size_t *len, struct error_info *err)
+err_t transport_fd_methods_write (transport_t *transport, const void *buf, size_t *len, struct error_info *err)
 {
-    struct transport_fd *sock = transport_check(transport, &transport_fd_type);
+    struct transport_fd *fd = transport_check(transport, &transport_fd_type);
     int ret;
     
     // write(), and detect non-EAGAIN or EOF
@@ -80,15 +82,18 @@
     return SUCCESS;
 }
 
-err_t transport_fd_destroy (transport_t *transport)
+void _transport_fd_destroy (transport_t *transport)
 {
-    // XXX: implement
+
+    struct transport_fd *fd = transport_check(transport, &transport_fd_type);
+
+    transport_fd_destroy(fd);
 }
 
-struct transport_methods transport_fd_methods = {
-    .read       = transport_fd_read,
-    .write      = transport_fd_write,
-    .destroy    = transport_fd_destroy
+const struct transport_methods transport_fd_methods = {
+    .read       = transport_fd_methods_read,
+    .write      = transport_fd_methods_write,
+    .destroy    = _transport_fd_destroy
 };
 
 /**
@@ -99,6 +104,7 @@
     // sanity-check
     assert(!fd->fd);
     assert(!fd->ev_read && !fd->ev_write);
+    assert(_fd == TRANSPORT_FD_INVALID || _fd >= 0);
 
     // initialize
     fd->ev_base = ev_base;
@@ -108,6 +114,8 @@
 
 err_t transport_fd_nonblock (struct transport_fd *fd, bool nonblock)
 {
+    assert(fd->fd != TRANSPORT_FD_INVALID);
+
     // XXX: maintain old flags?
 
 
@@ -127,6 +135,7 @@
  */
 err_t transport_fd_install (struct transport_fd *fd)
 {
+    assert(fd->fd != TRANSPORT_FD_INVALID);
     assert(!fd->ev_read && !fd->ev_write);
 
     // create new events
@@ -149,11 +158,11 @@
 err_t transport_fd_setup (struct transport_fd *fd, transport_fd_callback_func cb_func, void *cb_arg)
 {
     // requires a valid fd
-    assert(fd->fd >= 0);
+    assert(fd->fd != TRANSPORT_FD_INVALID);
     
     // store
-    fd->ev_cb = ev_cb;
-    fd->ev_arg = cb_arg;
+    fd->cb_func = cb_func;
+    fd->cb_arg = cb_arg;
     
     // install the event handlers?
     if (!fd->ev_read || !fd->ev_write)
@@ -184,14 +193,14 @@
     if (fd->ev_read)
         event_free(fd->ev_read);
 
-    if (sock->ev_write)
-        event_free(sock->ev_write);
+    if (fd->ev_write)
+        event_free(fd->ev_write);
     
     fd->ev_read = NULL;
     fd->ev_write = NULL;
 }
 
-void transport_fd_clear (struct transport_fd *fd);
+void transport_fd_clear (struct transport_fd *fd)
 {
     // remove the events
     transport_fd_remove(fd);
@@ -202,32 +211,60 @@
 
 err_t transport_fd_set (struct transport_fd *fd, int _fd)
 {
+    assert(_fd == TRANSPORT_FD_INVALID || _fd >= 0);
+
     // close the old stuff
     transport_fd_close(fd);
     
     // set the new one
-    sock->fd = fd;
+    fd->fd = _fd;
     
     // do we have callbacks that we need to setup?
-    if (sock->cb_func)
+    if (fd->cb_func)
         return transport_fd_install(fd);
     else 
         return SUCCESS;
 }
 
+void transport_fd_invoke (struct transport_fd *fd, short what)
+{
+    short _what = 0;
+
+    if (what & EV_READ)
+        _what |= TRANSPORT_READ;
+
+    if (what & EV_WRITE)
+        _what |= TRANSPORT_WRITE;
+    
+    // invoke
+    transport_invoke(TRANSPORT_FD_BASE(fd), _what);
+}
+
 err_t transport_fd_close (struct transport_fd *fd)
 {
+    int _fd = fd->fd;
+
     // remove any installed events
     transport_fd_remove(fd);
 
-    // close the socket itself
-    if (fd->fd >= 0 && close(fd->fd))
+    // invalidate fd
+    fd->fd = TRANSPORT_FD_INVALID;
+ 
+    // close the fd
+    if (_fd != TRANSPORT_FD_INVALID && close(_fd))
         return ERR_CLOSE;
 
-    // invalidate
-    fd->fd = -1;
-    
 
     return SUCCESS;
 }
 
+void transport_fd_destroy (struct transport_fd *fd)
+{
+    err_t tmp;
+
+    // XXX: this might block
+    if ((tmp = transport_fd_close(fd)))
+        log_warn_err_code(tmp, "close");
+
+}
+
--- a/src/transport_fd.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/transport_fd.h	Tue Apr 28 20:27:45 2009 +0300
@@ -13,6 +13,9 @@
 #include <event2/event.h>
 #include <stdbool.h>
 
+// forward-declare
+struct transport_fd;
+
 /**
  * Low-level callback
  */
@@ -26,7 +29,7 @@
     struct transport base;
 
     /** Libevent base to use */
-    struct ev_base *ev_base;
+    struct event_base *ev_base;
     
     /** OS file descriptor */
     evutil_socket_t fd;
@@ -47,21 +50,34 @@
  */
 #define TRANSPORT_FD_BASE(tp_ptr) (&(tp_ptr)->base)
 
+/**
+ * Invalid OS FD
+ */
+#define TRANSPORT_FD_INVALID ((evutil_socket_t) -1)
 
 /**
- * Generic implementations of the transport_methods
+ * Implementation of transport_methods::read
+ */
+err_t transport_fd_methods_read (transport_t *transport, void *buf, size_t *len, error_t *err);
+
+/**
+ * Implementation of transport_methods::write
+ */
+err_t transport_fd_methods_write (transport_t *transport, const void *buf, size_t *len, struct error_info *err);
+
+/**
+ * The transport_methods struct
  */
 extern const struct transport_methods transport_fd_methods;
 
-
 /**
- * Initialize the transport_fd to use the given, connected fd, or -1 if we don't yet have an fd.
+ * Initialize the transport_fd to use the given, connected fd, or TRANSPORT_FD_INVALID if we don't yet have an fd.
  *
  * It is an error to call this if the transport_fd already has an fd set
  *
  * @param fd the transport_fd state
  * @param ev_base the libevent base to use
- * @param _fd the OS file descriptor, or -1
+ * @param _fd the OS file descriptor, or TRANSPORT_FD_INVALID
  */
 void transport_fd_init (struct transport_fd *fd, struct event_base *ev_base, int _fd);
 
@@ -84,6 +100,8 @@
 
 /**
  * Remove any old event callback present, so it will not be called anymore.
+ *
+ * It is perfectly safe to call this without any callbacks installed.
  */
 void transport_fd_clear (struct transport_fd *fd);
 
@@ -94,9 +112,18 @@
 err_t transport_fd_set (struct transport_fd *fd, int _fd);
 
 /**
+ * Invoke the transport_callbacks based on the given mask of libevent EV_* bits
+ */
+void transport_fd_invoke (struct transport_fd *fd, short what);
+
+/**
  * Close an opened fd, releasing all resources within our state.
  */
 err_t transport_fd_close (struct transport_fd *fd);
 
+/**
+ * Destroy the fd immediately.
+ */
+void transport_fd_destroy (struct transport_fd *fd);
 
 #endif
--- a/src/transport_internal.h	Tue Apr 28 17:52:48 2009 +0300
+++ b/src/transport_internal.h	Tue Apr 28 20:27:45 2009 +0300
@@ -8,6 +8,8 @@
  */ 
 #include "transport.h"
 
+#include <stdbool.h>
+
 /**
  * Method table for implementation stuff
  */
@@ -18,8 +20,24 @@
     /** For transport_write() */
     err_t (*write) (transport_t *transport, const void *buf, size_t *len, error_t *err);
 
-    /** Release the transport's state, but not the transport itself */
+    /** 
+     * Release the transport's internal state, but not the transport itself.
+     *
+     * In other words, this should release everything inside the transport_t, but not free() the transport_t itself.
+     */
     void (*destroy) (transport_t *transport);
+
+    /**
+     * Used by layered transports to handle transport_connected.
+     *
+     * If this is NULL, transport_connected will call the user callback directly, otherwise, it will proxy through this.
+     *
+     * The \a err param follows the same rules as for transport_connected() - NULL for success, error info otherwise.
+     *
+     * @param transport the transport state
+     * @param err error info if the connect failed
+     */
+    void (*_connected) (transport_t *transport, const error_t *err);
 };
 
 /**
@@ -39,6 +57,9 @@
 
     /** User info */
     struct transport_info info;
+
+    /** Are we connected? */
+    bool connected;
 };
 
 /**
@@ -46,7 +67,7 @@
  *
  * It is a bug to call this with a transport that is already bound.
  */
-void transport_bind (transport_t *transport, const struct transport_type *type, const struct transport_info *info);
+void transport_init (transport_t *transport, const struct transport_type *type, const struct transport_info *info);
 
 /**
  * Check the type of the transport, and return the transport as a void* suitable for casting to the appropriate struct
@@ -56,5 +77,33 @@
  */
 void* transport_check (transport_t *transport, const struct transport_type *type);
 
+/**
+ * Mark the transport as connected, calling transport_methods::_connected if it exists and \a direct is not given,
+ * transport_callbacks::on_connected/transport_callbacks::on_error otherwise.
+ *
+ * If the connect succeeded, \a err should be given as NULL. If the connect failed, \a err should contain the error
+ * info.
+ *
+ * If called from the transport_methods::_connected method, pass in direct to avoid recursion.
+ *
+ * This sets the transport::connected flag, regardless of which callback it invokes.
+ *
+ * XXX: implement proper layering of types, linkig transport_type's together
+ *
+ * @param transport the transport state
+ * @param err NULL for success, otherwise connect error code
+ * @param direct call the user callback directly, ignoring any method
+ */
+void transport_connected (transport_t *transport, const error_t *err, bool direct);
+
+/**
+ * Invoke the user callbacks based on the given TRANSPORT_* flags
+ */
+void transport_invoke (transport_t *transport, short what);
+
+/**
+ * Mark the transport as failed, calling transport_methods::on_error with the given error code.
+ */
+void transport_error (transport_t *transport, const error_t *err);
 
 #endif /* TRANSPORT_INTERNAL_H */