# HG changeset patch # User Tero Marttila # Date 1219866147 -10800 # Node ID 0e503189af2feca8db5932cc0be2d1872f5fae9c # Parent 540737bf6bac2907b08eddbeadd237d313d42163 more reply-receiving code, but still incomplete diff -r 540737bf6bac -r 0e503189af2f memcache.h --- a/memcache.h Wed Aug 27 21:30:32 2008 +0300 +++ b/memcache.h Wed Aug 27 22:42:27 2008 +0300 @@ -84,6 +84,10 @@ enum memcache_req_state { MEMCACHE_STATE_INVALID, + MEMCACHE_STATE_QUEUED, + MEMCACHE_STATE_SEND, + MEMCACHE_STATE_REPLY, + MEMCACHE_STATE_ERROR, }; diff -r 540737bf6bac -r 0e503189af2f memcache/command.c --- a/memcache/command.c Wed Aug 27 21:30:32 2008 +0300 +++ b/memcache/command.c Wed Aug 27 22:42:27 2008 +0300 @@ -1,4 +1,5 @@ - +#include +#include #include #include "command.h" @@ -14,32 +15,28 @@ "prepend" // MEMCACHE_CMD_STORE_PREPEND }; -/* static struct memcache_reply_info { enum memcache_reply type; char *name; - int has_data; -} *memcache_cmd_replies[MEMCACHE_RPL_MAX] = { - MEMCACHE_RPL_INVALID, - - MEMCACHE_RPL_ERROR, - MEMCACHE_RPL_CLIENT_ERROR, - MEMCACHE_RPL_SERVER_ERROR, +} memcache_cmd_replies[MEMCACHE_RPL_MAX] = { + { MEMCACHE_RPL_INVALID, NULL }, + { MEMCACHE_RPL_ERROR, "ERROR" }, + { MEMCACHE_RPL_CLIENT_ERROR, "CLIENT_ERROR" }, + { MEMCACHE_RPL_SERVER_ERROR, "SERVER_ERROR" }, // MEMCACHE_CMD_FETCH_* - MEMCACHE_RPL_VALUE, - MEMCACHE_RPL_END, + { MEMCACHE_RPL_VALUE, "VALUE" }, + { MEMCACHE_RPL_END, "END" }, // MEMCACHE_CMD_STORE_* - MEMCACHE_RPL_STORED, - MEMCACHE_RPL_NOT_STORED, - MEMCACHE_RPL_EXISTS, - MEMCACHE_RPL_NOT_FOUND, - + { MEMCACHE_RPL_STORED, "STORED" }, + { MEMCACHE_RPL_NOT_STORED, "NOT_STORED" }, + { MEMCACHE_RPL_EXISTS, "EXISTS" }, + { MEMCACHE_RPL_NOT_FOUND, "NOT_FOUND" } }; -*/ + int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj) { // shouldn't already have a request header yet? @@ -117,7 +114,142 @@ } int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data) { - // XXX: implement - assert(0); + size_t line_length, buf_size; + char *line = NULL, *token_cursor, *token, *invalid; + int i; + + // take note of how long the buffer is + buf_size = evbuffer_get_length(buf); + + // first, try and read a full line + if ((line = evbuffer_readln(buf, &line_length, EVBUFFER_EOL_CRLF_STRICT)) == NULL) { + // check if any data was consumed + if (evbuffer_get_length(buf) != buf_size) { + // faaaail! + return -1; + + } else { + // no complete line found + return 0; + } + } + + // just check to make sure that it really is null-delimited + assert(line[line_length - 1] == '\0'); + + // empty lines? + if (line_length == 0) { + PWARNING("empty reply line !?!"); + return 0; + } + + // use strsep + token_cursor = line; + + // the first token should be the reply name + if ((token = strsep(&token_cursor, " ")) == NULL) + ERROR("no reply name in response line: %s", line); + + // figure out the reply type + for (i = MEMCACHE_RPL_INVALID + 1; i < MEMCACHE_RPL_MAX; i++) { + if (strcmp(token, memcache_cmd_replies[i].name) == 0) + break; + } + + // did we figure out what this reply means? + if (i == MEMCACHE_RPL_MAX) + ERROR("unrecognized reply: %s", line); + + // we found the type + *reply_type = memcache_cmd_replies[i].type; + + // default to no data + *has_data = 0; + + switch (*reply_type) { + case MEMCACHE_RPL_ERROR: + // no additional data + + break; + + case MEMCACHE_RPL_CLIENT_ERROR: + case MEMCACHE_RPL_SERVER_ERROR: + // the rest of the line is a human-readable error message + WARNING("received a %s reply: %s", token, token_cursor); + + break; + + case MEMCACHE_RPL_VALUE: + // [] + + // the key field + if ((key->buf = strsep(&token_cursor, " ")) == NULL) + ERROR("missing key in VALUE reply"); + + if ((key->len = strlen(key->buf)) == 0) + ERROR("zero-length key in VALUE reply"); + + // the flags field + if ((token = strsep(&token_cursor, " ")) == NULL) + ERROR("missing flags in VALUE reply"); + + obj->flags = (u_int32_t) strtol(token, &invalid, 10); + + if (*invalid != '\0') + ERROR("invalid flags in VALUE reply: %s (%s)", token, invalid); + + // the bytes field + if ((token = strsep(&token_cursor, " ")) == NULL) + ERROR("missing bytes in VALUE reply"); + + obj->bytes = (u_int32_t) strtol(token, &invalid, 10); + + if (*invalid != '\0') + ERROR("invalid bytes in VALUE reply: %s (%s)", token, invalid); + + // the optional cas field + if ((token = strsep(&token_cursor, " ")) != NULL) { + // there is a cas value present + obj->cas = (u_int64_t) strtoll(token, &invalid, 10); + + if (*invalid != '\0') + ERROR("invalid cas value in VALUE reply: %s (%s)", token, invalid); + + } else { + obj->cas = 0; + + } + + // and we do have data following this + *has_data = 1; + + break; + + case MEMCACHE_RPL_END: + // no additional data + + break; + + case MEMCACHE_RPL_STORED: + case MEMCACHE_RPL_NOT_STORED: + case MEMCACHE_RPL_EXISTS: + case MEMCACHE_RPL_NOT_FOUND: + // no additional data + + break; + + default: + assert(0); + }; + + // success + *header_data = line; + + return 0; + +error: + free(line); + + return -1; } diff -r 540737bf6bac -r 0e503189af2f memcache/command.o Binary file memcache/command.o has changed diff -r 540737bf6bac -r 0e503189af2f memcache/connection.c --- a/memcache/connection.c Wed Aug 27 21:30:32 2008 +0300 +++ b/memcache/connection.c Wed Aug 27 22:42:27 2008 +0300 @@ -17,6 +17,8 @@ static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg); static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg); +static void memcache_conn_req_done (struct memcache_conn *conn); + struct memcache_conn *memcache_conn_open (struct memcache_server *server) { struct memcache_conn *conn = NULL; @@ -89,6 +91,9 @@ if (bufferevent_enable(conn->bev, EV_WRITE)) PERROR("bufferevent_enable"); + // tell the req that it is underway + memcache_req_send(req); + // wait for that to complete return 0; @@ -124,6 +129,14 @@ } /* + * Start reading reply data from the connection + */ +void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) { + // XXX: implement + assert(0); +} + +/* * The connect() has finished */ static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) { @@ -185,6 +198,7 @@ static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) { struct memcache_conn *conn = arg; struct evbuffer *in_buf = bufferevent_get_input(bev); + struct memcache_key key; char *header_data; enum memcache_reply reply_type; int has_data; @@ -193,10 +207,35 @@ assert(evbuffer_get_length(in_buf) > 0); // attempt to parse the response header - if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data)) + if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data)) ERROR("memcache_cmd_parse_header"); + + if (!header_data) { + // no complete header received yet + return; + } - // XXX: read reply data + // disable reads again + if (bufferevent_disable(bev, EV_READ)) + PERROR("bufferevent_disable"); + + // does the reply include data? + if (has_data) { + // check that they key is the same + if (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0) + ERROR("got reply with wrong key !?!"); + + // start reading the data (including whatever might be left over in the bufferevent buffer...) + // XXX: what if this triggers a req notify before we do? + memcache_conn_handle_reply_data(conn, in_buf); + + } else { + // the request is done with + memcache_conn_req_done(conn); + } + + // notify the request + memcache_req_reply(conn->req, reply_type); error: // XXX: error handling @@ -253,7 +292,24 @@ assert(0); } +/* + * Detach the request + */ +static void memcache_conn_req_done (struct memcache_conn *conn) { + // ensure that we do currently have a req + assert(conn->req); + + // have the req detach and check it did so + memcache_req_done(conn->req); + assert(conn->req->conn == NULL); + + // we are now available again + conn->req = NULL; +} + void memcache_conn_free (struct memcache_conn *conn) { + // XXX: conn->req? + // ensure that the connection is not considered to be connected anymore assert(!conn->is_connected); diff -r 540737bf6bac -r 0e503189af2f memcache/request.c --- a/memcache/request.c Wed Aug 27 21:30:32 2008 +0300 +++ b/memcache/request.c Wed Aug 27 22:42:27 2008 +0300 @@ -41,20 +41,47 @@ return NULL; } -static int _memcache_req_notify (struct memcache_req *req) { - return req->mc->cb_fn(req, req->cb_arg); +static void _memcache_req_notify (struct memcache_req *req) { + req->mc->cb_fn(req, req->cb_arg); } void memcache_req_error (struct memcache_req *req) { // forget our connection req->conn = NULL; - - // enter ERROR state + req->state = MEMCACHE_STATE_ERROR; - // notify - if (_memcache_req_notify(req)) - WARNING("req error callback failed, ignoring"); + _memcache_req_notify(req); +} + +void memcache_req_queued (struct memcache_req *req) { + req->state = MEMCACHE_STATE_QUEUED; + +// _memcache_req_notify(req); +} + +void memcache_req_send (struct memcache_req *req) { + req->state = MEMCACHE_STATE_SEND; + +// _memcache_req_notify(req); +} + +void memcache_req_reply (struct memcache_req *req, enum memcache_reply reply_type) { + req->state = MEMCACHE_STATE_REPLY; + req->reply_type = reply_type; + + _memcache_req_notify(req); +} + +void memcache_req_done (struct memcache_req *req) { + // make sure we are in the STATE_SEND state + assert(req->state == MEMCACHE_STATE_SEND); + + // forget the connection + req->conn = NULL; + + // our state is currently indeterminate until req_reply is called + req->state = MEMCACHE_STATE_INVALID; } void memcache_req_free (struct memcache_req *req) { diff -r 540737bf6bac -r 0e503189af2f memcache/request.h --- a/memcache/request.h Wed Aug 27 21:30:32 2008 +0300 +++ b/memcache/request.h Wed Aug 27 22:42:27 2008 +0300 @@ -13,6 +13,9 @@ // the command to execute enum memcache_command cmd_type; + // the reply we have received, or MEMCACHE_REPLY_INVALID + enum memcache_reply reply_type; + // our key/obj struct memcache_key key; struct memcache_obj obj; @@ -36,7 +39,6 @@ */ struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg); - /* * An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued * anymore, and the req should not be accessed by memcache_* code after this. @@ -44,6 +46,28 @@ void memcache_req_error (struct memcache_req *req); /* + * The request has been queued. + */ +void memcache_req_queued (struct memcache_req *req); + +/* + * The request is being sent. + */ +void memcache_req_send (struct memcache_req *req); + +/* + * The response has been received, although if the respones also contains data, that will be notified separately + */ +void memcache_req_reply (struct memcache_req *req, enum memcache_reply reply_type); + +/* + * The request was sent and is now done, and is not associated with the connection anymore. + * + * This will be called before req_reply/req_data, but not before/after req_error. + */ +void memcache_req_done (struct memcache_req *req); + +/* * Free an unused req. Should always be called by the user, not via internal means. */ void memcache_req_free (struct memcache_req *req); diff -r 540737bf6bac -r 0e503189af2f memcache/server.c --- a/memcache/server.c Wed Aug 27 21:30:32 2008 +0300 +++ b/memcache/server.c Wed Aug 27 22:42:27 2008 +0300 @@ -74,6 +74,9 @@ // XXX: queue size limits TAILQ_INSERT_TAIL(&server->req_queue, req, reqqueue_node); + + // notify the req + memcache_req_queued(req); return 0; }