# HG changeset patch # User Tero Marttila # Date 1235290588 -7200 # Node ID 9fe218576d13ea665f24fa60520d04a51d50dc33 # Parent 4c4c906cc64920f37b2ce88c237a21f7ede64585 fix sock_stream read/write return value, move line buffer inside of line_proto, add some initial code for event-based non-blocking operation diff -r 4c4c906cc649 -r 9fe218576d13 src/error.c --- a/src/error.c Sun Feb 22 08:48:21 2009 +0200 +++ b/src/error.c Sun Feb 22 10:16:28 2009 +0200 @@ -23,6 +23,7 @@ ERROR_NAME( ERR_READ, "read" ); ERROR_NAME( ERR_READ_EOF, "read: EOF" ); ERROR_NAME( ERR_WRITE, "write" ); + ERROR_NAME( ERR_FCNTL, "fcntl" ); ERROR_NAME( ERR_GNUTLS_CERT_ALLOC_CRED, "gnutls_certificate_allocate_credentials" ); ERROR_NAME( ERR_GNUTLS_GLOBAL_INIT, "gnutls_global_init" ); ERROR_NAME( ERR_GNUTLS_INIT, "gnutls_init" ); diff -r 4c4c906cc649 -r 9fe218576d13 src/error.h --- a/src/error.h Sun Feb 22 08:48:21 2009 +0200 +++ b/src/error.h Sun Feb 22 10:16:28 2009 +0200 @@ -50,6 +50,7 @@ _ERROR_CODE( ERR_READ, 0x000401, ERRNO ), _ERROR_CODE( ERR_READ_EOF, 0x000402, NONE ), _ERROR_CODE( ERR_WRITE, 0x000403, ERRNO ), + _ERROR_CODE( ERR_FCNTL, 0x000404, ERRNO ), /* GnuTLS errors */ _ERROR_CODE( ERR_GNUTLS_CERT_ALLOC_CRED, 0x010101, GNUTLS ), @@ -58,6 +59,11 @@ _ERROR_CODE( ERR_GNUTLS_SET_DEFAULT_PRIORITY, 0x010104, GNUTLS ), _ERROR_CODE( ERR_GNUTLS_CRED_SET, 0x010105, GNUTLS ), _ERROR_CODE( ERR_GNUTLS_HANDSHAKE, 0x010106, GNUTLS ), + _ERROR_CODE( ERR_GNUTLS_RECORD_SEND, 0x010107, GNUTLS ), + _ERROR_CODE( ERR_GNUTLS_RECORD_RECV, 0x010108, GNUTLS ), + + /* Libevent errors */ + _ERROR_CODE( ERR_EVENT_NEW, 0x010201, NONE ), // mask of bits used for the error_code value _ERROR_CODE_MASK = 0xffffff, @@ -106,6 +112,9 @@ /* Compare error_info.code != 0 */ #define IS_ERROR(err_info_ptr) (!!(err_info_ptr)->code) +/* Compare the err_code/err_extra for an err_info */ +#define MATCH_ERROR(err_info_ptr, err_code, err_extra) ((err_info_ptr)->code == (err_code) && (err_info_ptr)->extra == (err_extra)) + /* Set error_info.code, but leave err_extra as-is. Evaluates to err_code */ #define SET_ERROR(err_info_ptr, err_code) ((err_info_ptr)->code = (err_code)) diff -r 4c4c906cc649 -r 9fe218576d13 src/line_proto.c --- a/src/line_proto.c Sun Feb 22 08:48:21 2009 +0200 +++ b/src/line_proto.c Sun Feb 22 10:16:28 2009 +0200 @@ -5,6 +5,8 @@ #include #include +#include + /* * Our state */ @@ -12,6 +14,12 @@ /* The sock_stream we read/write with */ struct sock_stream *sock; + /* The incoming line buffer */ + char *buf; + + /* Incoming buffer size */ + size_t buf_len; + /* Offset of trailing data in buf */ size_t tail_offset; @@ -20,18 +28,82 @@ /* Last error */ struct error_info err; + + /* Callback info */ + line_proto_read_cb cb_read; + void *cb_arg; }; -err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, struct error_info *err) +// function prototypes +static err_t line_proto_schedule_events (struct line_proto *lp, short what); + +/* + * Our sock_stream on_read handler + */ +static void line_proto_on_read (struct sock_stream *sock, void *arg) +{ + struct line_proto *lp = arg; + const char *line; + + // sanity-check + assert(lp->tail_offset < lp->buf_len); + + do { + // attempt to read a line + if (line_proto_read(lp, &line)) + // XXX: fail + errx(1, "line_proto_read: %s", error_msg(&lp->err)); + + // got a line? + if (line) + lp->cb_read(lp, line, lp->cb_arg); + + } while (line); + + // reschedule + if (line_proto_schedule_events(lp, EV_READ)) + errx(1, "line_proto_schedule_events: %s", error_msg(&lp->err)); +} + +/* + * Schedule our sock_stream callback + */ +static err_t line_proto_schedule_events (struct line_proto *lp, short what) +{ + // just use sock_stream's interface + if (sock_stream_event_enable(lp->sock, what)) + RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock)); + + return SUCCESS; +} + +struct sock_stream_callbacks line_proto_sock_stream_callbacks = { + .on_read = &line_proto_on_read, +}; + +err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size, + line_proto_read_cb cb_func, void *cb_arg, struct error_info *err) { struct line_proto *lp; // allocate if ((lp = calloc(1, sizeof(*lp))) == NULL) return SET_ERROR(err, ERR_CALLOC); + + // allocate buffer + if ((lp->buf = malloc(buf_size)) == NULL) { + free(lp); + return SET_ERROR(err, ERR_CALLOC); + } // store lp->sock = sock; + lp->buf_len = buf_size; + lp->cb_read = cb_func; + lp->cb_arg = cb_arg; + + // initialize event-based stuff + sock_stream_event_init(sock, &line_proto_sock_stream_callbacks, lp); // return *lp_ptr = lp; @@ -75,7 +147,7 @@ return i + 2; } -err_t line_proto_read (struct line_proto *lp, char *buf, size_t len) +err_t line_proto_read (struct line_proto *lp, const char **line_ptr) { // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line size_t recv_offset = 0, peek_offset = 0, next_offset = 0; @@ -87,7 +159,7 @@ // move trailing data from previous line to front of buffer if (lp->tail_offset) { // move to front - memmove(buf, buf + lp->tail_offset, lp->tail_len); + memmove(lp->buf, lp->buf + lp->tail_offset, lp->tail_len); // reset lp->tail_offset = 0; @@ -97,15 +169,32 @@ // readline loop do { // parse any line at the beginning of the buffer - if ((next_offset = _parse_line(buf, recv_offset, &peek_offset)) > 0) + if ((next_offset = _parse_line(lp->buf, recv_offset, &peek_offset)) > 0) { + // store a valid *line_ptr + *line_ptr = lp->buf; + + // exit loop and return break; + } - // ensure there's enough space for it - assert(recv_offset < len); + // ensure there's enough space for the rest of the line + // XXX: this must be an error, not an assert + assert(recv_offset < lp->buf_len); // otherwise, read more data - if ((ret = sock_stream_read(lp->sock, buf + recv_offset, len - recv_offset)) < 0) - RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock)); + if ((ret = sock_stream_read(lp->sock, lp->buf + recv_offset, lp->buf_len - recv_offset)) < 0) { + // we can special-case EAGAIN, as it's expected + if (MATCH_ERROR(sock_stream_error(lp->sock), ERR_READ, EAGAIN)) { + // return a NULL *line_ptr + *line_ptr = NULL; + break; + + } else { + // store and return NULL on errors + RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock)); + + } + } // EOF? if (ret == 0) diff -r 4c4c906cc649 -r 9fe218576d13 src/line_proto.h --- a/src/line_proto.h Sun Feb 22 08:48:21 2009 +0200 +++ b/src/line_proto.h Sun Feb 22 10:16:28 2009 +0200 @@ -13,9 +13,19 @@ struct line_proto; /* + * The callback for receiving lines + */ +typedef void (*line_proto_read_cb)(struct line_proto *lp, const char *line, void *arg); + +/* * Create a new line_proto off the the given sock_stream. The newly allocated line_proto will be returned via *lp_ptr. + * + * The incoming lines are buffered in a buffer of \a buf_size bytes. This imposes a maximum limit on the line length. + * + * The given callback function/argument will be used to provide event-based recv support. */ -err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, struct error_info *err); +err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size, + line_proto_read_cb cb_func, void *cb_arg, struct error_info *err); /* * Receive one line into the given buffer. The line will be terminated with '\r\n', and said terminator will be @@ -23,7 +33,7 @@ * * Note: currently this uses the buffer to store intermediate state, so always pass the same buffer (for now). */ -err_t line_proto_read (struct line_proto *lp, char *buf, size_t len); +err_t line_proto_read (struct line_proto *lp, const char **line_ptr); /* * Get current error_info* diff -r 4c4c906cc649 -r 9fe218576d13 src/nexus.c --- a/src/nexus.c Sun Feb 22 08:48:21 2009 +0200 +++ b/src/nexus.c Sun Feb 22 10:16:28 2009 +0200 @@ -20,7 +20,7 @@ struct event_base *ev_base; struct sock_stream *sock; struct line_proto *lp; - char line_buf[LINE_LENGTH + 1]; + char *line; struct error_info _err; // initialize libevent @@ -36,17 +36,17 @@ errx(1, "sock_gnutls_connect: %s", error_msg(&_err)); // line protocol - if (line_proto_create(&lp, sock, &_err)) + if (line_proto_create(&lp, sock, LINE_LENGTH, NULL, NULL, &_err)) errx(1, "line_proto_create: %s", error_msg(&_err)); // read lines and dump them out do { // recv - if (line_proto_read(lp, line_buf, sizeof(line_buf))) + if (line_proto_read(lp, &line)) errx(1, "line_proto_read: %s", error_msg(line_proto_error(lp))); // printf - printf("<<< %s\n", line_buf); + printf("<<< %s\n", line); } while (1); diff -r 4c4c906cc649 -r 9fe218576d13 src/sock.c --- a/src/sock.c Sun Feb 22 08:48:21 2009 +0200 +++ b/src/sock.c Sun Feb 22 10:16:28 2009 +0200 @@ -29,23 +29,44 @@ sock->type = type; } -void sock_stream_set_callbacks (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg) +int sock_stream_read (struct sock_stream *sock, void *buf, size_t len) +{ + err_t err; + + // proxy off to method handler + if ((err = sock->type->methods.read(sock, buf, &len))) + return err; + + // return updated bytes-read len + return len; +} + +int sock_stream_write (struct sock_stream *sock, const void *buf, size_t len) +{ + err_t err; + + // proxy off to method handler + if ((err = sock->type->methods.write(sock, buf, &len))) + return err; + + // return updated bytes-written len + return len; +} + +err_t sock_stream_event_init (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg) { // store sock->cb_info = callbacks; sock->cb_arg = arg; + + // run method + return sock->type->methods.event_init(sock); } -err_t sock_stream_read (struct sock_stream *sock, void *buf, size_t len) +err_t sock_stream_event_enable (struct sock_stream *sock, short mask) { - // proxy off to method handler - return sock->type->methods.read(sock, buf, len); -} - -err_t sock_stream_write (struct sock_stream *sock, const void *buf, size_t len) -{ - // proxy off to method handler - return sock->type->methods.write(sock, buf, len); + // run method + return sock->type->methods.event_enable(sock, mask); } const struct error_info* sock_stream_error (struct sock_stream *sock) diff -r 4c4c906cc649 -r 9fe218576d13 src/sock.h --- a/src/sock.h Sun Feb 22 08:48:21 2009 +0200 +++ b/src/sock.h Sun Feb 22 10:16:28 2009 +0200 @@ -51,15 +51,26 @@ err_t sock_gnutls_connect (struct sock_stream **sock_ptr, const char *host, const char *service, struct error_info *err); /* - * Set the callbacks for a sock_stream. Note that the callbacks struct isn't copied - it's used as-is-given. + * The generic read/write API for stream sockets. */ -void sock_stream_set_callbacks (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg); +int sock_stream_read (struct sock_stream *sock, void *buf, size_t len); +int sock_stream_write (struct sock_stream *sock, const void *buf, size_t len); /* - * The generic read/write API for stream sockets. + * Initialize event-based operation for this sock_stream. This will set the stream into nonblocking mode, and the given + * callbacks will be fired once enabled using sock_stream_event_enable(). + * + * Note that the callbacks struct isn't copied - it's used as-is-given. */ -err_t sock_stream_read (struct sock_stream *sock, void *buf, size_t len); -err_t sock_stream_write (struct sock_stream *sock, const void *buf, size_t len); +err_t sock_stream_event_init (struct sock_stream *sock, const struct sock_stream_callbacks *callbacks, void *arg); + +/* + * Prime the callbacks set up earlier with sock_stream_event_init to fire once ready. + * + * \a mask is a bitmask of EV_* bits, such as EV_READ, EV_WRITE or EV_PERSIST. See event_set() for more info about the + * behaviour of those. + */ +err_t sock_stream_event_enable (struct sock_stream *sock, short mask); /** * Get current error_info for \a sock. diff -r 4c4c906cc649 -r 9fe218576d13 src/sock_gnutls.c --- a/src/sock_gnutls.c Sun Feb 22 08:48:21 2009 +0200 +++ b/src/sock_gnutls.c Sun Feb 22 10:16:28 2009 +0200 @@ -4,28 +4,59 @@ #include #include -static err_t sock_gnutls_read (struct sock_stream *base_sock, void *buf, size_t len) +// XXX: errors +static err_t sock_gnutls_read (struct sock_stream *base_sock, void *buf, size_t *len) +{ + struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls); + int ret; + + // just map to gnutls_record_recv + if ((ret = gnutls_record_recv(sock->session, buf, *len)) < 0) + RETURN_SET_ERROR_ERRNO(SOCK_GNUTLS_ERR(sock), ERR_GNUTLS_RECORD_RECV); + + // updated length + *len = ret; + + return SUCCESS; +} + +static err_t sock_gnutls_write (struct sock_stream *base_sock, const void *buf, size_t *len) +{ + struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls); + int ret; + + // just map to gnutls_record_send + if ((ret = gnutls_record_send(sock->session, buf, *len)) < 0) + RETURN_SET_ERROR_ERRNO(SOCK_GNUTLS_ERR(sock), ERR_GNUTLS_RECORD_SEND); + + // updated length + *len = ret; + + return SUCCESS; +} + +static err_t sock_gnutls_event_init (struct sock_stream *base_sock) { struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls); - // just map to gnutls_record_recv - return gnutls_record_recv(sock->session, buf, len); + return SUCCESS; } -static err_t sock_gnutls_write (struct sock_stream *base_sock, const void *buf, size_t len) +static err_t sock_gnutls_event_enable (struct sock_stream *base_sock, short mask) { struct sock_gnutls *sock = SOCK_FROM_BASE(base_sock, struct sock_gnutls); - // just map to gnutls_record_send - return gnutls_record_send(sock->session, buf, len); + return SUCCESS; } /* * Our sock_stream_Type */ struct sock_stream_type sock_gnutls_type = { - .methods.read = &sock_gnutls_read, - .methods.write = &sock_gnutls_write, + .methods.read = &sock_gnutls_read, + .methods.write = &sock_gnutls_write, + .methods.event_init = &sock_gnutls_event_init, + .methods.event_enable = &sock_gnutls_event_enable, }; /* diff -r 4c4c906cc649 -r 9fe218576d13 src/sock_internal.h --- a/src/sock_internal.h Sun Feb 22 08:48:21 2009 +0200 +++ b/src/sock_internal.h Sun Feb 22 10:16:28 2009 +0200 @@ -19,10 +19,16 @@ /* method table */ struct sock_stream_methods { /* Normal read(2) */ - err_t (*read) (struct sock_stream *sock, void *buf, size_t len); + err_t (*read) (struct sock_stream *sock, void *buf, size_t *len); /* Normal write(2) */ - err_t (*write) (struct sock_stream *sock, const void *buf, size_t len); + err_t (*write) (struct sock_stream *sock, const void *buf, size_t *len); + + /* Initialize events. cb_info/cb_arg are already updated */ + err_t (*event_init) (struct sock_stream *sock); + + /* Enable events as specified */ + err_t (*event_enable) (struct sock_stream *sock, short mask); } methods; }; diff -r 4c4c906cc649 -r 9fe218576d13 src/sock_tcp.c --- a/src/sock_tcp.c Sun Feb 22 08:48:21 2009 +0200 +++ b/src/sock_tcp.c Sun Feb 22 10:16:28 2009 +0200 @@ -6,53 +6,87 @@ #include #include #include +#include #include #include /* * Our sock_stream_methods.read method */ -static err_t sock_tcp_read (struct sock_stream *base_sock, void *buf, size_t len) +static err_t sock_tcp_read (struct sock_stream *base_sock, void *buf, size_t *len) { struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp); int ret; // map directly to read(2) - if ((ret = read(sock->fd, buf, len)) < 0) + if ((ret = read(sock->fd, buf, *len)) < 0) // errno RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_READ); - else - // bytes read - return ret; + // bytes read + *len = ret; + + return SUCCESS; } /* * Our sock_stream_methods.write method */ -static err_t sock_tcp_write (struct sock_stream *base_sock, const void *buf, size_t len) +static err_t sock_tcp_write (struct sock_stream *base_sock, const void *buf, size_t *len) { struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp); int ret; // map directly to write(2) - if ((ret = write(sock->fd, buf, len)) < 0) + if ((ret = write(sock->fd, buf, *len)) < 0) // errno RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_WRITE); - else - // bytes read - return ret; + // bytes read + *len = ret; + + return SUCCESS; +} + +static err_t sock_tcp_event_init (struct sock_stream *base_sock) +{ + struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp); + + return SUCCESS; +} + +static err_t sock_tcp_event_enable (struct sock_stream *base_sock, short mask) +{ + struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp); + + return SUCCESS; } /* * Our sock_stream_type */ struct sock_stream_type sock_tcp_type = { - .methods.read = &sock_tcp_read, - .methods.write = &sock_tcp_write, + .methods.read = &sock_tcp_read, + .methods.write = &sock_tcp_write, + .methods.event_init = &sock_tcp_event_init, + .methods.event_enable = &sock_tcp_event_enable, }; +/* + * Our basic socket event handler for driving our callbacks + */ +static void sock_tcp_event (evutil_socket_t fd, short what, void *arg) +{ + struct sock_tcp *sock = arg; + + // invoke appropriate callback + if (what & EV_READ && SOCK_TCP_BASE(sock)->cb_info->on_read) + SOCK_TCP_BASE(sock)->cb_info->on_read(SOCK_TCP_BASE(sock), SOCK_TCP_BASE(sock)->cb_arg); + + if (what & EV_WRITE && SOCK_TCP_BASE(sock)->cb_info->on_write) + SOCK_TCP_BASE(sock)->cb_info->on_read(SOCK_TCP_BASE(sock), SOCK_TCP_BASE(sock)->cb_arg); +} + err_t sock_tcp_alloc (struct sock_tcp **sock_ptr) { // alloc @@ -78,6 +112,19 @@ return SUCCESS; } +err_t sock_tcp_init_ev (struct sock_tcp *sock, void (*ev_cb)(evutil_socket_t, short, void *), void *cb_arg) +{ + // require valid fd + assert(sock->fd >= 0); + + // create new event + if ((sock->ev = event_new(_sock_stream_ctx.ev_base, sock->fd, EV_READ, ev_cb, cb_arg)) == NULL) + return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_NEW); + + // ok + return SUCCESS; +} + err_t sock_tcp_init_connect (struct sock_tcp *sock, const char *hostname, const char *service) { struct addrinfo hints, *res, *r; @@ -137,6 +184,26 @@ return 0; } +err_t sock_tcp_set_nonblock (struct sock_tcp *sock, int nonblock) +{ + // fcntl it + // XXX: maintain old flags? + if (fcntl(sock->fd, F_SETFL, nonblock ? O_NONBLOCK : 0) < 0) + RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_FCNTL); + + // ok + return SUCCESS; +} + +void sock_tcp_release (struct sock_tcp *sock) +{ + // must not be connected + assert(sock->fd < 0); + + // free + free(sock); +} + err_t sock_tcp_connect (struct sock_stream **sock_ptr, const char *host, const char *service, struct error_info *err_info) { struct sock_tcp *sock; @@ -164,11 +231,4 @@ return 0; } -void sock_tcp_release (struct sock_tcp *sock) -{ - // must not be connected - assert(sock->fd < 0); - // free - free(sock); -} diff -r 4c4c906cc649 -r 9fe218576d13 src/sock_tcp.h --- a/src/sock_tcp.h Sun Feb 22 08:48:21 2009 +0200 +++ b/src/sock_tcp.h Sun Feb 22 10:16:28 2009 +0200 @@ -15,6 +15,9 @@ /* The OS file descriptor */ int fd; + + /* The libevent struct */ + struct event *ev; }; #define SOCK_TCP_BASE(sock_ptr) (&(sock_ptr)->base) @@ -31,11 +34,22 @@ err_t sock_tcp_init_fd (struct sock_tcp *sock, int fd); /* + * Initialize sock_tcp.ev to use the socket's fd with the given callback. By default, this is created with EV_READ + * flags, but is not added. + */ +err_t sock_tcp_init_ev (struct sock_tcp *sock, void (*ev_cb) (evutil_socket_t, short, void *), void *arg); + +/* * Initialize a blank sock_tcp by connecting */ err_t sock_tcp_init_connect (struct sock_tcp *sock, const char *hostname, const char *service); /* + * Set the socket's nonblock mode + */ +err_t sock_tcp_set_nonblock (struct sock_tcp *sock, int nonblock); + +/* * Release a non-connected sock_tcp */ void sock_tcp_release (struct sock_tcp *sock);