--- a/Makefile Wed Aug 27 10:13:38 2008 +0300
+++ b/Makefile Wed Aug 27 21:30:32 2008 +0300
@@ -16,7 +16,7 @@
web_main: web_main.o common.o config.o socket.o http.o render.o remote_node.o remote_pool.o render_remote.o tile.o static.o
coro_test: coro_test.o common.o config.o socket.o
cache_test: cache_test.o common.o cache/cache.o cache/req.o cache/op.o cache/engines/fs.o
-memcache_test: memcache_test.o common.o memcache/memcache.o memcache/connection.o memcache/server.o memcache/request.o socket.o config.o
+memcache_test: memcache_test.o common.o memcache/memcache.o memcache/connection.o memcache/server.o memcache/request.o memcache/command.o socket.o config.o
clean :
-rm *.o ${EXECS}
--- a/memcache.h Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache.h Wed Aug 27 21:30:32 2008 +0300
@@ -36,22 +36,55 @@
};
/*
+ * Object data
+ */
+struct memcache_buf {
+ char *data;
+ size_t len;
+ size_t offset;
+};
+
+/*
* Available commands
*/
enum memcache_command {
- FETCH_GET,
- STORE_SET,
- STORE_ADD,
- STORE_REPLACE,
- STORE_APPEND,
- STORE_PREPEND,
- STORE_CAS,
+ MEMCACHE_CMD_INVALID,
+
+ MEMCACHE_CMD_FETCH_GET,
+ MEMCACHE_CMD_STORE_SET,
+ MEMCACHE_CMD_STORE_ADD,
+ MEMCACHE_CMD_STORE_REPLACE,
+ MEMCACHE_CMD_STORE_APPEND,
+ MEMCACHE_CMD_STORE_PREPEND,
+ MEMCACHE_CMD_STORE_CAS,
+
+ MEMCACHE_CMD_MAX,
+};
+
+enum memcache_reply {
+ MEMCACHE_RPL_INVALID,
+
+ MEMCACHE_RPL_ERROR,
+ MEMCACHE_RPL_CLIENT_ERROR,
+ MEMCACHE_RPL_SERVER_ERROR,
+
+ // MEMCACHE_CMD_FETCH_*
+ MEMCACHE_RPL_VALUE,
+ MEMCACHE_RPL_END,
+
+ // MEMCACHE_CMD_STORE_*
+ MEMCACHE_RPL_STORED,
+ MEMCACHE_RPL_NOT_STORED,
+ MEMCACHE_RPL_EXISTS,
+ MEMCACHE_RPL_NOT_FOUND,
+
+ MEMCACHE_RPL_MAX,
};
enum memcache_req_state {
- STATE_INVALID,
+ MEMCACHE_STATE_INVALID,
- STATE_ERROR,
+ MEMCACHE_STATE_ERROR,
};
/*
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/command.c Wed Aug 27 21:30:32 2008 +0300
@@ -0,0 +1,123 @@
+
+#include <assert.h>
+
+#include "command.h"
+#include "../common.h"
+
+static char *memcache_cmd_names[MEMCACHE_CMD_MAX] = {
+ NULL, // MEMCACHE_CMD_INVALID
+ "get", // MEMCACHE_CMD_FETCH_GET
+ "set", // MEMCACHE_CMD_STORE_SET
+ "add", // MEMCACHE_CMD_STORE_ADD
+ "replace", // MEMCACHE_CMD_STORE_REPLACE
+ "append", // MEMCACHE_CMD_STORE_APPEND
+ "prepend" // MEMCACHE_CMD_STORE_PREPEND
+};
+
+/*
+static struct memcache_reply_info {
+ enum memcache_reply type;
+ char *name;
+ int has_data;
+
+} *memcache_cmd_replies[MEMCACHE_RPL_MAX] = {
+ MEMCACHE_RPL_INVALID,
+
+ MEMCACHE_RPL_ERROR,
+ MEMCACHE_RPL_CLIENT_ERROR,
+ MEMCACHE_RPL_SERVER_ERROR,
+
+ // MEMCACHE_CMD_FETCH_*
+ MEMCACHE_RPL_VALUE,
+ MEMCACHE_RPL_END,
+
+ // MEMCACHE_CMD_STORE_*
+ MEMCACHE_RPL_STORED,
+ MEMCACHE_RPL_NOT_STORED,
+ MEMCACHE_RPL_EXISTS,
+ MEMCACHE_RPL_NOT_FOUND,
+
+};
+
+*/
+
+int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj) {
+ // shouldn't already have a request header yet?
+ assert(cmd->req_header == NULL);
+
+ // allocate the request header
+ if ((cmd->req_header = evbuffer_new()) == NULL)
+ ERROR("evbuffer_new");
+
+ // format the command
+ if (memcache_cmd_format_header(cmd->req_header, cmd_type, key, obj))
+ goto error;
+
+ // XXX: prepare the rest
+
+ // success
+ return 0;
+
+error:
+ if (cmd->req_header)
+ evbuffer_free(cmd->req_header);
+
+ return -1;
+}
+
+int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj) {
+ char *cmd_name;
+
+ // valid command
+ assert(cmd_type < MEMCACHE_CMD_MAX);
+
+ if (cmd_type == MEMCACHE_CMD_INVALID)
+ ERROR("invalid command");
+
+ // map the command to a string
+ cmd_name = memcache_cmd_names[cmd_type];
+
+ // format the request header
+ switch (cmd_type) {
+ case MEMCACHE_CMD_FETCH_GET:
+ assert(key != NULL && obj == NULL);
+ assert(key->len > 0 && key->buf != NULL);
+
+ if (evbuffer_add_printf(buf, "%s %*s\r\n", cmd_name, (int) key->len, key->buf) == -1)
+ ERROR("evbuffer_add_printf");
+
+ break;
+
+ case MEMCACHE_CMD_STORE_SET:
+ case MEMCACHE_CMD_STORE_ADD:
+ case MEMCACHE_CMD_STORE_REPLACE:
+ case MEMCACHE_CMD_STORE_APPEND:
+ case MEMCACHE_CMD_STORE_PREPEND:
+ assert(key != NULL && obj != NULL);
+ assert(key->len > 0 && key->buf != NULL);
+ assert(obj->bytes > 0);
+
+ if (evbuffer_add_printf(buf, "%s %*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes))
+ ERROR("evbuffer_add_printf");
+
+ break;
+
+ case MEMCACHE_CMD_STORE_CAS:
+ default:
+ // XXX: not supported yet/invalid
+ assert(0);
+ };
+
+ // success
+ return 0;
+
+error:
+ return -1;
+
+}
+
+int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data) {
+ // XXX: implement
+ assert(0);
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/command.h Wed Aug 27 21:30:32 2008 +0300
@@ -0,0 +1,18 @@
+#ifndef MEMCACHE_COMMAND_H
+#define MEMCACHE_COMMAND_H
+
+#include <event2/util.h>
+#include <event2/buffer.h>
+
+#include "../memcache.h"
+
+struct memcache_cmd {
+ struct evbuffer *req_header;
+};
+
+int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj);
+
+int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj);
+int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data);
+
+#endif /* MEMCACHE_COMMAND_H */
Binary file memcache/command.o has changed
--- a/memcache/connection.c Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/connection.c Wed Aug 27 21:30:32 2008 +0300
@@ -2,13 +2,20 @@
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
+#include <errno.h>
#include <assert.h>
#include "connection.h"
+#include "command.h"
+#include "request.h"
#include "../socket.h"
#include "../common.h"
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
+static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
+static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
+static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
+static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
struct memcache_conn *conn = NULL;
@@ -18,7 +25,7 @@
// remember the server
conn->server = server;
-
+
// attempt connect
if (memcache_conn_connect(conn))
ERROR("failed to connect to server");
@@ -74,13 +81,49 @@
// store the req
conn->req = req;
- // XXX: transmit it
+ // write the request header into our bufferevent's output buffer
+ if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj))
+ ERROR("failed to init the cmd");
+
+ // tell our bufferevent to send it
+ if (bufferevent_enable(conn->bev, EV_WRITE))
+ PERROR("bufferevent_enable");
+
+ // wait for that to complete
+ return 0;
error:
return -1;
}
/*
+ * Start writing out the request data
+ */
+void memcache_conn_send_req_data (struct memcache_conn *conn) {
+ // just fake a call to the event handler
+ _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
+}
+
+/*
+ * Start reading a reply from the connection
+ */
+void memcache_conn_handle_reply (struct memcache_conn *conn) {
+ // ensure that we either didn't have a command, or it has been sent
+ assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
+
+ // start reading on the bufferevent
+ if (bufferevent_enable(conn->bev, EV_READ))
+ PERROR("bufferevent_enable");
+
+ // ok, wait for the reply
+ return;
+
+error:
+ // XXX: error handling
+ assert(0);
+}
+
+/*
* The connect() has finished
*/
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
@@ -93,6 +136,15 @@
if (error)
ERROR("connect failed: %s", strerror(error));
+ // set up the bufferevent
+ if ((conn->bev = bufferevent_new(fd,
+ &_memcache_conn_bev_read,
+ &_memcache_conn_bev_write,
+ &_memcache_conn_bev_error,
+ conn
+ )) == NULL)
+ ERROR("bufferevent_new");
+
// mark us as succesfully connected
conn->is_connected = 1;
@@ -107,6 +159,100 @@
memcache_server_conn_dead(conn->server, conn);
}
+/*
+ * The write buffer is empty, which means that we have written out a command header
+ */
+static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
+ struct memcache_conn *conn = arg;
+
+ // the command header has been sent
+ assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
+
+ // does this request have some data to be included in the request?
+ if (conn->req->buf.data > 0) {
+ // we need to send the request data next
+ memcache_conn_send_req_data(conn);
+
+ } else {
+ // wait for a reply
+ memcache_conn_handle_reply(conn);
+ }
+}
+
+/*
+ * We have received some reply data, which should include the complete reply line at some point
+ */
+static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
+ struct memcache_conn *conn = arg;
+ struct evbuffer *in_buf = bufferevent_get_input(bev);
+ char *header_data;
+ enum memcache_reply reply_type;
+ int has_data;
+
+ // ensure that we do indeed have some data
+ assert(evbuffer_get_length(in_buf) > 0);
+
+ // attempt to parse the response header
+ if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data))
+ ERROR("memcache_cmd_parse_header");
+
+ // XXX: read reply data
+
+error:
+ // XXX: error handling
+ return;
+}
+
+
+static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
+ // XXX: error handling
+ assert(0);
+}
+
+static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
+ struct memcache_conn *conn = arg;
+ struct memcache_buf *buf = &conn->req->buf;
+ int ret;
+
+ // correct event
+ assert(event == EV_WRITE);
+
+ // we do indeed have data to send
+ assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);
+
+ // do the actual write()
+ if ((ret = write(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN)
+ PERROR("write");
+
+ // should never be the case... ?
+ if (ret == 0)
+ ERROR("write returned 0 !?!");
+
+ // did we manage to write some data?
+ if (ret > 0) {
+ // update offset
+ buf->offset += ret;
+ }
+
+ // data left to write?
+ if (buf->offset < buf->len) {
+ // reschedule
+ if (event_add(&conn->ev, NULL))
+ PERROR("event_add");
+
+ } else {
+ // done! We can handle the reply now
+ memcache_conn_handle_reply(conn);
+ }
+
+ // success
+ return;
+
+error:
+ // XXX: error handling
+ assert(0);
+}
+
void memcache_conn_free (struct memcache_conn *conn) {
// ensure that the connection is not considered to be connected anymore
assert(!conn->is_connected);
--- a/memcache/connection.h Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/connection.h Wed Aug 27 21:30:32 2008 +0300
@@ -5,8 +5,10 @@
#include <event2/event.h>
#include <event2/event_struct.h>
+#include <event2/bufferevent.h>
#include "server.h"
+#include "command.h"
struct memcache_conn {
// the server we are connected to
@@ -21,11 +23,17 @@
// socket event
struct event ev;
+ // socket bufferevent
+ struct bufferevent *bev;
+
// have we succesfully connected yet?
int is_connected;
// the request (if any) that we are currently processing - NULL for idle connections
struct memcache_req *req;
+
+ // used to track our commands
+ struct memcache_cmd cmd;
};
/*
--- a/memcache/memcache.c Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/memcache.c Wed Aug 27 21:30:32 2008 +0300
@@ -3,6 +3,7 @@
#include "memcache.h"
#include "server.h"
+#include "request.h"
#include "../common.h"
struct memcache *memcache_alloc (memcache_cb cb_fn) {
@@ -54,7 +55,7 @@
struct memcache_server *server = NULL;
// alloc the request
- if ((req = memcache_req_alloc(key, cb_arg)) == NULL)
+ if ((req = memcache_req_alloc(mc, MEMCACHE_CMD_FETCH_GET, key, cb_arg)) == NULL)
ERROR("failed to allocate request");
// pick a server
@@ -66,12 +67,12 @@
ERROR("failed to hand over the request for processing");
// success!
- return 0;
+ return req;
error:
if (req)
memcache_req_free(req);
- return -1;
+ return NULL;
}
--- a/memcache/request.c Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/request.c Wed Aug 27 21:30:32 2008 +0300
@@ -6,7 +6,7 @@
#include "memcache.h"
#include "../common.h"
-struct memcache_req *memcache_req_alloc (struct memcache *mc, struct memcache_key *key, void *cb_arg) {
+struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg) {
struct memcache_req *req = NULL;
// allocate it
@@ -14,7 +14,7 @@
ERROR("calloc");
// state
- req->state = STATE_INVALID;
+ req->state = MEMCACHE_STATE_INVALID;
// copy the key
if ((req->key.buf = malloc(key->len)) == NULL)
@@ -24,8 +24,9 @@
memcpy(req->key.buf, key->buf, key->len);
req->key.len = key->len;
- // store the mc + callback argument
+ // store the other data
req->mc = mc;
+ req->cmd_type = cmd_type;
req->cb_arg = cb_arg;
// success
@@ -49,7 +50,7 @@
req->conn = NULL;
// enter ERROR state
- req->state = STATE_ERROR;
+ req->state = MEMCACHE_STATE_ERROR;
// notify
if (_memcache_req_notify(req))
@@ -59,7 +60,7 @@
void memcache_req_free (struct memcache_req *req) {
// must be unused
assert(req->conn == NULL);
- assert(req->state == STATE_INVALID || req->state == STATE_ERROR);
+ assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR);
free(req->key.buf);
free(req);
--- a/memcache/request.h Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/request.h Wed Aug 27 21:30:32 2008 +0300
@@ -10,9 +10,13 @@
// the memcache context that we belong to
struct memcache *mc;
+ // the command to execute
+ enum memcache_command cmd_type;
+
// our key/obj
struct memcache_key key;
- struct memcache_obj *obj;
+ struct memcache_obj obj;
+ struct memcache_buf buf;
// our state
enum memcache_req_state state;
@@ -30,7 +34,7 @@
/*
* Allocate and return a new req, or NULL if unsuccesfull.
*/
-struct memcache_req *memcache_req_alloc (struct memcache *mc, struct memcache_key *key, void *cb_arg);
+struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg);
/*