fix sock_stream read/write return value, move line buffer inside of line_proto, add some initial code for event-based non-blocking operation
--- a/src/error.c Sun Feb 22 08:48:21 2009 +0200
+++ b/src/error.c Sun Feb 22 10:16:28 2009 +0200
@@ -23,6 +23,7 @@
ERROR_NAME( ERR_READ, "read" );
ERROR_NAME( ERR_READ_EOF, "read: EOF" );
ERROR_NAME( ERR_WRITE, "write" );
+ ERROR_NAME( ERR_FCNTL, "fcntl" );
ERROR_NAME( ERR_GNUTLS_CERT_ALLOC_CRED, "gnutls_certificate_allocate_credentials" );
ERROR_NAME( ERR_GNUTLS_GLOBAL_INIT, "gnutls_global_init" );
ERROR_NAME( ERR_GNUTLS_INIT, "gnutls_init" );
--- a/src/error.h Sun Feb 22 08:48:21 2009 +0200
+++ b/src/error.h Sun Feb 22 10:16:28 2009 +0200
@@ -50,6 +50,7 @@
_ERROR_CODE( ERR_READ, 0x000401, ERRNO ),
_ERROR_CODE( ERR_READ_EOF, 0x000402, NONE ),
_ERROR_CODE( ERR_WRITE, 0x000403, ERRNO ),
+ _ERROR_CODE( ERR_FCNTL, 0x000404, ERRNO ),
/* GnuTLS errors */
_ERROR_CODE( ERR_GNUTLS_CERT_ALLOC_CRED, 0x010101, GNUTLS ),
@@ -58,6 +59,11 @@
_ERROR_CODE( ERR_GNUTLS_SET_DEFAULT_PRIORITY, 0x010104, GNUTLS ),
_ERROR_CODE( ERR_GNUTLS_CRED_SET, 0x010105, GNUTLS ),
_ERROR_CODE( ERR_GNUTLS_HANDSHAKE, 0x010106, GNUTLS ),
+ _ERROR_CODE( ERR_GNUTLS_RECORD_SEND, 0x010107, GNUTLS ),
+ _ERROR_CODE( ERR_GNUTLS_RECORD_RECV, 0x010108, GNUTLS ),
+
+ /* Libevent errors */
+ _ERROR_CODE( ERR_EVENT_NEW, 0x010201, NONE ),
// mask of bits used for the error_code value
_ERROR_CODE_MASK = 0xffffff,
@@ -106,6 +112,9 @@
/* Compare error_info.code != 0 */
#define IS_ERROR(err_info_ptr) (!!(err_info_ptr)->code)
+/* Compare the err_code/err_extra for an err_info */
+#define MATCH_ERROR(err_info_ptr, err_code, err_extra) ((err_info_ptr)->code == (err_code) && (err_info_ptr)->extra == (err_extra))
+
/* Set error_info.code, but leave err_extra as-is. Evaluates to err_code */
#define SET_ERROR(err_info_ptr, err_code) ((err_info_ptr)->code = (err_code))
--- a/src/line_proto.c Sun Feb 22 08:48:21 2009 +0200
+++ b/src/line_proto.c Sun Feb 22 10:16:28 2009 +0200
@@ -5,6 +5,8 @@
#include <stdlib.h>
#include <assert.h>
+#include <err.h>
+
/*
* Our state
*/
@@ -12,6 +14,12 @@
/* The sock_stream we read/write with */
struct sock_stream *sock;
+ /* The incoming line buffer */
+ char *buf;
+
+ /* Incoming buffer size */
+ size_t buf_len;
+
/* Offset of trailing data in buf */
size_t tail_offset;
@@ -20,18 +28,82 @@
/* Last error */
struct error_info err;
+
+ /* Callback info */
+ line_proto_read_cb cb_read;
+ void *cb_arg;
};
-err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, struct error_info *err)
+// function prototypes
+static err_t line_proto_schedule_events (struct line_proto *lp, short what);
+
+/*
+ * Our sock_stream on_read handler
+ */
+static void line_proto_on_read (struct sock_stream *sock, void *arg)
+{
+ struct line_proto *lp = arg;
+ const char *line;
+
+ // sanity-check
+ assert(lp->tail_offset < lp->buf_len);
+
+ do {
+ // attempt to read a line
+ if (line_proto_read(lp, &line))
+ // XXX: fail
+ errx(1, "line_proto_read: %s", error_msg(&lp->err));
+
+ // got a line?
+ if (line)
+ lp->cb_read(lp, line, lp->cb_arg);
+
+ } while (line);
+
+ // reschedule
+ if (line_proto_schedule_events(lp, EV_READ))
+ errx(1, "line_proto_schedule_events: %s", error_msg(&lp->err));
+}
+
+/*
+ * Schedule our sock_stream callback
+ */
+static err_t line_proto_schedule_events (struct line_proto *lp, short what)
+{
+ // just use sock_stream's interface
+ if (sock_stream_event_enable(lp->sock, what))
+ RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+
+ return SUCCESS;
+}
+
+struct sock_stream_callbacks line_proto_sock_stream_callbacks = {
+ .on_read = &line_proto_on_read,
+};
+
+err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
+ line_proto_read_cb cb_func, void *cb_arg, struct error_info *err)
{
struct line_proto *lp;
// allocate
if ((lp = calloc(1, sizeof(*lp))) == NULL)
return SET_ERROR(err, ERR_CALLOC);
+
+ // allocate buffer
+ if ((lp->buf = malloc(buf_size)) == NULL) {
+ free(lp);
+ return SET_ERROR(err, ERR_CALLOC);
+ }
// store
lp->sock = sock;
+ lp->buf_len = buf_size;
+ lp->cb_read = cb_func;
+ lp->cb_arg = cb_arg;
+
+ // initialize event-based stuff
+ sock_stream_event_init(sock, &line_proto_sock_stream_callbacks, lp);
// return
*lp_ptr = lp;
@@ -75,7 +147,7 @@
return i + 2;
}
-err_t line_proto_read (struct line_proto *lp, char *buf, size_t len)
+err_t line_proto_read (struct line_proto *lp, const char **line_ptr)
{
// offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
@@ -87,7 +159,7 @@
// move trailing data from previous line to front of buffer
if (lp->tail_offset) {
// move to front
- memmove(buf, buf + lp->tail_offset, lp->tail_len);
+ memmove(lp->buf, lp->buf + lp->tail_offset, lp->tail_len);
// reset
lp->tail_offset = 0;
@@ -97,15 +169,32 @@
// readline loop
do {
// parse any line at the beginning of the buffer
- if ((next_offset = _parse_line(buf, recv_offset, &peek_offset)) > 0)
+ if ((next_offset = _parse_line(lp->buf, recv_offset, &peek_offset)) > 0) {
+ // store a valid *line_ptr
+ *line_ptr = lp->buf;
+
+ // exit loop and return
break;
+ }
- // ensure there's enough space for it
- assert(recv_offset < len);
+ // ensure there's enough space for the rest of the line
+ // XXX: this must be an error, not an assert
+ assert(recv_offset < lp->buf_len);
// otherwise, read more data
- if ((ret = sock_stream_read(lp->sock, buf + recv_offset, len - recv_offset)) < 0)
- RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+ if ((ret = sock_stream_read(lp->sock, lp->buf + recv_offset, lp->buf_len - recv_offset)) < 0) {
+ // we can special-case EAGAIN, as it's expected
+ if (MATCH_ERROR(sock_stream_error(lp->sock), ERR_READ, EAGAIN)) {
+ // return a NULL *line_ptr
+ *line_ptr = NULL;
+ break;
+
+ } else {
+ // store and return NULL on errors
+ RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+
+ }
+ }
// EOF?
if (ret == 0)
--- a/src/line_proto.h Sun Feb 22 08:48:21 2009 +0200
+++ b/src/line_proto.h Sun Feb 22 10:16:28 2009 +0200
@@ -13,9 +13,19 @@
struct line_proto;
/*
+ * The callback for receiving lines
+ */
+typedef void (*line_proto_read_cb)(struct line_proto *lp, const char *line, void *arg);
+
+/*
* Create a new line_proto off the the given sock_stream. The newly allocated line_proto will be returned via *lp_ptr.
+ *
+ * The incoming lines are buffered in a buffer of \a buf_size bytes. This imposes a maximum limit on the line length.
+ *
+ * The given callback function/argument will be used to provide event-based recv support.
*/
-err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, struct error_info *err);
+err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
+ line_proto_read_cb cb_func, void *cb_arg, struct error_info *err);
/*
* Receive one line into the given buffer. The line will be terminated with '\r\n', and said terminator will be
@@ -23,7 +33,7 @@
*
* Note: currently this uses the buffer to store intermediate state, so always pass the same buffer (for now).
*/
-err_t line_proto_read (struct line_proto *lp, char *buf, size_t len);
+err_t line_proto_read (struct line_proto *lp, const char **line_ptr);
/*
* Get current error_info*
--- a/src/nexus.c Sun Feb 22 08:48:21 2009 +0200
+++ b/src/nexus.c Sun Feb 22 10:16:28 2009 +0200
@@ -20,7 +20,7 @@
struct event_base *ev_base;
struct sock_stream *sock;
struct line_proto *lp;
- char line_buf[LINE_LENGTH + 1];
+ char *line;
struct error_info _err;
// initialize libevent
@@ -36,17 +36,17 @@
errx(1, "sock_gnutls_connect: %s", error_msg(&_err));
// line protocol
- if (line_proto_create(&lp, sock, &_err))
+ if (line_proto_create(&lp, sock, LINE_LENGTH, NULL, NULL, &_err))
errx(1, "line_proto_create: %s", error_msg(&_err));
// read lines and dump them out
do {
// recv
- if (line_proto_read(lp, line_buf, sizeof(line_buf)))
+ if (line_proto_read(lp, &line))
errx(1, "line_proto_read: %s", error_msg(line_proto_error(lp)));
// printf
- printf("<<< %s\n", line_buf);
+ printf("<<< %s\n", line);
} while (1);
--- a/src/sock.c Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock.c Sun Feb 22 10:16:28 2009 +0200
@@ -29,23 +29,44 @@
sock->type = type;
}
-void sock_stream_set_callbacks (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg)
+int sock_stream_read (struct sock_stream *sock, void *buf, size_t len)
+{
+ err_t err;
+
+ // proxy off to method handler
+ if ((err = sock->type->methods.read(sock, buf, &len)))
+ return err;
+
+ // return updated bytes-read len
+ return len;
+}
+
+int sock_stream_write (struct sock_stream *sock, const void *buf, size_t len)
+{
+ err_t err;
+
+ // proxy off to method handler
+ if ((err = sock->type->methods.write(sock, buf, &len)))
+ return err;
+
+ // return updated bytes-written len
+ return len;
+}
+
+err_t sock_stream_event_init (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg)
{
// store
sock->cb_info = callbacks;
sock->cb_arg = arg;
+
+ // run method
+ return sock->type->methods.event_init(sock);
}
-err_t sock_stream_read (struct sock_stream *sock, void *buf, size_t len)
+err_t sock_stream_event_enable (struct sock_stream *sock, short mask)
{
- // proxy off to method handler
- return sock->type->methods.read(sock, buf, len);
-}
-
-err_t sock_stream_write (struct sock_stream *sock, const void *buf, size_t len)
-{
- // proxy off to method handler
- return sock->type->methods.write(sock, buf, len);
+ // run method
+ return sock->type->methods.event_enable(sock, mask);
}
const struct error_info* sock_stream_error (struct sock_stream *sock)
--- a/src/sock.h Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock.h Sun Feb 22 10:16:28 2009 +0200
@@ -51,15 +51,26 @@
err_t sock_gnutls_connect (struct sock_stream **sock_ptr, const char *host, const char *service, struct error_info *err);
/*
- * Set the callbacks for a sock_stream. Note that the callbacks struct isn't copied - it's used as-is-given.
+ * The generic read/write API for stream sockets.
*/
-void sock_stream_set_callbacks (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg);
+int sock_stream_read (struct sock_stream *sock, void *buf, size_t len);
+int sock_stream_write (struct sock_stream *sock, const void *buf, size_t len);
/*
- * The generic read/write API for stream sockets.
+ * Initialize event-based operation for this sock_stream. This will set the stream into nonblocking mode, and the given
+ * callbacks will be fired once enabled using sock_stream_event_enable().
+ *
+ * Note that the callbacks struct isn't copied - it's used as-is-given.
*/
-err_t sock_stream_read (struct sock_stream *sock, void *buf, size_t len);
-err_t sock_stream_write (struct sock_stream *sock, const void *buf, size_t len);
+err_t sock_stream_event_init (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg);
+
+/*
+ * Prime the callbacks set up earlier with sock_stream_event_init to fire once ready.
+ *
+ * \a mask is a bitmask of EV_* bits, such as EV_READ, EV_WRITE or EV_PERSIST. See event_set() for more info about the
+ * behaviour of those.
+ */
+err_t sock_stream_event_enable (struct sock_stream *sock, short mask);
/**
* Get current error_info for \a sock.
--- a/src/sock_gnutls.c Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock_gnutls.c Sun Feb 22 10:16:28 2009 +0200
@@ -4,28 +4,59 @@
#include <stdlib.h>
#include <err.h>
-static err_t sock_gnutls_read (struct sock_stream *base_sock, void *buf, size_t len)
+// XXX: errors
+static err_t sock_gnutls_read (struct sock_stream *base_sock, void *buf, size_t *len)
+{
+ struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
+ int ret;
+
+ // just map to gnutls_record_recv
+ if ((ret = gnutls_record_recv(sock->session, buf, *len)) < 0)
+ RETURN_SET_ERROR_ERRNO(SOCK_GNUTLS_ERR(sock), ERR_GNUTLS_RECORD_RECV);
+
+ // updated length
+ *len = ret;
+
+ return SUCCESS;
+}
+
+static err_t sock_gnutls_write (struct sock_stream *base_sock, const void *buf, size_t *len)
+{
+ struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
+ int ret;
+
+ // just map to gnutls_record_send
+ if ((ret = gnutls_record_send(sock->session, buf, *len)) < 0)
+ RETURN_SET_ERROR_ERRNO(SOCK_GNUTLS_ERR(sock), ERR_GNUTLS_RECORD_SEND);
+
+ // updated length
+ *len = ret;
+
+ return SUCCESS;
+}
+
+static err_t sock_gnutls_event_init (struct sock_stream *base_sock)
{
struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
- // just map to gnutls_record_recv
- return gnutls_record_recv(sock->session, buf, len);
+ return SUCCESS;
}
-static err_t sock_gnutls_write (struct sock_stream *base_sock, const void *buf, size_t len)
+static err_t sock_gnutls_event_enable (struct sock_stream *base_sock, short mask)
{
struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls);
- // just map to gnutls_record_send
- return gnutls_record_send(sock->session, buf, len);
+ return SUCCESS;
}
/*
* Our sock_stream_Type
*/
struct sock_stream_type sock_gnutls_type = {
- .methods.read = &sock_gnutls_read,
- .methods.write = &sock_gnutls_write,
+ .methods.read = &sock_gnutls_read,
+ .methods.write = &sock_gnutls_write,
+ .methods.event_init = &sock_gnutls_event_init,
+ .methods.event_enable = &sock_gnutls_event_enable,
};
/*
--- a/src/sock_internal.h Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock_internal.h Sun Feb 22 10:16:28 2009 +0200
@@ -19,10 +19,16 @@
/* method table */
struct sock_stream_methods {
/* Normal read(2) */
- err_t (*read) (struct sock_stream *sock, void *buf, size_t len);
+ err_t (*read) (struct sock_stream *sock, void *buf, size_t *len);
/* Normal write(2) */
- err_t (*write) (struct sock_stream *sock, const void *buf, size_t len);
+ err_t (*write) (struct sock_stream *sock, const void *buf, size_t *len);
+
+ /* Initialize events. cb_info/cb_arg are already updated */
+ err_t (*event_init) (struct sock_stream *sock);
+
+ /* Enable events as specified */
+ err_t (*event_enable) (struct sock_stream *sock, short mask);
} methods;
};
--- a/src/sock_tcp.c Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock_tcp.c Sun Feb 22 10:16:28 2009 +0200
@@ -6,53 +6,87 @@
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
+#include <fcntl.h>
#include <string.h>
#include <assert.h>
/*
* Our sock_stream_methods.read method
*/
-static err_t sock_tcp_read (struct sock_stream *base_sock, void *buf, size_t len)
+static err_t sock_tcp_read (struct sock_stream *base_sock, void *buf, size_t *len)
{
struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
int ret;
// map directly to read(2)
- if ((ret = read(sock->fd, buf, len)) < 0)
+ if ((ret = read(sock->fd, buf, *len)) < 0)
// errno
RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_READ);
- else
- // bytes read
- return ret;
+ // bytes read
+ *len = ret;
+
+ return SUCCESS;
}
/*
* Our sock_stream_methods.write method
*/
-static err_t sock_tcp_write (struct sock_stream *base_sock, const void *buf, size_t len)
+static err_t sock_tcp_write (struct sock_stream *base_sock, const void *buf, size_t *len)
{
struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
int ret;
// map directly to write(2)
- if ((ret = write(sock->fd, buf, len)) < 0)
+ if ((ret = write(sock->fd, buf, *len)) < 0)
// errno
RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_WRITE);
- else
- // bytes read
- return ret;
+ // bytes read
+ *len = ret;
+
+ return SUCCESS;
+}
+
+static err_t sock_tcp_event_init (struct sock_stream *base_sock)
+{
+ struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
+
+ return SUCCESS;
+}
+
+static err_t sock_tcp_event_enable (struct sock_stream *base_sock, short mask)
+{
+ struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
+
+ return SUCCESS;
}
/*
* Our sock_stream_type
*/
struct sock_stream_type sock_tcp_type = {
- .methods.read = &sock_tcp_read,
- .methods.write = &sock_tcp_write,
+ .methods.read = &sock_tcp_read,
+ .methods.write = &sock_tcp_write,
+ .methods.event_init = &sock_tcp_event_init,
+ .methods.event_enable = &sock_tcp_event_enable,
};
+/*
+ * Our basic socket event handler for driving our callbacks
+ */
+static void sock_tcp_event (evutil_socket_t fd, short what, void *arg)
+{
+ struct sock_tcp *sock = arg;
+
+ // invoke appropriate callback
+ if (what & EV_READ && SOCK_TCP_BASE(sock)->cb_info->on_read)
+ SOCK_TCP_BASE(sock)->cb_info->on_read(SOCK_TCP_BASE(sock), SOCK_TCP_BASE(sock)->cb_arg);
+
+ if (what & EV_WRITE && SOCK_TCP_BASE(sock)->cb_info->on_write)
+ SOCK_TCP_BASE(sock)->cb_info->on_read(SOCK_TCP_BASE(sock), SOCK_TCP_BASE(sock)->cb_arg);
+}
+
err_t sock_tcp_alloc (struct sock_tcp **sock_ptr)
{
// alloc
@@ -78,6 +112,19 @@
return SUCCESS;
}
+err_t sock_tcp_init_ev (struct sock_tcp *sock, void (*ev_cb)(evutil_socket_t, short, void *), void *cb_arg)
+{
+ // require valid fd
+ assert(sock->fd >= 0);
+
+ // create new event
+ if ((sock->ev = event_new(_sock_stream_ctx.ev_base, sock->fd, EV_READ, ev_cb, cb_arg)) == NULL)
+ return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_NEW);
+
+ // ok
+ return SUCCESS;
+}
+
err_t sock_tcp_init_connect (struct sock_tcp *sock, const char *hostname, const char *service)
{
struct addrinfo hints, *res, *r;
@@ -137,6 +184,26 @@
return 0;
}
+err_t sock_tcp_set_nonblock (struct sock_tcp *sock, int nonblock)
+{
+ // fcntl it
+ // XXX: maintain old flags?
+ if (fcntl(sock->fd, F_SETFL, nonblock ? O_NONBLOCK : 0) < 0)
+ RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_FCNTL);
+
+ // ok
+ return SUCCESS;
+}
+
+void sock_tcp_release (struct sock_tcp *sock)
+{
+ // must not be connected
+ assert(sock->fd < 0);
+
+ // free
+ free(sock);
+}
+
err_t sock_tcp_connect (struct sock_stream **sock_ptr, const char *host, const char *service, struct error_info *err_info)
{
struct sock_tcp *sock;
@@ -164,11 +231,4 @@
return 0;
}
-void sock_tcp_release (struct sock_tcp *sock)
-{
- // must not be connected
- assert(sock->fd < 0);
- // free
- free(sock);
-}
--- a/src/sock_tcp.h Sun Feb 22 08:48:21 2009 +0200
+++ b/src/sock_tcp.h Sun Feb 22 10:16:28 2009 +0200
@@ -15,6 +15,9 @@
/* The OS file descriptor */
int fd;
+
+ /* The libevent struct */
+ struct event *ev;
};
#define SOCK_TCP_BASE(sock_ptr) (&(sock_ptr)->base)
@@ -31,11 +34,22 @@
err_t sock_tcp_init_fd (struct sock_tcp *sock, int fd);
/*
+ * Initialize sock_tcp.ev to use the socket's fd with the given callback. By default, this is created with EV_READ
+ * flags, but is not added.
+ */
+err_t sock_tcp_init_ev (struct sock_tcp *sock, void (*ev_cb) (evutil_socket_t, short, void *), void *arg);
+
+/*
* Initialize a blank sock_tcp by connecting
*/
err_t sock_tcp_init_connect (struct sock_tcp *sock, const char *hostname, const char *service);
/*
+ * Set the socket's nonblock mode
+ */
+err_t sock_tcp_set_nonblock (struct sock_tcp *sock, int nonblock);
+
+/*
* Release a non-connected sock_tcp
*/
void sock_tcp_release (struct sock_tcp *sock);