update sock_stream_read/write semantics for EOF/EAGAIN, tentative event-based gnutls code
authorTero Marttila <terom@fixme.fi>
Sat, 28 Feb 2009 18:48:10 +0200
changeset 12 4147fae232d9
parent 11 14e79683c48c
child 13 ca16f3a8f3b7
update sock_stream_read/write semantics for EOF/EAGAIN, tentative event-based gnutls code
src/error.h
src/line_proto.c
src/line_proto.h
src/nexus.c
src/sock.c
src/sock.h
src/sock_gnutls.c
src/sock_gnutls.h
src/sock_internal.h
src/sock_tcp.c
src/sock_tcp.h
--- a/src/error.h	Sat Feb 28 17:39:37 2009 +0200
+++ b/src/error.h	Sat Feb 28 18:48:10 2009 +0200
@@ -50,7 +50,8 @@
     _ERROR_CODE( ERR_READ,                          0x000401,   ERRNO   ),
     _ERROR_CODE( ERR_READ_EOF,                      0x000402,   NONE    ),
     _ERROR_CODE( ERR_WRITE,                         0x000403,   ERRNO   ),
-    _ERROR_CODE( ERR_FCNTL,                         0x000404,   ERRNO   ),
+    _ERROR_CODE( ERR_WRITE_EOF,                     0x000404,   NONE    ),
+    _ERROR_CODE( ERR_FCNTL,                         0x000405,   ERRNO   ),
 
     /* GnuTLS errors */
     _ERROR_CODE( ERR_GNUTLS_CERT_ALLOC_CRED,        0x010101,   GNUTLS  ),
@@ -61,6 +62,7 @@
     _ERROR_CODE( ERR_GNUTLS_HANDSHAKE,              0x010106,   GNUTLS  ),
     _ERROR_CODE( ERR_GNUTLS_RECORD_SEND,            0x010107,   GNUTLS  ),
     _ERROR_CODE( ERR_GNUTLS_RECORD_RECV,            0x010108,   GNUTLS  ),
+    _ERROR_CODE( ERR_GNUTLS_RECORD_GET_DIRECTION,   0x010109,   GNUTLS  ),
 
     /* Libevent errors */
     _ERROR_CODE( ERR_EVENT_NEW,                     0x010201,   NONE    ),
--- a/src/line_proto.c	Sat Feb 28 17:39:37 2009 +0200
+++ b/src/line_proto.c	Sat Feb 28 18:48:10 2009 +0200
@@ -154,7 +154,7 @@
     size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
     int ret;
 
-    // adjust offset from previous data
+    // adjust offset to beyond previous data (as will be moved next)
     recv_offset = lp->tail_len;
 
     // move trailing data from previous line to front of buffer
@@ -183,23 +183,16 @@
         assert(recv_offset < lp->buf_len);
         
         // otherwise, read more data
-        if ((ret = sock_stream_read(lp->sock, lp->buf + recv_offset, lp->buf_len - recv_offset)) < 0) {
-            // we can special-case EAGAIN, as it's expected
-            if (MATCH_ERROR(sock_stream_error(lp->sock), ERR_READ, EAGAIN)) {
-                // return a NULL *line_ptr
-                *line_ptr = NULL;
-                break;
+        if ((ret = sock_stream_read(lp->sock, lp->buf + recv_offset, lp->buf_len - recv_offset)) < 0)
+            // store and return NULL on errors
+            RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
 
-            } else {
-                // store and return NULL on errors
-                RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
-
-            }
+        // EAGAIN?
+        if (ret == 0) {
+            // return a NULL *line_ptr
+            *line_ptr = NULL;
+            break;
         }
-
-        // EOF?
-        if (ret == 0)
-            return SET_ERROR(&lp->err, ERR_READ_EOF);
         
         // update recv_offset
         recv_offset += ret;
--- a/src/line_proto.h	Sat Feb 28 17:39:37 2009 +0200
+++ b/src/line_proto.h	Sat Feb 28 18:48:10 2009 +0200
@@ -35,6 +35,11 @@
 err_t line_proto_read (struct line_proto *lp, const char **line_ptr);
 
 /*
+ * Signify that the line read with line_proto_read() was handled and can be discarded
+ */
+void line_proto_discard (struct line_proto *lp);
+
+/*
  * Get current error_info*
  */
 const struct error_info* line_proto_error (struct line_proto *lp);
--- a/src/nexus.c	Sat Feb 28 17:39:37 2009 +0200
+++ b/src/nexus.c	Sat Feb 28 18:48:10 2009 +0200
@@ -12,8 +12,8 @@
 #include "sock.h"
 #include "line_proto.h"
 
-#define CONNECT_HOST "irc.fixme.fi"
-#define CONNECT_SERV "6667"
+#define CONNECT_HOST "localhost"
+#define CONNECT_SERV "5001"
 #define LINE_LENGTH 512
 
 void on_line (const char *line, void *arg) {
--- a/src/sock.c	Sat Feb 28 17:39:37 2009 +0200
+++ b/src/sock.c	Sat Feb 28 18:48:10 2009 +0200
@@ -74,3 +74,12 @@
     // return pointer
     return SOCK_ERR(sock);
 }
+
+void sock_stream_invoke_callbacks (struct sock_stream *sock, short what)
+{
+    if (what & EV_READ && sock->cb_info->on_read)
+        sock->cb_info->on_read(sock, sock->cb_arg);
+
+    if (what & EV_WRITE && sock->cb_info->on_write)
+        sock->cb_info->on_read(sock, sock->cb_arg);
+}
--- a/src/sock.h	Sat Feb 28 17:39:37 2009 +0200
+++ b/src/sock.h	Sat Feb 28 18:48:10 2009 +0200
@@ -51,7 +51,10 @@
 err_t sock_gnutls_connect (struct sock_stream **sock_ptr, const char *host, const char *service, struct error_info *err);
 
 /*
- * The generic read/write API for stream sockets.
+ * The generic read/write API for stream sockets. These are mostly identical to the equivalent read/write syscalls, but
+ * the handling of EOF and EAGAIN is different. Normally, these return the (positive) number of bytes written. For
+ * EAGAIN, these return zero. For EOF, these return -ERR_READ_EOF/ERR_WRITE_EOF. Otherwise, these return the -ERR_*
+ * code.
  */
 int sock_stream_read (struct sock_stream *sock, void *buf, size_t len);
 int sock_stream_write (struct sock_stream *sock, const void *buf, size_t len);
@@ -65,10 +68,11 @@
 err_t sock_stream_event_init (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg);
 
 /*
- * Prime the callbacks set up earlier with sock_stream_event_init to fire once ready.
+ * Enable the events for this sock, as set up earlier with event_init. Mask should contain EV_READ/EV_WRITE.
  *
- * \a mask is a bitmask of EV_* bits, such as EV_READ, EV_WRITE or EV_PERSIST. See event_set() for more info about the
- * behaviour of those.
+ * The implementation of this is slightly hazy for complex protocols; this should only be used to map from
+ * sock_stream_read/write to the corresponding sock_stream_callback. That is, if sock_stream_read returns zero, then
+ * call event_enable(EV_READ), wherepon on_read will later be called.
  */
 err_t sock_stream_event_enable (struct sock_stream *sock, short mask);
 
--- a/src/sock_gnutls.c	Sat Feb 28 17:39:37 2009 +0200
+++ b/src/sock_gnutls.c	Sat Feb 28 18:48:10 2009 +0200
@@ -8,14 +8,29 @@
 static err_t sock_gnutls_read (struct sock_stream *base_sock, void *buf, size_t *len)
 {
     struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
+    struct error_info *err = SOCK_GNUTLS_ERR(sock);
     int ret;
     
-    // just map to gnutls_record_recv
-    if ((ret = gnutls_record_recv(sock->session, buf, *len)) < 0)
-        RETURN_SET_ERROR_ERRNO(SOCK_GNUTLS_ERR(sock), ERR_GNUTLS_RECORD_RECV);
+    // read gnutls record
+    ret = gnutls_record_recv(sock->session, buf, *len);
     
-    // updated length
-    *len = ret;
+    // errors
+    if (ret < 0 && ret != GNUTLS_E_AGAIN)
+        RETURN_SET_ERROR_EXTRA(err, ERR_GNUTLS_RECORD_RECV, ret);
+    
+    else if (ret == 0)
+        return SET_ERROR(err, ERR_READ_EOF);
+
+
+    // eagain?
+    if (ret == 0) {
+        *len = 0;
+
+    } else {
+        // updated length
+        *len = ret;
+
+    }
 
     return SUCCESS;
 }
@@ -23,29 +38,87 @@
 static err_t sock_gnutls_write (struct sock_stream *base_sock, const void *buf, size_t *len)
 {
     struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
+    struct error_info *err = SOCK_GNUTLS_ERR(sock);
     int ret;
+ 
+    // read gnutls record
+    ret = gnutls_record_send(sock->session, buf, *len);
     
-    // just map to gnutls_record_send
-    if ((ret = gnutls_record_send(sock->session, buf, *len)) < 0)
-        RETURN_SET_ERROR_ERRNO(SOCK_GNUTLS_ERR(sock), ERR_GNUTLS_RECORD_SEND);
+    // errors
+    if (ret < 0 && ret != GNUTLS_E_AGAIN)
+        RETURN_SET_ERROR_EXTRA(err, ERR_GNUTLS_RECORD_RECV, ret);
     
-    // updated length
-    *len = ret;
+    else if (ret == 0)
+        return SET_ERROR(err, ERR_READ_EOF);
+
+
+    // eagain?
+    if (ret == 0) {
+        *len = 0;
+
+    } else {
+        // updated length
+        *len = ret;
+    }
 
     return SUCCESS;
 }
 
+static void sock_gnutls_event_handler (int fd, short what, void *arg)
+{
+    struct sock_gnutls *sock = arg;
+    
+    // gnutls might be able to proceed now, so ask user to try what didn't work before now, using the mask given to
+    // event_enable().
+    sock_stream_invoke_callbacks(SOCK_GNUTLS_BASE(sock), sock->ev_mask);
+}
+
 static err_t sock_gnutls_event_init (struct sock_stream *base_sock)
 {
     struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
-    
+
+    err_t err;
+
+    // set nonblocking
+    if ((err = sock_tcp_set_nonblock(SOCK_GNUTLS_TCP(sock), 1)))
+        return err;
+
+    // add ourselves as the event handler
+    if ((err = sock_tcp_init_ev(SOCK_GNUTLS_TCP(sock), &sock_gnutls_event_handler, sock)))
+        return err;
+
+    // ok
     return SUCCESS;
 }
 
 static err_t sock_gnutls_event_enable (struct sock_stream *base_sock, short mask)
 {
     struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
+    int ret;
     
+    // store the ev_mask. We don't care about it here, because we assume that event_enable is only called once read or
+    // write, respectively, return zero. This is really the only case we can handle with gnutls.
+    sock->ev_mask = mask;
+
+    // gnutls_record_get_direction tells us what I/O operation gnutls would have required for the last
+    // operation, so we can use that to determine what events to register
+    switch ((ret = gnutls_record_get_direction(sock->session))) {
+        case 0: 
+            // read more data
+            sock_tcp_add_event(SOCK_GNUTLS_TCP(sock), EV_READ); 
+            break;
+        
+        case 1:
+            // write buffer full
+            sock_tcp_add_event(SOCK_GNUTLS_TCP(sock), EV_WRITE);
+            break;
+        
+        default:
+            // random error
+            RETURN_SET_ERROR_EXTRA(SOCK_GNUTLS_ERR(sock), ERR_GNUTLS_RECORD_GET_DIRECTION, ret);
+    }
+    
+    // ok... wait
     return SUCCESS;
 }
 
--- a/src/sock_gnutls.h	Sat Feb 28 17:39:37 2009 +0200
+++ b/src/sock_gnutls.h	Sat Feb 28 18:48:10 2009 +0200
@@ -31,6 +31,9 @@
 
     /* The GnuTLS session for this connection */
     gnutls_session_t session;
+
+    /* The current event_enable mask */
+    int ev_mask;
 };
 
 #define SOCK_GNUTLS_BASE(sock_ptr) (&(sock_ptr)->base_tcp.base)
--- a/src/sock_internal.h	Sat Feb 28 17:39:37 2009 +0200
+++ b/src/sock_internal.h	Sat Feb 28 18:48:10 2009 +0200
@@ -63,4 +63,9 @@
  */
 void sock_stream_init (struct sock_stream *sock, struct sock_stream_type *type);
 
+/*
+ * Invoke the appropriate callbacks for the given EV_* bitmask.
+ */
+void sock_stream_invoke_callbacks (struct sock_stream *sock, short what);
+
 #endif /* SOCK_INTERNAL_H */
--- a/src/sock_tcp.c	Sat Feb 28 17:39:37 2009 +0200
+++ b/src/sock_tcp.c	Sat Feb 28 18:48:10 2009 +0200
@@ -18,11 +18,7 @@
     struct sock_tcp *sock = arg;
 
     // invoke appropriate callback
-    if (what & EV_READ && SOCK_TCP_BASE(sock)->cb_info->on_read)
-        SOCK_TCP_BASE(sock)->cb_info->on_read(SOCK_TCP_BASE(sock), SOCK_TCP_BASE(sock)->cb_arg);
-
-    if (what & EV_WRITE && SOCK_TCP_BASE(sock)->cb_info->on_write)
-        SOCK_TCP_BASE(sock)->cb_info->on_read(SOCK_TCP_BASE(sock), SOCK_TCP_BASE(sock)->cb_arg);
+    sock_stream_invoke_callbacks(SOCK_TCP_BASE(sock), what);
 }
 
 /*
@@ -33,14 +29,26 @@
     struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
     int ret;
     
-    // map directly to read(2)
-    if ((ret = read(sock->fd, buf, *len)) < 0)
-        // errno
+    // read(), and detect non-EAGAIN or EOF
+    if ((ret = read(sock->fd, buf, *len)) < 0 && errno != EAGAIN)
+        // unexpected error
         RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_READ);
+    
+    else if (ret == 0)
+        // EOF
+        return SET_ERROR(SOCK_TCP_ERR(sock), ERR_READ_EOF);
 
-    // bytes read
-    *len = ret;
 
+    if (ret < 0) {
+        // EAGAIN -> zero bytes
+        *len = 0;
+
+    } else {
+        // normal -> bytes read
+        *len = ret;
+    }
+
+    // ok
     return SUCCESS;
 }
 
@@ -52,13 +60,24 @@
     struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
     int ret;
     
-    // map directly to write(2)
-    if ((ret = write(sock->fd, buf, *len)) < 0)
-        // errno
+    // write(), and detect non-EAGAIN or EOF
+    if ((ret = write(sock->fd, buf, *len)) < 0 && errno != EAGAIN)
+        // unexpected error
         RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_WRITE);
+    
+    else if (ret == 0)
+        // EOF
+        return SET_ERROR(SOCK_TCP_ERR(sock), ERR_WRITE_EOF);
 
-    // bytes read
-    *len = ret;
+
+    if (ret < 0) {
+        // EAGAIN -> zero bytes
+        *len = 0;
+
+    } else {
+        // normal -> bytes read
+        *len = ret;
+    }
 
     return SUCCESS;
 }
@@ -83,16 +102,9 @@
 static err_t sock_tcp_event_enable (struct sock_stream *base_sock, short mask)
 {
     struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
-
-    // just add the appropraite events
-    if (mask & EV_READ && event_add(sock->ev_read, NULL))
-        return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_ADD);
- 
-    if (mask & EV_WRITE && event_add(sock->ev_write, NULL))
-        return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_ADD);
     
-    // done
-    return SUCCESS;
+    // implemented in sock_tcp_add_event
+    return sock_tcp_add_event(sock, mask);
 }
 
 /*
@@ -149,6 +161,19 @@
     return SUCCESS;
 }
 
+err_t sock_tcp_add_event (struct sock_tcp *sock, short mask)
+{
+    // just add the appropraite events
+    if (mask & EV_READ && event_add(sock->ev_read, NULL))
+        return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_ADD);
+ 
+    if (mask & EV_WRITE && event_add(sock->ev_write, NULL))
+        return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_ADD);
+    
+    // done
+    return SUCCESS;
+}
+
 err_t sock_tcp_init_connect (struct sock_tcp *sock, const char *hostname, const char *service)
 {
     struct addrinfo hints, *res, *r;
--- a/src/sock_tcp.h	Sat Feb 28 17:39:37 2009 +0200
+++ b/src/sock_tcp.h	Sat Feb 28 18:48:10 2009 +0200
@@ -49,6 +49,11 @@
 err_t sock_tcp_set_nonblock (struct sock_tcp *sock, int nonblock);
 
 /*
+ * event_add the specified ev_* events.
+ */
+err_t sock_tcp_add_event (struct sock_tcp *sock, short mask);
+
+/*
  * Release a non-connected sock_tcp
  */
 void sock_tcp_release (struct sock_tcp *sock);