--- a/Makefile Thu Aug 28 01:42:28 2008 +0300
+++ b/Makefile Thu Aug 28 03:12:11 2008 +0300
@@ -16,7 +16,7 @@
web_main: web_main.o common.o config.o socket.o http.o render.o remote_node.o remote_pool.o render_remote.o tile.o static.o
coro_test: coro_test.o common.o config.o socket.o
cache_test: cache_test.o common.o cache/cache.o cache/req.o cache/op.o cache/engines/fs.o
-memcache_test: memcache_test.o common.o memcache/memcache.o memcache/connection.o memcache/server.o memcache/request.o memcache/command.o socket.o config.o
+memcache_test: memcache_test.o common.o memcache/memcache.o memcache/connection.o memcache/server.o memcache/request.o memcache/command.o memcache/strings.o socket.o config.o
clean :
-rm *.o ${EXECS}
--- a/memcache.h Thu Aug 28 01:42:28 2008 +0300
+++ b/memcache.h Thu Aug 28 03:12:11 2008 +0300
@@ -42,6 +42,9 @@
/*
* The length of the key buffer in bytes. This does not include a NUL byte.
+ *
+ * If this is given as zero, then the length will be calculsted from buf using strlen(). Empty keys are not
+ * allowed, so this will result in an error if buf is an empty string.
*/
size_t len;
};
@@ -282,7 +285,7 @@
* This is called whenever the request's state changes, including when new data is received in the STATE_REPLY_DATA
* state.
*/
-typedef int (*memcache_cb) (struct memcache_req *, void*);
+typedef void (*memcache_cb) (struct memcache_req *, void*);
/*
* Allocate a new memcache context for use with other methods.
@@ -403,4 +406,11 @@
*/
void memcache_req_free (struct memcache_req *req);
+/*
+ * Translate commands/replies/states into strings.
+ */
+const char *memcache_command_str (enum memcache_command cmd);
+const char *memcache_reply_str (enum memcache_reply reply);
+const char *memcache_state_str (enum memcache_req_state state);
+
#endif /* MEMCACHE_H */
--- a/memcache/command.c Thu Aug 28 01:42:28 2008 +0300
+++ b/memcache/command.c Thu Aug 28 03:12:11 2008 +0300
@@ -94,7 +94,7 @@
assert(key->len > 0 && key->buf != NULL);
// XXX: ensure that we have a valid buf
- if (evbuffer_add_printf(buf, "%s %*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes))
+ if (evbuffer_add_printf(buf, "%s %*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes) == -1)
ERROR("evbuffer_add_printf");
break;
@@ -142,7 +142,7 @@
} while (line_length == 0);
// just check to make sure that it really is null-delimited
- assert(line[line_length - 1] == '\0');
+ assert(line[line_length] == '\0');
// use strsep
token_cursor = line;
--- a/memcache/command.h Thu Aug 28 01:42:28 2008 +0300
+++ b/memcache/command.h Thu Aug 28 03:12:11 2008 +0300
@@ -12,6 +12,11 @@
int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj);
+/*
+ * Write the request header corresponding to the given command/key/obj to the given evbuffer.
+ *
+ * This must be atomic, so if it fails, it must not modify buf.
+ */
int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj);
/*
Binary file memcache/command.o has changed
--- a/memcache/connection.c Thu Aug 28 01:42:28 2008 +0300
+++ b/memcache/connection.c Thu Aug 28 03:12:11 2008 +0300
@@ -19,9 +19,10 @@
static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg);
static void memcache_conn_error (struct memcache_conn *conn);
-static void memcache_conn_req_error (struct memcache_conn *conn);
static void memcache_conn_req_done (struct memcache_conn *conn);
+void memcache_conn_close (struct memcache_conn *conn);
+
struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
struct memcache_conn *conn = NULL;
@@ -83,31 +84,30 @@
assert(conn->fd > 0 && conn->is_connected);
assert(conn->req == NULL);
- // store the req
- conn->req = req;
-
// write the request header into our bufferevent's output buffer
if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) {
- // just fail the request
- memcache_conn_req_error(conn);
-
ERROR("failed to init the cmd");
}
+ // store the req
+ conn->req = req;
+
// tell our bufferevent to send it
- if (bufferevent_enable(conn->bev, EV_WRITE)) {
- // fail the entire connection
- memcache_conn_error(conn);
-
+ if (bufferevent_enable(conn->bev, EV_WRITE))
PERROR("bufferevent_enable");
- }
// tell the req that it is underway
memcache_req_send(req);
+
+ // success
+ return;
error:
+ if (conn->req)
+ memcache_conn_error(conn);
- return;
+ else
+ memcache_req_error(req);
}
/*
@@ -122,6 +122,20 @@
}
/*
+ * Write out the final \r\n to terminate the request data
+ */
+void memcache_conn_finish_req_data (struct memcache_conn *conn) {
+ if (bufferevent_write(conn->bev, "\r\n", 2))
+ PERROR("bufferevent_write");
+
+ // ok
+ return;
+
+error:
+ memcache_conn_error(conn);
+}
+
+/*
* Start reading a reply from the connection
*/
void memcache_conn_handle_reply (struct memcache_conn *conn) {
@@ -139,7 +153,7 @@
return;
error:
- memcache_conn_req_error(conn);
+ memcache_conn_error(conn);
}
/*
@@ -210,7 +224,7 @@
return;
error:
- memcache_conn_req_error(conn);
+ memcache_conn_error(conn);
}
/*
@@ -258,7 +272,8 @@
assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
// does this request have some data to be included in the request?
- if (conn->req->buf.data > 0) {
+ // if the data has already been sent (we handle the final \r\n as well), then skip this.
+ if (conn->req->have_buf && conn->req->buf.offset == 0) {
// we need to send the request data next
memcache_conn_send_req_data(conn);
@@ -322,7 +337,7 @@
// free the header data read from the buf
free(header_data);
- memcache_conn_req_error(conn);
+ memcache_conn_error(conn);
}
@@ -365,8 +380,8 @@
PERROR("event_add");
} else {
- // done! We can handle the reply now
- memcache_conn_handle_reply(conn);
+ // done! Send the terminating \r\n next
+ memcache_conn_finish_req_data(conn);
}
// success
@@ -428,35 +443,30 @@
memcache_conn_error(conn);
}
+// XXX: need to flush/disable buffers/events on errors
+
/*
* The entire connection failed
*/
static void memcache_conn_error (struct memcache_conn *conn) {
// fail the request, if we have one
- if (conn->req)
- memcache_conn_req_error(conn);
+ if (conn->req) {
+ // error out the req
+ memcache_req_error(conn->req);
+ assert(conn->req->conn == NULL);
+
+ // we are now available again
+ conn->req = NULL;
+ }
+
+ // close the connection
+ memcache_conn_close(conn);
// tell the server we failed
memcache_server_conn_dead(conn->server, conn);
}
/*
- * Request failed somehow
- */
-static void memcache_conn_req_error (struct memcache_conn *conn) {
- // ensure that we do currently have a req
- assert(conn->req);
-
- // error out the req
- memcache_req_error(conn->req);
- assert(conn->req->conn == NULL);
-
- // we are now available again
- conn->req = NULL;
-
-}
-
-/*
* Detach the request
*/
static void memcache_conn_req_done (struct memcache_conn *conn) {
@@ -469,15 +479,11 @@
// we are now available again
conn->req = NULL;
+
+ memcache_server_conn_ready(conn->server, conn);
}
-void memcache_conn_free (struct memcache_conn *conn) {
- // ensure we don't have a req bound to us
- assert(conn->req == NULL);
-
- // ensure that the connection is not considered to be connected anymore
- assert(!conn->is_connected);
-
+void memcache_conn_close (struct memcache_conn *conn) {
// close the fd if needed
if (conn->fd > 0) {
if (close(conn->fd))
@@ -492,9 +498,23 @@
assert(event_pending(&conn->ev_write, EV_WRITE|EV_TIMEOUT, NULL) == 0);
// free the bufferevent
- if (conn->bev)
+ if (conn->bev) {
bufferevent_free(conn->bev);
+ conn->bev = NULL;
+ }
+
+ // not connected anymore
+ conn->is_connected = 0;
+}
+
+void memcache_conn_free (struct memcache_conn *conn) {
+ // ensure we don't have a req bound to us
+ assert(conn->req == NULL);
+
+ // ensure that the connection is not considered to be connected anymore
+ assert(!conn->is_connected);
+
// free it
free(conn);
}
--- a/memcache/memcache.c Thu Aug 28 01:42:28 2008 +0300
+++ b/memcache/memcache.c Thu Aug 28 03:12:11 2008 +0300
@@ -55,7 +55,7 @@
struct memcache_server *server = NULL;
// alloc the request
- if ((req = memcache_req_alloc(mc, MEMCACHE_CMD_FETCH_GET, key, obj, buf, cb_arg)) == NULL)
+ if ((req = memcache_req_alloc(mc, cmd, key, obj, buf, cb_arg)) == NULL)
ERROR("failed to allocate request");
// pick a server
--- a/memcache/request.c Thu Aug 28 01:42:28 2008 +0300
+++ b/memcache/request.c Thu Aug 28 03:12:11 2008 +0300
@@ -19,12 +19,20 @@
// state
req->state = MEMCACHE_STATE_INVALID;
+ // key length?
+ if (key->len == 0)
+ req->key.len = strlen(key->buf);
+ else
+ req->key.len = key->len;
+
+ if (req->key.len == 0)
+ ERROR("zero-length key");
+
// copy the key
- if ((req->key.buf = malloc(key->len)) == NULL)
+ if ((req->key.buf = malloc(req->key.len)) == NULL)
ERROR("malloc key buf");
- memcpy(req->key.buf, key->buf, key->len);
- req->key.len = key->len;
+ memcpy(req->key.buf, key->buf, req->key.len);
// copy the obj if provided
if (obj) {
@@ -32,10 +40,16 @@
req->have_obj = 1;
}
- // copy the buf if provided
+ // copy the buf if provided, and reset the offset to zero
if (buf) {
+ // ensure that it is a valid buffer
+ assert(buf->data && buf->len > 0 && buf->offset == buf->len);
+
memcpy(&req->buf, buf, sizeof(req->buf));
req->have_buf = 1;
+
+ // set offset to zero
+ req->buf.offset = 0;
}
// store the other data
--- a/memcache/server.c Thu Aug 28 01:42:28 2008 +0300
+++ b/memcache/server.c Thu Aug 28 03:12:11 2008 +0300
@@ -17,6 +17,10 @@
// store the vars
server->endpoint = endpoint;
server->max_connections = max_connections;
+
+ // init lists
+ LIST_INIT(&server->conn_list);
+ TAILQ_INIT(&server->req_queue);
// grow connpool so as to have a connection ready
if (memcache_server_grow_connpool(server))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/strings.c Thu Aug 28 03:12:11 2008 +0300
@@ -0,0 +1,50 @@
+
+#include "../memcache.h"
+
+#define CASE_STR(val) case val: return #val
+#define CASE_DEFAULT(prefix) default: return #prefix "_???"
+
+const char *memcache_command_str (enum memcache_command cmd) {
+ switch (cmd) {
+ CASE_STR(MEMCACHE_CMD_INVALID);
+ CASE_STR(MEMCACHE_CMD_FETCH_GET);
+ CASE_STR(MEMCACHE_CMD_STORE_SET);
+ CASE_STR(MEMCACHE_CMD_STORE_ADD);
+ CASE_STR(MEMCACHE_CMD_STORE_REPLACE);
+ CASE_STR(MEMCACHE_CMD_STORE_APPEND);
+ CASE_STR(MEMCACHE_CMD_STORE_PREPEND);
+ CASE_STR(MEMCACHE_CMD_STORE_CAS);
+ CASE_DEFAULT(MEMCACHE_CMD);
+ }
+}
+
+const char *memcache_reply_str (enum memcache_reply reply) {
+ switch (reply) {
+ CASE_STR(MEMCACHE_RPL_INVALID);
+ CASE_STR(MEMCACHE_RPL_ERROR);
+ CASE_STR(MEMCACHE_RPL_CLIENT_ERROR);
+ CASE_STR(MEMCACHE_RPL_SERVER_ERROR);
+ CASE_STR(MEMCACHE_RPL_VALUE);
+ CASE_STR(MEMCACHE_RPL_END);
+ CASE_STR(MEMCACHE_RPL_STORED);
+ CASE_STR(MEMCACHE_RPL_NOT_STORED);
+ CASE_STR(MEMCACHE_RPL_EXISTS);
+ CASE_STR(MEMCACHE_RPL_NOT_FOUND);
+ CASE_DEFAULT(MEMCACHE_RPL);
+ }
+}
+
+const char *memcache_state_str (enum memcache_req_state state) {
+ switch (state) {
+ CASE_STR(MEMCACHE_STATE_INVALID);
+ CASE_STR(MEMCACHE_STATE_QUEUED);
+ CASE_STR(MEMCACHE_STATE_SEND);
+ CASE_STR(MEMCACHE_STATE_REPLY);
+ CASE_STR(MEMCACHE_STATE_REPLY_DATA);
+ CASE_STR(MEMCACHE_STATE_DONE);
+ CASE_STR(MEMCACHE_STATE_DATA_DONE);
+ CASE_STR(MEMCACHE_STATE_ERROR);
+ CASE_DEFAULT(MEMCACHE_STATE);
+ }
+}
+
--- a/memcache_test.c Thu Aug 28 01:42:28 2008 +0300
+++ b/memcache_test.c Thu Aug 28 03:12:11 2008 +0300
@@ -9,12 +9,38 @@
static struct memcache *mc;
static struct config_endpoint server_endpoint;
+static char *data_1 = "rei4quohV8Oocio1ua0co8ni4Ae1re4houcheixahchoh3ioghie0aShooShoh6Ahboequ9eiX5eashuu6Chu1quo"
+ "o0suph7cheiyai1ea0ooh7Aevoo4feihubupohDeephahwee2Ooz7chiediec7neit7keTh6xuheash8chaeKa5vi"
+ "ekooqu7ooj6Eezooroi6Nequ9ca2yi6iSoigh3loowaey9eiphaphaiJ0souy7wohpa9eXo5Ahu2sa";
+static char *data_2 = "iefaek7ighi5UpueThageish5ieshohyeil1raiceerahjahng5ui7vuzie9quu4dai5ar2aiXi5ieth4looweigi"
+ "e3fo5ieri1queengaiphuaghaic1xahvoo9joo6baiNaig8puCootheowah4moocohDoiquoh3quieka5ao3aeNg9"
+ "Aimei1soangu4Duch5pho5buu2ohzaich4chahz9iTh3Pei4beep1ongie6au1aafoosh2vierei5E";
-int _memcache_cb (struct memcache_req *req, void *arg) {
- return 0;
+void _memcache_cb (struct memcache_req *req, void *arg) {
+ char *key = arg;
+ const struct memcache_obj *obj;
+ const struct memcache_buf *buf;
+
+ INFO("[%s]: cmd=%15s state=%15s reply=%15s", key,
+ memcache_command_str(memcache_req_cmd(req)),
+ memcache_state_str(memcache_req_state(req)),
+ memcache_reply_str(memcache_req_reply(req))
+ );
+
+ if ((obj = memcache_req_obj(req)))
+ INFO("\tobj: flags=0x%04X exptime=%9zu bytes=%6zu cas=%llu", obj->flags, obj->exptime, obj->bytes, obj->cas);
+
+ if ((buf = memcache_req_buf(req)))
+ INFO("\tbuf: data=%p len=%6zu offset=%6zu", buf->data, buf->len, buf->offset);
+
+ INFO("%s", "");
}
void begin_test () {
+ struct memcache_key key_1, key_2;
+ struct memcache_obj obj_1, obj_2;
+ struct memcache_buf buf_1, buf_2;
+
if ((mc = memcache_alloc(&_memcache_cb)) == NULL)
ERROR("memcache_alloc");
@@ -29,8 +55,38 @@
ERROR("memcache_add_server");
// add a request or two
+ key_1.buf = "memcache_test_k1";
+ key_2.buf = "memcache_test_k2";
+ key_1.len = key_2.len = 0;
+
+ obj_1.flags = 0x1A;
+ obj_2.flags = 0x2B;
+
+ obj_1.exptime = 0;
+ obj_2.exptime = 3600;
+
+ obj_1.bytes = strlen(data_1);
+ obj_2.bytes = strlen(data_2);
+
+ buf_1.data = data_1;
+ buf_1.len = strlen(data_1);
+ buf_1.offset = buf_1.len;
+
+ buf_2.data = data_2;
+ buf_2.len = strlen(data_2);
+ buf_2.offset = buf_2.len;
+
+ if (memcache_store(mc, MEMCACHE_CMD_STORE_SET, &key_1, &obj_1, &buf_1, key_1.buf))
+ ERROR("memcache_store: key_1");
+ if (memcache_store(mc, MEMCACHE_CMD_STORE_ADD, &key_2, &obj_2, &buf_2, key_2.buf))
+ ERROR("memcache_store: key_2");
+ if (memcache_fetch(mc, &key_1, key_1.buf))
+ ERROR("memcache_fetch: key_1");
+
+ if (memcache_fetch(mc, &key_2, key_2.buf))
+ ERROR("memcache_fetch: key_2");
error:
return;