--- a/memcache.h Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache.h Wed Aug 27 22:42:27 2008 +0300
@@ -84,6 +84,10 @@
enum memcache_req_state {
MEMCACHE_STATE_INVALID,
+ MEMCACHE_STATE_QUEUED,
+ MEMCACHE_STATE_SEND,
+ MEMCACHE_STATE_REPLY,
+
MEMCACHE_STATE_ERROR,
};
--- a/memcache/command.c Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/command.c Wed Aug 27 22:42:27 2008 +0300
@@ -1,4 +1,5 @@
-
+#include <string.h>
+#include <stdlib.h>
#include <assert.h>
#include "command.h"
@@ -14,32 +15,28 @@
"prepend" // MEMCACHE_CMD_STORE_PREPEND
};
-/*
static struct memcache_reply_info {
enum memcache_reply type;
char *name;
- int has_data;
-} *memcache_cmd_replies[MEMCACHE_RPL_MAX] = {
- MEMCACHE_RPL_INVALID,
-
- MEMCACHE_RPL_ERROR,
- MEMCACHE_RPL_CLIENT_ERROR,
- MEMCACHE_RPL_SERVER_ERROR,
+} memcache_cmd_replies[MEMCACHE_RPL_MAX] = {
+ { MEMCACHE_RPL_INVALID, NULL },
+ { MEMCACHE_RPL_ERROR, "ERROR" },
+ { MEMCACHE_RPL_CLIENT_ERROR, "CLIENT_ERROR" },
+ { MEMCACHE_RPL_SERVER_ERROR, "SERVER_ERROR" },
// MEMCACHE_CMD_FETCH_*
- MEMCACHE_RPL_VALUE,
- MEMCACHE_RPL_END,
+ { MEMCACHE_RPL_VALUE, "VALUE" },
+ { MEMCACHE_RPL_END, "END" },
// MEMCACHE_CMD_STORE_*
- MEMCACHE_RPL_STORED,
- MEMCACHE_RPL_NOT_STORED,
- MEMCACHE_RPL_EXISTS,
- MEMCACHE_RPL_NOT_FOUND,
-
+ { MEMCACHE_RPL_STORED, "STORED" },
+ { MEMCACHE_RPL_NOT_STORED, "NOT_STORED" },
+ { MEMCACHE_RPL_EXISTS, "EXISTS" },
+ { MEMCACHE_RPL_NOT_FOUND, "NOT_FOUND" }
};
-*/
+
int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj) {
// shouldn't already have a request header yet?
@@ -117,7 +114,142 @@
}
int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data) {
- // XXX: implement
- assert(0);
+ size_t line_length, buf_size;
+ char *line = NULL, *token_cursor, *token, *invalid;
+ int i;
+
+ // take note of how long the buffer is
+ buf_size = evbuffer_get_length(buf);
+
+ // first, try and read a full line
+ if ((line = evbuffer_readln(buf, &line_length, EVBUFFER_EOL_CRLF_STRICT)) == NULL) {
+ // check if any data was consumed
+ if (evbuffer_get_length(buf) != buf_size) {
+ // faaaail!
+ return -1;
+
+ } else {
+ // no complete line found
+ return 0;
+ }
+ }
+
+ // just check to make sure that it really is null-delimited
+ assert(line[line_length - 1] == '\0');
+
+ // empty lines?
+ if (line_length == 0) {
+ PWARNING("empty reply line !?!");
+ return 0;
+ }
+
+ // use strsep
+ token_cursor = line;
+
+ // the first token should be the reply name
+ if ((token = strsep(&token_cursor, " ")) == NULL)
+ ERROR("no reply name in response line: %s", line);
+
+ // figure out the reply type
+ for (i = MEMCACHE_RPL_INVALID + 1; i < MEMCACHE_RPL_MAX; i++) {
+ if (strcmp(token, memcache_cmd_replies[i].name) == 0)
+ break;
+ }
+
+ // did we figure out what this reply means?
+ if (i == MEMCACHE_RPL_MAX)
+ ERROR("unrecognized reply: %s", line);
+
+ // we found the type
+ *reply_type = memcache_cmd_replies[i].type;
+
+ // default to no data
+ *has_data = 0;
+
+ switch (*reply_type) {
+ case MEMCACHE_RPL_ERROR:
+ // no additional data
+
+ break;
+
+ case MEMCACHE_RPL_CLIENT_ERROR:
+ case MEMCACHE_RPL_SERVER_ERROR:
+ // the rest of the line is a human-readable error message
+ WARNING("received a %s reply: %s", token, token_cursor);
+
+ break;
+
+ case MEMCACHE_RPL_VALUE:
+ // <key> <flags> <bytes> [<cas unique>]
+
+ // the key field
+ if ((key->buf = strsep(&token_cursor, " ")) == NULL)
+ ERROR("missing key in VALUE reply");
+
+ if ((key->len = strlen(key->buf)) == 0)
+ ERROR("zero-length key in VALUE reply");
+
+ // the flags field
+ if ((token = strsep(&token_cursor, " ")) == NULL)
+ ERROR("missing flags in VALUE reply");
+
+ obj->flags = (u_int32_t) strtol(token, &invalid, 10);
+
+ if (*invalid != '\0')
+ ERROR("invalid flags in VALUE reply: %s (%s)", token, invalid);
+
+ // the bytes field
+ if ((token = strsep(&token_cursor, " ")) == NULL)
+ ERROR("missing bytes in VALUE reply");
+
+ obj->bytes = (u_int32_t) strtol(token, &invalid, 10);
+
+ if (*invalid != '\0')
+ ERROR("invalid bytes in VALUE reply: %s (%s)", token, invalid);
+
+ // the optional cas field
+ if ((token = strsep(&token_cursor, " ")) != NULL) {
+ // there is a cas value present
+ obj->cas = (u_int64_t) strtoll(token, &invalid, 10);
+
+ if (*invalid != '\0')
+ ERROR("invalid cas value in VALUE reply: %s (%s)", token, invalid);
+
+ } else {
+ obj->cas = 0;
+
+ }
+
+ // and we do have data following this
+ *has_data = 1;
+
+ break;
+
+ case MEMCACHE_RPL_END:
+ // no additional data
+
+ break;
+
+ case MEMCACHE_RPL_STORED:
+ case MEMCACHE_RPL_NOT_STORED:
+ case MEMCACHE_RPL_EXISTS:
+ case MEMCACHE_RPL_NOT_FOUND:
+ // no additional data
+
+ break;
+
+ default:
+ assert(0);
+ };
+
+ // success
+ *header_data = line;
+
+ return 0;
+
+error:
+ free(line);
+
+ return -1;
}
Binary file memcache/command.o has changed
--- a/memcache/connection.c Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/connection.c Wed Aug 27 22:42:27 2008 +0300
@@ -17,6 +17,8 @@
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
+static void memcache_conn_req_done (struct memcache_conn *conn);
+
struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
struct memcache_conn *conn = NULL;
@@ -89,6 +91,9 @@
if (bufferevent_enable(conn->bev, EV_WRITE))
PERROR("bufferevent_enable");
+ // tell the req that it is underway
+ memcache_req_send(req);
+
// wait for that to complete
return 0;
@@ -124,6 +129,14 @@
}
/*
+ * Start reading reply data from the connection
+ */
+void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) {
+ // XXX: implement
+ assert(0);
+}
+
+/*
* The connect() has finished
*/
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
@@ -185,6 +198,7 @@
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
struct memcache_conn *conn = arg;
struct evbuffer *in_buf = bufferevent_get_input(bev);
+ struct memcache_key key;
char *header_data;
enum memcache_reply reply_type;
int has_data;
@@ -193,10 +207,35 @@
assert(evbuffer_get_length(in_buf) > 0);
// attempt to parse the response header
- if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data))
+ if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
ERROR("memcache_cmd_parse_header");
+
+ if (!header_data) {
+ // no complete header received yet
+ return;
+ }
- // XXX: read reply data
+ // disable reads again
+ if (bufferevent_disable(bev, EV_READ))
+ PERROR("bufferevent_disable");
+
+ // does the reply include data?
+ if (has_data) {
+ // check that they key is the same
+ if (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0)
+ ERROR("got reply with wrong key !?!");
+
+ // start reading the data (including whatever might be left over in the bufferevent buffer...)
+ // XXX: what if this triggers a req notify before we do?
+ memcache_conn_handle_reply_data(conn, in_buf);
+
+ } else {
+ // the request is done with
+ memcache_conn_req_done(conn);
+ }
+
+ // notify the request
+ memcache_req_reply(conn->req, reply_type);
error:
// XXX: error handling
@@ -253,7 +292,24 @@
assert(0);
}
+/*
+ * Detach the request
+ */
+static void memcache_conn_req_done (struct memcache_conn *conn) {
+ // ensure that we do currently have a req
+ assert(conn->req);
+
+ // have the req detach and check it did so
+ memcache_req_done(conn->req);
+ assert(conn->req->conn == NULL);
+
+ // we are now available again
+ conn->req = NULL;
+}
+
void memcache_conn_free (struct memcache_conn *conn) {
+ // XXX: conn->req?
+
// ensure that the connection is not considered to be connected anymore
assert(!conn->is_connected);
--- a/memcache/request.c Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/request.c Wed Aug 27 22:42:27 2008 +0300
@@ -41,20 +41,47 @@
return NULL;
}
-static int _memcache_req_notify (struct memcache_req *req) {
- return req->mc->cb_fn(req, req->cb_arg);
+static void _memcache_req_notify (struct memcache_req *req) {
+ req->mc->cb_fn(req, req->cb_arg);
}
void memcache_req_error (struct memcache_req *req) {
// forget our connection
req->conn = NULL;
-
- // enter ERROR state
+
req->state = MEMCACHE_STATE_ERROR;
- // notify
- if (_memcache_req_notify(req))
- WARNING("req error callback failed, ignoring");
+ _memcache_req_notify(req);
+}
+
+void memcache_req_queued (struct memcache_req *req) {
+ req->state = MEMCACHE_STATE_QUEUED;
+
+// _memcache_req_notify(req);
+}
+
+void memcache_req_send (struct memcache_req *req) {
+ req->state = MEMCACHE_STATE_SEND;
+
+// _memcache_req_notify(req);
+}
+
+void memcache_req_reply (struct memcache_req *req, enum memcache_reply reply_type) {
+ req->state = MEMCACHE_STATE_REPLY;
+ req->reply_type = reply_type;
+
+ _memcache_req_notify(req);
+}
+
+void memcache_req_done (struct memcache_req *req) {
+ // make sure we are in the STATE_SEND state
+ assert(req->state == MEMCACHE_STATE_SEND);
+
+ // forget the connection
+ req->conn = NULL;
+
+ // our state is currently indeterminate until req_reply is called
+ req->state = MEMCACHE_STATE_INVALID;
}
void memcache_req_free (struct memcache_req *req) {
--- a/memcache/request.h Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/request.h Wed Aug 27 22:42:27 2008 +0300
@@ -13,6 +13,9 @@
// the command to execute
enum memcache_command cmd_type;
+ // the reply we have received, or MEMCACHE_REPLY_INVALID
+ enum memcache_reply reply_type;
+
// our key/obj
struct memcache_key key;
struct memcache_obj obj;
@@ -36,7 +39,6 @@
*/
struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg);
-
/*
* An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued
* anymore, and the req should not be accessed by memcache_* code after this.
@@ -44,6 +46,28 @@
void memcache_req_error (struct memcache_req *req);
/*
+ * The request has been queued.
+ */
+void memcache_req_queued (struct memcache_req *req);
+
+/*
+ * The request is being sent.
+ */
+void memcache_req_send (struct memcache_req *req);
+
+/*
+ * The response has been received, although if the respones also contains data, that will be notified separately
+ */
+void memcache_req_reply (struct memcache_req *req, enum memcache_reply reply_type);
+
+/*
+ * The request was sent and is now done, and is not associated with the connection anymore.
+ *
+ * This will be called before req_reply/req_data, but not before/after req_error.
+ */
+void memcache_req_done (struct memcache_req *req);
+
+/*
* Free an unused req. Should always be called by the user, not via internal means.
*/
void memcache_req_free (struct memcache_req *req);
--- a/memcache/server.c Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/server.c Wed Aug 27 22:42:27 2008 +0300
@@ -74,6 +74,9 @@
// XXX: queue size limits
TAILQ_INSERT_TAIL(&server->req_queue, req, reqqueue_node);
+
+ // notify the req
+ memcache_req_queued(req);
return 0;
}