fix sock_stream read/write return value, move line buffer inside of line_proto, add some initial code for event-based non-blocking operation
authorTero Marttila <terom@fixme.fi>
Sun, 22 Feb 2009 10:16:28 +0200
changeset 10 9fe218576d13
parent 9 4c4c906cc649
child 11 14e79683c48c
fix sock_stream read/write return value, move line buffer inside of line_proto, add some initial code for event-based non-blocking operation
src/error.c
src/error.h
src/line_proto.c
src/line_proto.h
src/nexus.c
src/sock.c
src/sock.h
src/sock_gnutls.c
src/sock_internal.h
src/sock_tcp.c
src/sock_tcp.h
--- a/src/error.c	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/error.c	Sun Feb 22 10:16:28 2009 +0200
@@ -23,6 +23,7 @@
         ERROR_NAME( ERR_READ,                           "read"                                          );
         ERROR_NAME( ERR_READ_EOF,                       "read: EOF"                                     );
         ERROR_NAME( ERR_WRITE,                          "write"                                         );
+        ERROR_NAME( ERR_FCNTL,                          "fcntl"                                         );
         ERROR_NAME( ERR_GNUTLS_CERT_ALLOC_CRED,         "gnutls_certificate_allocate_credentials"       );
         ERROR_NAME( ERR_GNUTLS_GLOBAL_INIT,             "gnutls_global_init"                            );
         ERROR_NAME( ERR_GNUTLS_INIT,                    "gnutls_init"                                   );
--- a/src/error.h	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/error.h	Sun Feb 22 10:16:28 2009 +0200
@@ -50,6 +50,7 @@
     _ERROR_CODE( ERR_READ,                          0x000401,   ERRNO   ),
     _ERROR_CODE( ERR_READ_EOF,                      0x000402,   NONE    ),
     _ERROR_CODE( ERR_WRITE,                         0x000403,   ERRNO   ),
+    _ERROR_CODE( ERR_FCNTL,                         0x000404,   ERRNO   ),
 
     /* GnuTLS errors */
     _ERROR_CODE( ERR_GNUTLS_CERT_ALLOC_CRED,        0x010101,   GNUTLS  ),
@@ -58,6 +59,11 @@
     _ERROR_CODE( ERR_GNUTLS_SET_DEFAULT_PRIORITY,   0x010104,   GNUTLS  ),
     _ERROR_CODE( ERR_GNUTLS_CRED_SET,               0x010105,   GNUTLS  ),
     _ERROR_CODE( ERR_GNUTLS_HANDSHAKE,              0x010106,   GNUTLS  ),
+    _ERROR_CODE( ERR_GNUTLS_RECORD_SEND,            0x010107,   GNUTLS  ),
+    _ERROR_CODE( ERR_GNUTLS_RECORD_RECV,            0x010108,   GNUTLS  ),
+
+    /* Libevent errors */
+    _ERROR_CODE( ERR_EVENT_NEW,                     0x010201,   NONE    ),
     
     // mask of bits used for the error_code value
     _ERROR_CODE_MASK    = 0xffffff,
@@ -106,6 +112,9 @@
 /* Compare error_info.code != 0 */
 #define IS_ERROR(err_info_ptr) (!!(err_info_ptr)->code)
 
+/* Compare the err_code/err_extra for an err_info */
+#define MATCH_ERROR(err_info_ptr, err_code, err_extra) ((err_info_ptr)->code == (err_code) && (err_info_ptr)->extra == (err_extra))
+
 /* Set error_info.code, but leave err_extra as-is. Evaluates to err_code */
 #define SET_ERROR(err_info_ptr, err_code) ((err_info_ptr)->code = (err_code))
 
--- a/src/line_proto.c	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/line_proto.c	Sun Feb 22 10:16:28 2009 +0200
@@ -5,6 +5,8 @@
 #include <stdlib.h>
 #include <assert.h>
 
+#include <err.h>
+
 /*
  * Our state
  */
@@ -12,6 +14,12 @@
     /* The sock_stream we read/write with */
     struct sock_stream *sock;
 
+    /* The incoming line buffer */
+    char *buf;
+
+    /* Incoming buffer size */
+    size_t buf_len;
+
     /* Offset of trailing data in buf */
     size_t tail_offset;
 
@@ -20,18 +28,82 @@
 
     /* Last error */
     struct error_info err;
+
+    /* Callback info */
+    line_proto_read_cb cb_read;
+    void *cb_arg;
 };
 
-err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, struct error_info *err)
+// function prototypes
+static err_t line_proto_schedule_events (struct line_proto *lp, short what);
+
+/*
+ * Our sock_stream on_read handler
+ */
+static void line_proto_on_read (struct sock_stream *sock, void *arg)
+{
+    struct line_proto *lp = arg;
+    const char *line;
+
+    // sanity-check
+    assert(lp->tail_offset < lp->buf_len);
+    
+    do {
+        // attempt to read a line
+        if (line_proto_read(lp, &line))
+            // XXX: fail
+            errx(1, "line_proto_read: %s", error_msg(&lp->err));
+        
+        // got a line?
+        if (line)
+            lp->cb_read(lp, line, lp->cb_arg);
+
+    } while (line);
+
+    // reschedule
+    if (line_proto_schedule_events(lp, EV_READ))
+        errx(1, "line_proto_schedule_events: %s", error_msg(&lp->err));
+}
+
+/*
+ * Schedule our sock_stream callback
+ */
+static err_t line_proto_schedule_events (struct line_proto *lp, short what) 
+{
+    // just use sock_stream's interface
+    if (sock_stream_event_enable(lp->sock, what))
+        RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+
+    return SUCCESS;
+}
+
+struct sock_stream_callbacks line_proto_sock_stream_callbacks = {
+    .on_read    = &line_proto_on_read,
+};
+
+err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
+        line_proto_read_cb cb_func, void *cb_arg, struct error_info *err)
 {
     struct line_proto *lp;
 
     // allocate
     if ((lp = calloc(1, sizeof(*lp))) == NULL)
         return SET_ERROR(err, ERR_CALLOC);
+    
+    // allocate buffer
+    if ((lp->buf = malloc(buf_size)) == NULL) {
+        free(lp);
+        return SET_ERROR(err, ERR_CALLOC);
+    }
 
     // store
     lp->sock = sock;
+    lp->buf_len = buf_size;
+    lp->cb_read = cb_func;
+    lp->cb_arg = cb_arg;
+
+    // initialize event-based stuff
+    sock_stream_event_init(sock, &line_proto_sock_stream_callbacks, lp);
 
     // return
     *lp_ptr = lp;
@@ -75,7 +147,7 @@
     return i + 2;
 }
 
-err_t line_proto_read (struct line_proto *lp, char *buf, size_t len)
+err_t line_proto_read (struct line_proto *lp, const char **line_ptr)
 {
     // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
     size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
@@ -87,7 +159,7 @@
     // move trailing data from previous line to front of buffer
     if (lp->tail_offset) {
         // move to front
-        memmove(buf, buf + lp->tail_offset, lp->tail_len);
+        memmove(lp->buf, lp->buf + lp->tail_offset, lp->tail_len);
 
         // reset
         lp->tail_offset = 0;
@@ -97,15 +169,32 @@
     // readline loop
     do {
         // parse any line at the beginning of the buffer
-        if ((next_offset = _parse_line(buf, recv_offset, &peek_offset)) > 0)
+        if ((next_offset = _parse_line(lp->buf, recv_offset, &peek_offset)) > 0) {
+            // store a valid *line_ptr
+            *line_ptr = lp->buf;
+            
+            // exit loop and return
             break;
+        }
 
-        // ensure there's enough space for it
-        assert(recv_offset < len);
+        // ensure there's enough space for the rest of the line
+        // XXX: this must be an error, not an assert
+        assert(recv_offset < lp->buf_len);
         
         // otherwise, read more data
-        if ((ret = sock_stream_read(lp->sock, buf + recv_offset, len - recv_offset)) < 0)
-            RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+        if ((ret = sock_stream_read(lp->sock, lp->buf + recv_offset, lp->buf_len - recv_offset)) < 0) {
+            // we can special-case EAGAIN, as it's expected
+            if (MATCH_ERROR(sock_stream_error(lp->sock), ERR_READ, EAGAIN)) {
+                // return a NULL *line_ptr
+                *line_ptr = NULL;
+                break;
+
+            } else {
+                // store and return NULL on errors
+                RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+
+            }
+        }
 
         // EOF?
         if (ret == 0)
--- a/src/line_proto.h	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/line_proto.h	Sun Feb 22 10:16:28 2009 +0200
@@ -13,9 +13,19 @@
 struct line_proto;
 
 /*
+ * The callback for receiving lines
+ */
+typedef void (*line_proto_read_cb)(struct line_proto *lp, const char *line, void *arg);
+
+/*
  * Create a new line_proto off the the given sock_stream. The newly allocated line_proto will be returned via *lp_ptr.
+ *
+ * The incoming lines are buffered in a buffer of \a buf_size bytes. This imposes a maximum limit on the line length.
+ *
+ * The given callback function/argument will be used to provide event-based recv support.
  */
-err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, struct error_info *err);
+err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
+        line_proto_read_cb cb_func, void *cb_arg, struct error_info *err);
 
 /*
  * Receive one line into the given buffer. The line will be terminated with '\r\n', and said terminator will be
@@ -23,7 +33,7 @@
  *
  * Note: currently this uses the buffer to store intermediate state, so always pass the same buffer (for now).
  */
-err_t line_proto_read (struct line_proto *lp, char *buf, size_t len);
+err_t line_proto_read (struct line_proto *lp, const char **line_ptr);
 
 /*
  * Get current error_info*
--- a/src/nexus.c	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/nexus.c	Sun Feb 22 10:16:28 2009 +0200
@@ -20,7 +20,7 @@
     struct event_base *ev_base;
     struct sock_stream *sock;
     struct line_proto *lp;
-    char line_buf[LINE_LENGTH + 1];
+    char *line;
     struct error_info _err;
 
     // initialize libevent
@@ -36,17 +36,17 @@
         errx(1, "sock_gnutls_connect: %s", error_msg(&_err));
 
     // line protocol
-    if (line_proto_create(&lp, sock, &_err))
+    if (line_proto_create(&lp, sock, LINE_LENGTH, NULL, NULL, &_err))
         errx(1, "line_proto_create: %s", error_msg(&_err));
 
     // read lines and dump them out
     do {
         // recv
-        if (line_proto_read(lp, line_buf, sizeof(line_buf)))
+        if (line_proto_read(lp, &line))
             errx(1, "line_proto_read: %s", error_msg(line_proto_error(lp)));
 
         // printf
-        printf("<<< %s\n", line_buf);
+        printf("<<< %s\n", line);
 
     } while (1);
     
--- a/src/sock.c	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock.c	Sun Feb 22 10:16:28 2009 +0200
@@ -29,23 +29,44 @@
     sock->type = type;
 }
 
-void sock_stream_set_callbacks (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg)
+int sock_stream_read (struct sock_stream *sock, void *buf, size_t len)
+{
+    err_t err;
+
+    // proxy off to method handler
+    if ((err = sock->type->methods.read(sock, buf, &len)))
+        return err;
+    
+    // return updated bytes-read len
+    return len;
+}
+
+int sock_stream_write (struct sock_stream *sock, const void *buf, size_t len)
+{
+    err_t err;
+
+    // proxy off to method handler
+    if ((err = sock->type->methods.write(sock, buf, &len)))
+        return err;
+
+    // return updated bytes-written len
+    return len;
+}
+
+err_t sock_stream_event_init (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg)
 {
     // store
     sock->cb_info = callbacks;
     sock->cb_arg = arg;
+
+    // run method
+    return sock->type->methods.event_init(sock);
 }
 
-err_t sock_stream_read (struct sock_stream *sock, void *buf, size_t len)
+err_t sock_stream_event_enable (struct sock_stream *sock, short mask)
 {
-    // proxy off to method handler
-    return sock->type->methods.read(sock, buf, len);
-}
-
-err_t sock_stream_write (struct sock_stream *sock, const void *buf, size_t len)
-{
-    // proxy off to method handler
-    return sock->type->methods.write(sock, buf, len);
+    // run method
+    return sock->type->methods.event_enable(sock, mask);
 }
 
 const struct error_info* sock_stream_error (struct sock_stream *sock)
--- a/src/sock.h	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock.h	Sun Feb 22 10:16:28 2009 +0200
@@ -51,15 +51,26 @@
 err_t sock_gnutls_connect (struct sock_stream **sock_ptr, const char *host, const char *service, struct error_info *err);
 
 /*
- * Set the callbacks for a sock_stream. Note that the callbacks struct isn't copied - it's used as-is-given.
+ * The generic read/write API for stream sockets.
  */
-void sock_stream_set_callbacks (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg);
+int sock_stream_read (struct sock_stream *sock, void *buf, size_t len);
+int sock_stream_write (struct sock_stream *sock, const void *buf, size_t len);
 
 /*
- * The generic read/write API for stream sockets.
+ * Initialize event-based operation for this sock_stream. This will set the stream into nonblocking mode, and the given
+ * callbacks will be fired once enabled using sock_stream_event_enable().
+ *
+ * Note that the callbacks struct isn't copied - it's used as-is-given.
  */
-err_t sock_stream_read (struct sock_stream *sock, void *buf, size_t len);
-err_t sock_stream_write (struct sock_stream *sock, const void *buf, size_t len);
+err_t sock_stream_event_init (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg);
+
+/*
+ * Prime the callbacks set up earlier with sock_stream_event_init to fire once ready.
+ *
+ * \a mask is a bitmask of EV_* bits, such as EV_READ, EV_WRITE or EV_PERSIST. See event_set() for more info about the
+ * behaviour of those.
+ */
+err_t sock_stream_event_enable (struct sock_stream *sock, short mask);
 
 /**
  * Get current error_info for \a sock.
--- a/src/sock_gnutls.c	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock_gnutls.c	Sun Feb 22 10:16:28 2009 +0200
@@ -4,28 +4,59 @@
 #include <stdlib.h>
 #include <err.h>
 
-static err_t sock_gnutls_read (struct sock_stream *base_sock, void *buf, size_t len)
+// XXX: errors
+static err_t sock_gnutls_read (struct sock_stream *base_sock, void *buf, size_t *len)
+{
+    struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
+    int ret;
+    
+    // just map to gnutls_record_recv
+    if ((ret = gnutls_record_recv(sock->session, buf, *len)) < 0)
+        RETURN_SET_ERROR_ERRNO(SOCK_GNUTLS_ERR(sock), ERR_GNUTLS_RECORD_RECV);
+    
+    // updated length
+    *len = ret;
+
+    return SUCCESS;
+}
+
+static err_t sock_gnutls_write (struct sock_stream *base_sock, const void *buf, size_t *len)
+{
+    struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
+    int ret;
+    
+    // just map to gnutls_record_send
+    if ((ret = gnutls_record_send(sock->session, buf, *len)) < 0)
+        RETURN_SET_ERROR_ERRNO(SOCK_GNUTLS_ERR(sock), ERR_GNUTLS_RECORD_SEND);
+    
+    // updated length
+    *len = ret;
+
+    return SUCCESS;
+}
+
+static err_t sock_gnutls_event_init (struct sock_stream *base_sock)
 {
     struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
     
-    // just map to gnutls_record_recv
-    return gnutls_record_recv(sock->session, buf, len);
+    return SUCCESS;
 }
 
-static err_t sock_gnutls_write (struct sock_stream *base_sock, const void *buf, size_t len)
+static err_t sock_gnutls_event_enable (struct sock_stream *base_sock, short mask)
 {
     struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
     
-    // just map to gnutls_record_send
-    return gnutls_record_send(sock->session, buf, len);
+    return SUCCESS;
 }
 
 /*
  * Our sock_stream_Type
  */
 struct sock_stream_type sock_gnutls_type = {
-    .methods.read   = &sock_gnutls_read,
-    .methods.write  = &sock_gnutls_write,
+    .methods.read           = &sock_gnutls_read,
+    .methods.write          = &sock_gnutls_write,
+    .methods.event_init     = &sock_gnutls_event_init,
+    .methods.event_enable   = &sock_gnutls_event_enable,
 };
 
 /*
--- a/src/sock_internal.h	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock_internal.h	Sun Feb 22 10:16:28 2009 +0200
@@ -19,10 +19,16 @@
     /* method table */
     struct sock_stream_methods {
         /* Normal read(2) */
-        err_t (*read) (struct sock_stream *sock, void *buf, size_t len);
+        err_t (*read) (struct sock_stream *sock, void *buf, size_t *len);
 
         /* Normal write(2) */
-        err_t (*write) (struct sock_stream *sock, const void *buf, size_t len);
+        err_t (*write) (struct sock_stream *sock, const void *buf, size_t *len);
+
+        /* Initialize events. cb_info/cb_arg are already updated */
+        err_t (*event_init) (struct sock_stream *sock);
+
+        /* Enable events as specified */
+        err_t (*event_enable) (struct sock_stream *sock, short mask);
 
     } methods;
 };
--- a/src/sock_tcp.c	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock_tcp.c	Sun Feb 22 10:16:28 2009 +0200
@@ -6,53 +6,87 @@
 #include <sys/socket.h>
 #include <netdb.h>
 #include <unistd.h>
+#include <fcntl.h>
 #include <string.h>
 #include <assert.h>
 
 /*
  * Our sock_stream_methods.read method
  */
-static err_t sock_tcp_read (struct sock_stream *base_sock, void *buf, size_t len)
+static err_t sock_tcp_read (struct sock_stream *base_sock, void *buf, size_t *len)
 {
     struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
     int ret;
     
     // map directly to read(2)
-    if ((ret = read(sock->fd, buf, len)) < 0)
+    if ((ret = read(sock->fd, buf, *len)) < 0)
         // errno
         RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_READ);
 
-    else 
-        // bytes read
-        return ret;
+    // bytes read
+    *len = ret;
+
+    return SUCCESS;
 }
 
 /*
  * Our sock_stream_methods.write method
  */
-static err_t sock_tcp_write (struct sock_stream *base_sock, const void *buf, size_t len)
+static err_t sock_tcp_write (struct sock_stream *base_sock, const void *buf, size_t *len)
 {
     struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
     int ret;
     
     // map directly to write(2)
-    if ((ret = write(sock->fd, buf, len)) < 0)
+    if ((ret = write(sock->fd, buf, *len)) < 0)
         // errno
         RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_WRITE);
 
-    else
-        // bytes read
-        return ret;
+    // bytes read
+    *len = ret;
+
+    return SUCCESS;
+}
+
+static err_t sock_tcp_event_init (struct sock_stream *base_sock)
+{
+    struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
+    
+    return SUCCESS;
+}
+
+static err_t sock_tcp_event_enable (struct sock_stream *base_sock, short mask)
+{
+    struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
+    
+    return SUCCESS;
 }
 
 /*
  * Our sock_stream_type
  */
 struct sock_stream_type sock_tcp_type = {
-    .methods.read   = &sock_tcp_read,
-    .methods.write  = &sock_tcp_write,
+    .methods.read           = &sock_tcp_read,
+    .methods.write          = &sock_tcp_write,
+    .methods.event_init     = &sock_tcp_event_init,
+    .methods.event_enable   = &sock_tcp_event_enable,
 };
 
+/*
+ * Our basic socket event handler for driving our callbacks
+ */
+static void sock_tcp_event (evutil_socket_t fd, short what, void *arg) 
+{
+    struct sock_tcp *sock = arg;
+
+    // invoke appropriate callback
+    if (what & EV_READ && SOCK_TCP_BASE(sock)->cb_info->on_read)
+        SOCK_TCP_BASE(sock)->cb_info->on_read(SOCK_TCP_BASE(sock), SOCK_TCP_BASE(sock)->cb_arg);
+
+    if (what & EV_WRITE && SOCK_TCP_BASE(sock)->cb_info->on_write)
+        SOCK_TCP_BASE(sock)->cb_info->on_read(SOCK_TCP_BASE(sock), SOCK_TCP_BASE(sock)->cb_arg);
+}
+
 err_t sock_tcp_alloc (struct sock_tcp **sock_ptr)
 {
     // alloc
@@ -78,6 +112,19 @@
     return SUCCESS;
 }
 
+err_t sock_tcp_init_ev (struct sock_tcp *sock, void (*ev_cb)(evutil_socket_t, short, void *), void *cb_arg)
+{
+    // require valid fd
+    assert(sock->fd >= 0);
+    
+    // create new event
+    if ((sock->ev = event_new(_sock_stream_ctx.ev_base, sock->fd, EV_READ, ev_cb, cb_arg)) == NULL)
+        return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_NEW);
+
+    // ok
+    return SUCCESS;
+}
+
 err_t sock_tcp_init_connect (struct sock_tcp *sock, const char *hostname, const char *service)
 {
     struct addrinfo hints, *res, *r;
@@ -137,6 +184,26 @@
     return 0;    
 }
 
+err_t sock_tcp_set_nonblock (struct sock_tcp *sock, int nonblock)
+{
+    // fcntl it
+    // XXX: maintain old flags?
+    if (fcntl(sock->fd, F_SETFL, nonblock ? O_NONBLOCK : 0) < 0)
+        RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_FCNTL);
+
+    // ok
+    return SUCCESS;
+}
+
+void sock_tcp_release (struct sock_tcp *sock)
+{
+    // must not be connected
+    assert(sock->fd < 0);
+
+    // free
+    free(sock);
+}
+
 err_t sock_tcp_connect (struct sock_stream **sock_ptr, const char *host, const char *service, struct error_info *err_info)
 {
     struct sock_tcp *sock;
@@ -164,11 +231,4 @@
     return 0;
 }
 
-void sock_tcp_release (struct sock_tcp *sock)
-{
-    // must not be connected
-    assert(sock->fd < 0);
 
-    // free
-    free(sock);
-}
--- a/src/sock_tcp.h	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock_tcp.h	Sun Feb 22 10:16:28 2009 +0200
@@ -15,6 +15,9 @@
     
     /* The OS file descriptor */
     int fd;
+
+    /* The libevent struct */
+    struct event *ev;
 };
 
 #define SOCK_TCP_BASE(sock_ptr) (&(sock_ptr)->base)
@@ -31,11 +34,22 @@
 err_t sock_tcp_init_fd (struct sock_tcp *sock, int fd);
 
 /*
+ * Initialize sock_tcp.ev to use the socket's fd with the given callback. By default, this is created with EV_READ
+ * flags, but is not added.
+ */
+err_t sock_tcp_init_ev (struct sock_tcp *sock, void (*ev_cb) (evutil_socket_t, short, void *), void *arg);
+
+/*
  * Initialize a blank sock_tcp by connecting
  */
 err_t sock_tcp_init_connect (struct sock_tcp *sock, const char *hostname, const char *service);
 
 /*
+ * Set the socket's nonblock mode
+ */
+err_t sock_tcp_set_nonblock (struct sock_tcp *sock, int nonblock);
+
+/*
  * Release a non-connected sock_tcp
  */
 void sock_tcp_release (struct sock_tcp *sock);