--- a/memcache.h Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache.h Thu Aug 28 00:29:39 2008 +0300
@@ -87,6 +87,10 @@
MEMCACHE_STATE_QUEUED,
MEMCACHE_STATE_SEND,
MEMCACHE_STATE_REPLY,
+ MEMCACHE_STATE_REPLY_DATA,
+
+ MEMCACHE_STATE_DONE,
+ MEMCACHE_STATE_DATA_DONE,
MEMCACHE_STATE_ERROR,
};
--- a/memcache/command.c Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/command.c Thu Aug 28 00:29:39 2008 +0300
@@ -92,7 +92,7 @@
case MEMCACHE_CMD_STORE_PREPEND:
assert(key != NULL && obj != NULL);
assert(key->len > 0 && key->buf != NULL);
- assert(obj->bytes > 0);
+ // XXX: ensure that we have a valid buf
if (evbuffer_add_printf(buf, "%s %*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes))
ERROR("evbuffer_add_printf");
@@ -117,32 +117,33 @@
size_t line_length, buf_size;
char *line = NULL, *token_cursor, *token, *invalid;
int i;
-
- // take note of how long the buffer is
- buf_size = evbuffer_get_length(buf);
+
+ // read lines until we find one that isn't empty
+ do {
+ // take note of how long the buffer is
+ buf_size = evbuffer_get_length(buf);
- // first, try and read a full line
- if ((line = evbuffer_readln(buf, &line_length, EVBUFFER_EOL_CRLF_STRICT)) == NULL) {
- // check if any data was consumed
- if (evbuffer_get_length(buf) != buf_size) {
- // faaaail!
- return -1;
+ // free the prvious line in case it was empty
+ free(line);
- } else {
- // no complete line found
- return 0;
+ // try and read a line
+ if ((line = evbuffer_readln(buf, &line_length, EVBUFFER_EOL_CRLF_STRICT)) == NULL) {
+ // check if any data was consumed
+ if (evbuffer_get_length(buf) != buf_size) {
+ // faaaail!
+ return -1;
+
+ } else {
+ // no complete line found
+ return 0;
+ }
}
- }
+
+ } while (line_length == 0);
// just check to make sure that it really is null-delimited
assert(line[line_length - 1] == '\0');
- // empty lines?
- if (line_length == 0) {
- PWARNING("empty reply line !?!");
- return 0;
- }
-
// use strsep
token_cursor = line;
--- a/memcache/command.h Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/command.h Thu Aug 28 00:29:39 2008 +0300
@@ -13,6 +13,17 @@
int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj);
int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj);
+
+/*
+ * Attempt to parse a response line from the given buf. *header_data will be set to NULL if no complete response line
+ * was found, or a pointer to the line received (which may contain NULs between tokens), which must be freed by the
+ * caller. *reply_type will be set to the parsed `enum memcache_reply` type. If the reply is of type
+ * MEMCACHE_RPL_VALUE, then key and obj will be updated to correspond to what was returned (key.buf will point into
+ * *header_data). has_data will be set to true if the response also contains cache data.
+ *
+ * Empty lines will be skipped based on the assumption that they are, in fact, the newline between the data and the
+ * MEMCACHE_RPL_END line of a MEMCACHE_RPL_VALUE reply.
+ */
int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data);
#endif /* MEMCACHE_COMMAND_H */
Binary file memcache/command.o has changed
--- a/memcache/connection.c Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/connection.c Thu Aug 28 00:29:39 2008 +0300
@@ -16,7 +16,10 @@
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
+static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg);
+static void memcache_conn_error (struct memcache_conn *conn);
+static void memcache_conn_req_error (struct memcache_conn *conn);
static void memcache_conn_req_done (struct memcache_conn *conn);
struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
@@ -52,10 +55,10 @@
assert(conn->fd > 0);
// set up the connect event
- event_set(&conn->ev, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn);
+ event_set(&conn->ev_connect, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn);
// add it
- if (event_add(&conn->ev, NULL))
+ if (event_add(&conn->ev_connect, NULL))
PERROR("event_add");
// success
@@ -76,7 +79,7 @@
return (conn->fd > 0 && conn->is_connected && conn->req == NULL);
}
-int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
+void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
assert(conn->fd > 0 && conn->is_connected);
assert(conn->req == NULL);
@@ -84,27 +87,36 @@
conn->req = req;
// write the request header into our bufferevent's output buffer
- if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj))
+ if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) {
+ // just fail the request
+ memcache_conn_req_error(conn);
+
ERROR("failed to init the cmd");
+ }
// tell our bufferevent to send it
- if (bufferevent_enable(conn->bev, EV_WRITE))
+ if (bufferevent_enable(conn->bev, EV_WRITE)) {
+ // fail the entire connection
+ memcache_conn_error(conn);
+
PERROR("bufferevent_enable");
+ }
// tell the req that it is underway
memcache_req_send(req);
- // wait for that to complete
- return 0;
+error:
-error:
- return -1;
+ return;
}
/*
* Start writing out the request data
*/
void memcache_conn_send_req_data (struct memcache_conn *conn) {
+ // set up the ev_write
+ event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn);
+
// just fake a call to the event handler
_memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
}
@@ -116,24 +128,89 @@
// ensure that we either didn't have a command, or it has been sent
assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
- // start reading on the bufferevent
+ // start/continue reading on the bufferevent
if (bufferevent_enable(conn->bev, EV_READ))
PERROR("bufferevent_enable");
+ // Note: we don't need to recurse into the callback ourselves in case there is data in it, since the read callback
+ // will consume all available data iteratively.
+
// ok, wait for the reply
return;
error:
- // XXX: error handling
- assert(0);
+ memcache_conn_req_error(conn);
}
/*
* Start reading reply data from the connection
*/
void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) {
- // XXX: implement
- assert(0);
+ int ret;
+
+ // check that the buf doesn't contain any data
+ assert(conn->req->buf.data == NULL);
+
+ // bytes *may* be zero if we have an empty cache entry
+ if (conn->req->obj.bytes > 0) {
+ // allocate a buffer for the reply data
+ if ((conn->req->buf.data = malloc(conn->req->obj.bytes)) == NULL)
+ ERROR("malloc");
+
+ // update the length
+ conn->req->buf.len = conn->req->obj.bytes;
+
+ // set offset to zero
+ conn->req->buf.offset = 0;
+
+ // do we have any data in the buf that we need to copy?
+ if (evbuffer_get_length(buf) > 0) {
+ // read the data into the memcache_buf
+ ret = evbuffer_remove(buf, conn->req->buf.data, conn->req->buf.len);
+
+ // sanity check...
+ assert(ret > 0 && ret <= conn->req->buf.len);
+
+ // update offset
+ conn->req->buf.offset += ret;
+ }
+
+ // still need to receive more data?
+ if (conn->req->buf.offset < conn->req->buf.len) {
+
+ // disable the bufferevent while we read the data
+ if (bufferevent_disable(conn->bev, EV_READ))
+ PERROR("bufferevent_disable");
+
+ // set up the ev_read
+ event_set(&conn->ev_read, conn->fd, EV_READ, &_memcache_conn_ev_read, conn);
+
+ // then receive what data is left to receive
+ _memcache_conn_ev_read(conn->fd, EV_READ, conn);
+
+ // wait for the data to arrive
+ return;
+
+ } else {
+ // the buffer already contained the cache data, no need to read any more
+
+ }
+ } else {
+ // there is no data to receive for this item, so we can ignore this
+
+ }
+
+ // we kind of "recurse" to handle the MEMCACHE_RPL_END reply, that is, we activate the bufferevent for EV_READ
+ // again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line" after the data and then
+ // return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause req_done to be called.
+ // Elegant!
+ memcache_conn_handle_reply(conn);
+
+ // ok
+ return;
+
+error:
+ memcache_conn_req_error(conn);
}
/*
@@ -168,8 +245,7 @@
return;
error:
- // notify the server
- memcache_server_conn_dead(conn->server, conn);
+ memcache_conn_error(conn);
}
/*
@@ -205,47 +281,56 @@
// ensure that we do indeed have some data
assert(evbuffer_get_length(in_buf) > 0);
-
- // attempt to parse the response header
- if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
- ERROR("memcache_cmd_parse_header");
- if (!header_data) {
- // no complete header received yet
- return;
- }
+ // consume as much data as possible
+ do {
+ // attempt to parse the response header
+ if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
+ ERROR("memcache_cmd_parse_header");
+
+ if (!header_data) {
+ // no complete header received yet
+ return;
+ }
- // disable reads again
- if (bufferevent_disable(bev, EV_READ))
- PERROR("bufferevent_disable");
-
- // does the reply include data?
- if (has_data) {
- // check that they key is the same
- if (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0)
+ // if the reply contains data, check that they key is the same
+ if (has_data && (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0))
ERROR("got reply with wrong key !?!");
- // start reading the data (including whatever might be left over in the bufferevent buffer...)
- // XXX: what if this triggers a req notify before we do?
- memcache_conn_handle_reply_data(conn, in_buf);
+ // notify the request (no reply data is ready for reading yet, though)
+ memcache_req_reply(conn->req, reply_type);
+
+ // does the reply include data?
+ if (has_data) {
+ // start reading the data (including whatever might be left over in the bufferevent buffer...)
+ memcache_conn_handle_reply_data(conn, in_buf);
- } else {
- // the request is done with
- memcache_conn_req_done(conn);
- }
+ } else {
+ // the request is done with
+ memcache_conn_req_done(conn);
+ }
+
+ // free the header data
+ free(header_data);
- // notify the request
- memcache_req_reply(conn->req, reply_type);
+ } while (evbuffer_get_length(in_buf) > 0);
+
+ // done
+ return;
+
+error:
+ // free the header data read from the buf
+ free(header_data);
-error:
- // XXX: error handling
- return;
+ memcache_conn_req_error(conn);
}
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
- // XXX: error handling
- assert(0);
+ struct memcache_conn *conn = arg;
+
+ // fail the entire connection
+ memcache_conn_error(conn);
}
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
@@ -265,7 +350,7 @@
// should never be the case... ?
if (ret == 0)
- ERROR("write returned 0 !?!");
+ ERROR("write returned EOF !?!");
// did we manage to write some data?
if (ret > 0) {
@@ -276,7 +361,7 @@
// data left to write?
if (buf->offset < buf->len) {
// reschedule
- if (event_add(&conn->ev, NULL))
+ if (event_add(&conn->ev_write, NULL))
PERROR("event_add");
} else {
@@ -288,8 +373,87 @@
return;
error:
- // XXX: error handling
- assert(0);
+ // fail the entire connection
+ memcache_conn_error(conn);
+}
+
+static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg) {
+ struct memcache_conn *conn = arg;
+ struct memcache_buf *buf = &conn->req->buf;
+ int ret;
+
+ // correct event
+ assert(event == EV_READ);
+
+ // we do indeed expect to receive data
+ assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);
+
+ // do the actual read()
+ if ((ret = read(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN)
+ PERROR("read");
+
+ // should never be the case... ?
+ if (ret == 0)
+ ERROR("read returned EOF !?!");
+
+ // did we manage to read some data?
+ if (ret > 0) {
+ // update offset
+ buf->offset += ret;
+ }
+
+ // only notify the req if new data was received, and we won't be calling req_done next.
+ if (ret > 0 && buf->offset < buf->len) {
+ // notify the req
+ memcache_req_data(conn->req);
+ }
+
+ // data left to read?
+ if (buf->offset < buf->len) {
+ // reschedule
+ if (event_add(&conn->ev_read, NULL))
+ PERROR("event_add");
+
+
+ } else {
+ // done! We can let the bufferenvet handle the rest of the reply now
+ memcache_conn_handle_reply(conn);
+ }
+
+ // success
+ return;
+
+error:
+ // fail the entire connection
+ memcache_conn_error(conn);
+}
+
+/*
+ * The entire connection failed
+ */
+static void memcache_conn_error (struct memcache_conn *conn) {
+ // fail the request, if we have one
+ if (conn->req)
+ memcache_conn_req_error(conn);
+
+ // tell the server we failed
+ memcache_server_conn_dead(conn->server, conn);
+}
+
+/*
+ * Request failed somehow
+ */
+static void memcache_conn_req_error (struct memcache_conn *conn) {
+ // ensure that we do currently have a req
+ assert(conn->req);
+
+ // error out the req
+ memcache_req_error(conn->req);
+ assert(conn->req->conn == NULL);
+
+ // we are now available again
+ conn->req = NULL;
+
}
/*
@@ -308,7 +472,8 @@
}
void memcache_conn_free (struct memcache_conn *conn) {
- // XXX: conn->req?
+ // ensure we don't have a req bound to us
+ assert(conn->req == NULL);
// ensure that the connection is not considered to be connected anymore
assert(!conn->is_connected);
@@ -321,9 +486,15 @@
conn->fd = 0;
}
- // ensure that the event is not pending anymore
- assert(event_pending(&conn->ev, EV_READ|EV_WRITE|EV_TIMEOUT, NULL) == 0);
+ // ensure that the events are not pending anymore
+ assert(event_pending(&conn->ev_connect, EV_WRITE|EV_TIMEOUT, NULL) == 0);
+ assert(event_pending(&conn->ev_read, EV_READ|EV_TIMEOUT, NULL) == 0);
+ assert(event_pending(&conn->ev_write, EV_WRITE|EV_TIMEOUT, NULL) == 0);
+ // free the bufferevent
+ if (conn->bev)
+ bufferevent_free(conn->bev);
+
// free it
free(conn);
}
--- a/memcache/connection.h Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/connection.h Thu Aug 28 00:29:39 2008 +0300
@@ -20,8 +20,10 @@
// our socket fd
int fd;
- // socket event
- struct event ev;
+ // socket events
+ struct event ev_connect;
+ struct event ev_read;
+ struct event ev_write;
// socket bufferevent
struct bufferevent *bev;
@@ -56,7 +58,7 @@
/*
* The given connection must be idle, whereupon the given request is processed.
*/
-int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req);
+void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req);
/*
* Free all resources associated with a closed/failed connection
--- a/memcache/request.c Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/request.c Thu Aug 28 00:29:39 2008 +0300
@@ -45,15 +45,6 @@
req->mc->cb_fn(req, req->cb_arg);
}
-void memcache_req_error (struct memcache_req *req) {
- // forget our connection
- req->conn = NULL;
-
- req->state = MEMCACHE_STATE_ERROR;
-
- _memcache_req_notify(req);
-}
-
void memcache_req_queued (struct memcache_req *req) {
req->state = MEMCACHE_STATE_QUEUED;
@@ -73,21 +64,41 @@
_memcache_req_notify(req);
}
+void memcache_req_data (struct memcache_req *req) {
+ assert(req->state == MEMCACHE_STATE_REPLY || req->state == MEMCACHE_STATE_REPLY_DATA);
+
+ req->state = MEMCACHE_STATE_REPLY_DATA;
+
+ _memcache_req_notify(req);
+}
+
void memcache_req_done (struct memcache_req *req) {
- // make sure we are in the STATE_SEND state
- assert(req->state == MEMCACHE_STATE_SEND);
+ // make sure we are in the REPLY/REPLY_DATA state
+ assert(req->state == MEMCACHE_STATE_REPLY || req->state == MEMCACHE_STATE_REPLY_DATA);
+
+ // make sure we really have the full data, if applicable
+ assert(req->buf.data == NULL || req->buf.offset == req->buf.len);
// forget the connection
req->conn = NULL;
+
+ // state depends on if we have data or not...
+ req->state = req->buf.data ? MEMCACHE_STATE_DATA_DONE : MEMCACHE_STATE_DONE;
+}
- // our state is currently indeterminate until req_reply is called
- req->state = MEMCACHE_STATE_INVALID;
+void memcache_req_error (struct memcache_req *req) {
+ // forget our connection
+ req->conn = NULL;
+
+ req->state = MEMCACHE_STATE_ERROR;
+
+ _memcache_req_notify(req);
}
void memcache_req_free (struct memcache_req *req) {
// must be unused
assert(req->conn == NULL);
- assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR);
+ assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR || req->state == MEMCACHE_STATE_DONE || req->state == MEMCACHE_STATE_DATA_DONE);
free(req->key.buf);
free(req);
--- a/memcache/request.h Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/request.h Thu Aug 28 00:29:39 2008 +0300
@@ -40,12 +40,6 @@
struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg);
/*
- * An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued
- * anymore, and the req should not be accessed by memcache_* code after this.
- */
-void memcache_req_error (struct memcache_req *req);
-
-/*
* The request has been queued.
*/
void memcache_req_queued (struct memcache_req *req);
@@ -56,11 +50,31 @@
void memcache_req_send (struct memcache_req *req);
/*
- * The response has been received, although if the respones also contains data, that will be notified separately
+ * The reply has been received, although if the respones also contains data, that will be notified separately.
+ *
+ * If the response doesn't contain any data, req_done will be called after this is. Otherwise, this will be called
+ * first, followed by zero or more calls to req_data, followed by req_reply/RPL_END + req_done.
+ *
+ * Note that no req data will not be available when this is first called, only once req_data/req_done is called.
*/
void memcache_req_reply (struct memcache_req *req, enum memcache_reply reply_type);
/*
+ * Some amount of reply data has been received. This may be called between the req_reply/MEMCACHE_RPL_VALUE and
+ * req_reply/MEMCACHE_RPL_END + req_done notifications.
+ *
+ * Note that this is not always called, it is possible that one might have:
+ * memcache_req_reply(req, MEMCACHE_RPL_VALUE)
+ * memcache_req_reply(req, MEMCACHE_RPL_END)
+ * memcache_req_done(req)
+ *
+ * This will happen if both the value and the end replies are received in the same chunk.
+ *
+ * Note also that replies may have zero-length entries, in which case req_data will never be called either.
+ */
+void memcache_req_data (struct memcache_req *req);
+
+/*
* The request was sent and is now done, and is not associated with the connection anymore.
*
* This will be called before req_reply/req_data, but not before/after req_error.
@@ -68,6 +82,12 @@
void memcache_req_done (struct memcache_req *req);
/*
+ * An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued
+ * anymore, and the req should not be accessed by memcache_* code after this.
+ */
+void memcache_req_error (struct memcache_req *req);
+
+/*
* Free an unused req. Should always be called by the user, not via internal means.
*/
void memcache_req_free (struct memcache_req *req);
--- a/memcache/server.c Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/server.c Thu Aug 28 00:29:39 2008 +0300
@@ -67,7 +67,10 @@
if (conn != NULL) {
// we found an available connection
- return memcache_conn_do_req(conn, req);
+ // if the request fails, then we will know via conn_dead
+ memcache_conn_do_req(conn, req);
+
+ return 0;
} else {
// enqueue the request until a connection is available
@@ -92,12 +95,8 @@
// remove it from the queue and execute it
TAILQ_REMOVE(&server->req_queue, req, reqqueue_node);
- if (memcache_conn_do_req(conn, req)) {
- WARNING("processing enqueued request failed");
-
- // notify the request/user, the req is now out of our hands
- memcache_req_error(req);
- }
+ // this will take care of any error handling by itself
+ memcache_conn_do_req(conn, req);
}
}
--- a/memcache/server.h Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/server.h Thu Aug 28 00:29:39 2008 +0300
@@ -44,7 +44,7 @@
void memcache_server_conn_ready (struct memcache_server *server, struct memcache_conn *conn);
/*
- * The given connection failed/died
+ * The given connection failed/died. This will take care of freeing it/reconnecting it
*/
void memcache_server_conn_dead (struct memcache_server *server, struct memcache_conn *conn);