# HG changeset patch # User Tero Marttila # Date 1219872579 -10800 # Node ID e5b714190dee5333ac5cb56b3e65e704e6b6cf28 # Parent 0e503189af2feca8db5932cc0be2d1872f5fae9c the request/reply code should be complete now, but still needs testing diff -r 0e503189af2f -r e5b714190dee memcache.h --- a/memcache.h Wed Aug 27 22:42:27 2008 +0300 +++ b/memcache.h Thu Aug 28 00:29:39 2008 +0300 @@ -87,6 +87,10 @@ MEMCACHE_STATE_QUEUED, MEMCACHE_STATE_SEND, MEMCACHE_STATE_REPLY, + MEMCACHE_STATE_REPLY_DATA, + + MEMCACHE_STATE_DONE, + MEMCACHE_STATE_DATA_DONE, MEMCACHE_STATE_ERROR, }; diff -r 0e503189af2f -r e5b714190dee memcache/command.c --- a/memcache/command.c Wed Aug 27 22:42:27 2008 +0300 +++ b/memcache/command.c Thu Aug 28 00:29:39 2008 +0300 @@ -92,7 +92,7 @@ case MEMCACHE_CMD_STORE_PREPEND: assert(key != NULL && obj != NULL); assert(key->len > 0 && key->buf != NULL); - assert(obj->bytes > 0); + // XXX: ensure that we have a valid buf if (evbuffer_add_printf(buf, "%s %*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes)) ERROR("evbuffer_add_printf"); @@ -117,32 +117,33 @@ size_t line_length, buf_size; char *line = NULL, *token_cursor, *token, *invalid; int i; - - // take note of how long the buffer is - buf_size = evbuffer_get_length(buf); + + // read lines until we find one that isn't empty + do { + // take note of how long the buffer is + buf_size = evbuffer_get_length(buf); - // first, try and read a full line - if ((line = evbuffer_readln(buf, &line_length, EVBUFFER_EOL_CRLF_STRICT)) == NULL) { - // check if any data was consumed - if (evbuffer_get_length(buf) != buf_size) { - // faaaail! - return -1; + // free the prvious line in case it was empty + free(line); - } else { - // no complete line found - return 0; + // try and read a line + if ((line = evbuffer_readln(buf, &line_length, EVBUFFER_EOL_CRLF_STRICT)) == NULL) { + // check if any data was consumed + if (evbuffer_get_length(buf) != buf_size) { + // faaaail! + return -1; + + } else { + // no complete line found + return 0; + } } - } + + } while (line_length == 0); // just check to make sure that it really is null-delimited assert(line[line_length - 1] == '\0'); - // empty lines? - if (line_length == 0) { - PWARNING("empty reply line !?!"); - return 0; - } - // use strsep token_cursor = line; diff -r 0e503189af2f -r e5b714190dee memcache/command.h --- a/memcache/command.h Wed Aug 27 22:42:27 2008 +0300 +++ b/memcache/command.h Thu Aug 28 00:29:39 2008 +0300 @@ -13,6 +13,17 @@ int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj); int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj); + +/* + * Attempt to parse a response line from the given buf. *header_data will be set to NULL if no complete response line + * was found, or a pointer to the line received (which may contain NULs between tokens), which must be freed by the + * caller. *reply_type will be set to the parsed `enum memcache_reply` type. If the reply is of type + * MEMCACHE_RPL_VALUE, then key and obj will be updated to correspond to what was returned (key.buf will point into + * *header_data). has_data will be set to true if the response also contains cache data. + * + * Empty lines will be skipped based on the assumption that they are, in fact, the newline between the data and the + * MEMCACHE_RPL_END line of a MEMCACHE_RPL_VALUE reply. + */ int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data); #endif /* MEMCACHE_COMMAND_H */ diff -r 0e503189af2f -r e5b714190dee memcache/command.o Binary file memcache/command.o has changed diff -r 0e503189af2f -r e5b714190dee memcache/connection.c --- a/memcache/connection.c Wed Aug 27 22:42:27 2008 +0300 +++ b/memcache/connection.c Thu Aug 28 00:29:39 2008 +0300 @@ -16,7 +16,10 @@ static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg); static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg); static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg); +static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg); +static void memcache_conn_error (struct memcache_conn *conn); +static void memcache_conn_req_error (struct memcache_conn *conn); static void memcache_conn_req_done (struct memcache_conn *conn); struct memcache_conn *memcache_conn_open (struct memcache_server *server) { @@ -52,10 +55,10 @@ assert(conn->fd > 0); // set up the connect event - event_set(&conn->ev, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn); + event_set(&conn->ev_connect, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn); // add it - if (event_add(&conn->ev, NULL)) + if (event_add(&conn->ev_connect, NULL)) PERROR("event_add"); // success @@ -76,7 +79,7 @@ return (conn->fd > 0 && conn->is_connected && conn->req == NULL); } -int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) { +void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) { assert(conn->fd > 0 && conn->is_connected); assert(conn->req == NULL); @@ -84,27 +87,36 @@ conn->req = req; // write the request header into our bufferevent's output buffer - if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) + if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) { + // just fail the request + memcache_conn_req_error(conn); + ERROR("failed to init the cmd"); + } // tell our bufferevent to send it - if (bufferevent_enable(conn->bev, EV_WRITE)) + if (bufferevent_enable(conn->bev, EV_WRITE)) { + // fail the entire connection + memcache_conn_error(conn); + PERROR("bufferevent_enable"); + } // tell the req that it is underway memcache_req_send(req); - // wait for that to complete - return 0; +error: -error: - return -1; + return; } /* * Start writing out the request data */ void memcache_conn_send_req_data (struct memcache_conn *conn) { + // set up the ev_write + event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn); + // just fake a call to the event handler _memcache_conn_ev_write(conn->fd, EV_WRITE, conn); } @@ -116,24 +128,89 @@ // ensure that we either didn't have a command, or it has been sent assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len); - // start reading on the bufferevent + // start/continue reading on the bufferevent if (bufferevent_enable(conn->bev, EV_READ)) PERROR("bufferevent_enable"); + // Note: we don't need to recurse into the callback ourselves in case there is data in it, since the read callback + // will consume all available data iteratively. + // ok, wait for the reply return; error: - // XXX: error handling - assert(0); + memcache_conn_req_error(conn); } /* * Start reading reply data from the connection */ void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) { - // XXX: implement - assert(0); + int ret; + + // check that the buf doesn't contain any data + assert(conn->req->buf.data == NULL); + + // bytes *may* be zero if we have an empty cache entry + if (conn->req->obj.bytes > 0) { + // allocate a buffer for the reply data + if ((conn->req->buf.data = malloc(conn->req->obj.bytes)) == NULL) + ERROR("malloc"); + + // update the length + conn->req->buf.len = conn->req->obj.bytes; + + // set offset to zero + conn->req->buf.offset = 0; + + // do we have any data in the buf that we need to copy? + if (evbuffer_get_length(buf) > 0) { + // read the data into the memcache_buf + ret = evbuffer_remove(buf, conn->req->buf.data, conn->req->buf.len); + + // sanity check... + assert(ret > 0 && ret <= conn->req->buf.len); + + // update offset + conn->req->buf.offset += ret; + } + + // still need to receive more data? + if (conn->req->buf.offset < conn->req->buf.len) { + + // disable the bufferevent while we read the data + if (bufferevent_disable(conn->bev, EV_READ)) + PERROR("bufferevent_disable"); + + // set up the ev_read + event_set(&conn->ev_read, conn->fd, EV_READ, &_memcache_conn_ev_read, conn); + + // then receive what data is left to receive + _memcache_conn_ev_read(conn->fd, EV_READ, conn); + + // wait for the data to arrive + return; + + } else { + // the buffer already contained the cache data, no need to read any more + + } + } else { + // there is no data to receive for this item, so we can ignore this + + } + + // we kind of "recurse" to handle the MEMCACHE_RPL_END reply, that is, we activate the bufferevent for EV_READ + // again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line" after the data and then + // return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause req_done to be called. + // Elegant! + memcache_conn_handle_reply(conn); + + // ok + return; + +error: + memcache_conn_req_error(conn); } /* @@ -168,8 +245,7 @@ return; error: - // notify the server - memcache_server_conn_dead(conn->server, conn); + memcache_conn_error(conn); } /* @@ -205,47 +281,56 @@ // ensure that we do indeed have some data assert(evbuffer_get_length(in_buf) > 0); - - // attempt to parse the response header - if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data)) - ERROR("memcache_cmd_parse_header"); - if (!header_data) { - // no complete header received yet - return; - } + // consume as much data as possible + do { + // attempt to parse the response header + if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data)) + ERROR("memcache_cmd_parse_header"); + + if (!header_data) { + // no complete header received yet + return; + } - // disable reads again - if (bufferevent_disable(bev, EV_READ)) - PERROR("bufferevent_disable"); - - // does the reply include data? - if (has_data) { - // check that they key is the same - if (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0) + // if the reply contains data, check that they key is the same + if (has_data && (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0)) ERROR("got reply with wrong key !?!"); - // start reading the data (including whatever might be left over in the bufferevent buffer...) - // XXX: what if this triggers a req notify before we do? - memcache_conn_handle_reply_data(conn, in_buf); + // notify the request (no reply data is ready for reading yet, though) + memcache_req_reply(conn->req, reply_type); + + // does the reply include data? + if (has_data) { + // start reading the data (including whatever might be left over in the bufferevent buffer...) + memcache_conn_handle_reply_data(conn, in_buf); - } else { - // the request is done with - memcache_conn_req_done(conn); - } + } else { + // the request is done with + memcache_conn_req_done(conn); + } + + // free the header data + free(header_data); - // notify the request - memcache_req_reply(conn->req, reply_type); + } while (evbuffer_get_length(in_buf) > 0); + + // done + return; + +error: + // free the header data read from the buf + free(header_data); -error: - // XXX: error handling - return; + memcache_conn_req_error(conn); } static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) { - // XXX: error handling - assert(0); + struct memcache_conn *conn = arg; + + // fail the entire connection + memcache_conn_error(conn); } static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) { @@ -265,7 +350,7 @@ // should never be the case... ? if (ret == 0) - ERROR("write returned 0 !?!"); + ERROR("write returned EOF !?!"); // did we manage to write some data? if (ret > 0) { @@ -276,7 +361,7 @@ // data left to write? if (buf->offset < buf->len) { // reschedule - if (event_add(&conn->ev, NULL)) + if (event_add(&conn->ev_write, NULL)) PERROR("event_add"); } else { @@ -288,8 +373,87 @@ return; error: - // XXX: error handling - assert(0); + // fail the entire connection + memcache_conn_error(conn); +} + +static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg) { + struct memcache_conn *conn = arg; + struct memcache_buf *buf = &conn->req->buf; + int ret; + + // correct event + assert(event == EV_READ); + + // we do indeed expect to receive data + assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len); + + // do the actual read() + if ((ret = read(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN) + PERROR("read"); + + // should never be the case... ? + if (ret == 0) + ERROR("read returned EOF !?!"); + + // did we manage to read some data? + if (ret > 0) { + // update offset + buf->offset += ret; + } + + // only notify the req if new data was received, and we won't be calling req_done next. + if (ret > 0 && buf->offset < buf->len) { + // notify the req + memcache_req_data(conn->req); + } + + // data left to read? + if (buf->offset < buf->len) { + // reschedule + if (event_add(&conn->ev_read, NULL)) + PERROR("event_add"); + + + } else { + // done! We can let the bufferenvet handle the rest of the reply now + memcache_conn_handle_reply(conn); + } + + // success + return; + +error: + // fail the entire connection + memcache_conn_error(conn); +} + +/* + * The entire connection failed + */ +static void memcache_conn_error (struct memcache_conn *conn) { + // fail the request, if we have one + if (conn->req) + memcache_conn_req_error(conn); + + // tell the server we failed + memcache_server_conn_dead(conn->server, conn); +} + +/* + * Request failed somehow + */ +static void memcache_conn_req_error (struct memcache_conn *conn) { + // ensure that we do currently have a req + assert(conn->req); + + // error out the req + memcache_req_error(conn->req); + assert(conn->req->conn == NULL); + + // we are now available again + conn->req = NULL; + } /* @@ -308,7 +472,8 @@ } void memcache_conn_free (struct memcache_conn *conn) { - // XXX: conn->req? + // ensure we don't have a req bound to us + assert(conn->req == NULL); // ensure that the connection is not considered to be connected anymore assert(!conn->is_connected); @@ -321,9 +486,15 @@ conn->fd = 0; } - // ensure that the event is not pending anymore - assert(event_pending(&conn->ev, EV_READ|EV_WRITE|EV_TIMEOUT, NULL) == 0); + // ensure that the events are not pending anymore + assert(event_pending(&conn->ev_connect, EV_WRITE|EV_TIMEOUT, NULL) == 0); + assert(event_pending(&conn->ev_read, EV_READ|EV_TIMEOUT, NULL) == 0); + assert(event_pending(&conn->ev_write, EV_WRITE|EV_TIMEOUT, NULL) == 0); + // free the bufferevent + if (conn->bev) + bufferevent_free(conn->bev); + // free it free(conn); } diff -r 0e503189af2f -r e5b714190dee memcache/connection.h --- a/memcache/connection.h Wed Aug 27 22:42:27 2008 +0300 +++ b/memcache/connection.h Thu Aug 28 00:29:39 2008 +0300 @@ -20,8 +20,10 @@ // our socket fd int fd; - // socket event - struct event ev; + // socket events + struct event ev_connect; + struct event ev_read; + struct event ev_write; // socket bufferevent struct bufferevent *bev; @@ -56,7 +58,7 @@ /* * The given connection must be idle, whereupon the given request is processed. */ -int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req); +void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req); /* * Free all resources associated with a closed/failed connection diff -r 0e503189af2f -r e5b714190dee memcache/request.c --- a/memcache/request.c Wed Aug 27 22:42:27 2008 +0300 +++ b/memcache/request.c Thu Aug 28 00:29:39 2008 +0300 @@ -45,15 +45,6 @@ req->mc->cb_fn(req, req->cb_arg); } -void memcache_req_error (struct memcache_req *req) { - // forget our connection - req->conn = NULL; - - req->state = MEMCACHE_STATE_ERROR; - - _memcache_req_notify(req); -} - void memcache_req_queued (struct memcache_req *req) { req->state = MEMCACHE_STATE_QUEUED; @@ -73,21 +64,41 @@ _memcache_req_notify(req); } +void memcache_req_data (struct memcache_req *req) { + assert(req->state == MEMCACHE_STATE_REPLY || req->state == MEMCACHE_STATE_REPLY_DATA); + + req->state = MEMCACHE_STATE_REPLY_DATA; + + _memcache_req_notify(req); +} + void memcache_req_done (struct memcache_req *req) { - // make sure we are in the STATE_SEND state - assert(req->state == MEMCACHE_STATE_SEND); + // make sure we are in the REPLY/REPLY_DATA state + assert(req->state == MEMCACHE_STATE_REPLY || req->state == MEMCACHE_STATE_REPLY_DATA); + + // make sure we really have the full data, if applicable + assert(req->buf.data == NULL || req->buf.offset == req->buf.len); // forget the connection req->conn = NULL; + + // state depends on if we have data or not... + req->state = req->buf.data ? MEMCACHE_STATE_DATA_DONE : MEMCACHE_STATE_DONE; +} - // our state is currently indeterminate until req_reply is called - req->state = MEMCACHE_STATE_INVALID; +void memcache_req_error (struct memcache_req *req) { + // forget our connection + req->conn = NULL; + + req->state = MEMCACHE_STATE_ERROR; + + _memcache_req_notify(req); } void memcache_req_free (struct memcache_req *req) { // must be unused assert(req->conn == NULL); - assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR); + assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR || req->state == MEMCACHE_STATE_DONE || req->state == MEMCACHE_STATE_DATA_DONE); free(req->key.buf); free(req); diff -r 0e503189af2f -r e5b714190dee memcache/request.h --- a/memcache/request.h Wed Aug 27 22:42:27 2008 +0300 +++ b/memcache/request.h Thu Aug 28 00:29:39 2008 +0300 @@ -40,12 +40,6 @@ struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg); /* - * An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued - * anymore, and the req should not be accessed by memcache_* code after this. - */ -void memcache_req_error (struct memcache_req *req); - -/* * The request has been queued. */ void memcache_req_queued (struct memcache_req *req); @@ -56,11 +50,31 @@ void memcache_req_send (struct memcache_req *req); /* - * The response has been received, although if the respones also contains data, that will be notified separately + * The reply has been received, although if the respones also contains data, that will be notified separately. + * + * If the response doesn't contain any data, req_done will be called after this is. Otherwise, this will be called + * first, followed by zero or more calls to req_data, followed by req_reply/RPL_END + req_done. + * + * Note that no req data will not be available when this is first called, only once req_data/req_done is called. */ void memcache_req_reply (struct memcache_req *req, enum memcache_reply reply_type); /* + * Some amount of reply data has been received. This may be called between the req_reply/MEMCACHE_RPL_VALUE and + * req_reply/MEMCACHE_RPL_END + req_done notifications. + * + * Note that this is not always called, it is possible that one might have: + * memcache_req_reply(req, MEMCACHE_RPL_VALUE) + * memcache_req_reply(req, MEMCACHE_RPL_END) + * memcache_req_done(req) + * + * This will happen if both the value and the end replies are received in the same chunk. + * + * Note also that replies may have zero-length entries, in which case req_data will never be called either. + */ +void memcache_req_data (struct memcache_req *req); + +/* * The request was sent and is now done, and is not associated with the connection anymore. * * This will be called before req_reply/req_data, but not before/after req_error. @@ -68,6 +82,12 @@ void memcache_req_done (struct memcache_req *req); /* + * An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued + * anymore, and the req should not be accessed by memcache_* code after this. + */ +void memcache_req_error (struct memcache_req *req); + +/* * Free an unused req. Should always be called by the user, not via internal means. */ void memcache_req_free (struct memcache_req *req); diff -r 0e503189af2f -r e5b714190dee memcache/server.c --- a/memcache/server.c Wed Aug 27 22:42:27 2008 +0300 +++ b/memcache/server.c Thu Aug 28 00:29:39 2008 +0300 @@ -67,7 +67,10 @@ if (conn != NULL) { // we found an available connection - return memcache_conn_do_req(conn, req); + // if the request fails, then we will know via conn_dead + memcache_conn_do_req(conn, req); + + return 0; } else { // enqueue the request until a connection is available @@ -92,12 +95,8 @@ // remove it from the queue and execute it TAILQ_REMOVE(&server->req_queue, req, reqqueue_node); - if (memcache_conn_do_req(conn, req)) { - WARNING("processing enqueued request failed"); - - // notify the request/user, the req is now out of our hands - memcache_req_error(req); - } + // this will take care of any error handling by itself + memcache_conn_do_req(conn, req); } } diff -r 0e503189af2f -r e5b714190dee memcache/server.h --- a/memcache/server.h Wed Aug 27 22:42:27 2008 +0300 +++ b/memcache/server.h Thu Aug 28 00:29:39 2008 +0300 @@ -44,7 +44,7 @@ void memcache_server_conn_ready (struct memcache_server *server, struct memcache_conn *conn); /* - * The given connection failed/died + * The given connection failed/died. This will take care of freeing it/reconnecting it */ void memcache_server_conn_dead (struct memcache_server *server, struct memcache_conn *conn);