sending requests, and partial support for receiving -- incomplete, not tested
authorTero Marttila <terom@fixme.fi>
Wed, 27 Aug 2008 21:30:32 +0300
changeset 41 540737bf6bac
parent 40 9cecd22e643a
child 42 0e503189af2f
sending requests, and partial support for receiving -- incomplete, not tested
Makefile
memcache.h
memcache/command.c
memcache/command.h
memcache/command.o
memcache/connection.c
memcache/connection.h
memcache/memcache.c
memcache/request.c
memcache/request.h
--- a/Makefile	Wed Aug 27 10:13:38 2008 +0300
+++ b/Makefile	Wed Aug 27 21:30:32 2008 +0300
@@ -16,7 +16,7 @@
 web_main: web_main.o common.o config.o socket.o http.o render.o remote_node.o remote_pool.o render_remote.o tile.o static.o
 coro_test: coro_test.o common.o config.o socket.o
 cache_test: cache_test.o common.o cache/cache.o cache/req.o cache/op.o cache/engines/fs.o
-memcache_test: memcache_test.o common.o memcache/memcache.o memcache/connection.o memcache/server.o memcache/request.o socket.o config.o
+memcache_test: memcache_test.o common.o memcache/memcache.o memcache/connection.o memcache/server.o memcache/request.o memcache/command.o socket.o config.o
 
 clean :
 	-rm *.o ${EXECS}
--- a/memcache.h	Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache.h	Wed Aug 27 21:30:32 2008 +0300
@@ -36,22 +36,55 @@
 };
 
 /*
+ * Object data
+ */
+struct memcache_buf {
+    char *data;
+    size_t len;
+    size_t offset;
+};
+
+/*
  * Available commands
  */
 enum memcache_command {
-    FETCH_GET,
-    STORE_SET,
-    STORE_ADD,
-    STORE_REPLACE,
-    STORE_APPEND,
-    STORE_PREPEND,
-    STORE_CAS,
+    MEMCACHE_CMD_INVALID,
+
+    MEMCACHE_CMD_FETCH_GET,
+    MEMCACHE_CMD_STORE_SET,
+    MEMCACHE_CMD_STORE_ADD,
+    MEMCACHE_CMD_STORE_REPLACE,
+    MEMCACHE_CMD_STORE_APPEND,
+    MEMCACHE_CMD_STORE_PREPEND,
+    MEMCACHE_CMD_STORE_CAS,
+
+    MEMCACHE_CMD_MAX,
+};
+
+enum memcache_reply {
+    MEMCACHE_RPL_INVALID,
+
+    MEMCACHE_RPL_ERROR,
+    MEMCACHE_RPL_CLIENT_ERROR,
+    MEMCACHE_RPL_SERVER_ERROR,
+    
+    // MEMCACHE_CMD_FETCH_*
+    MEMCACHE_RPL_VALUE,
+    MEMCACHE_RPL_END,
+    
+    // MEMCACHE_CMD_STORE_*
+    MEMCACHE_RPL_STORED,
+    MEMCACHE_RPL_NOT_STORED,
+    MEMCACHE_RPL_EXISTS,
+    MEMCACHE_RPL_NOT_FOUND,
+
+    MEMCACHE_RPL_MAX,
 };
 
 enum memcache_req_state {
-    STATE_INVALID,
+    MEMCACHE_STATE_INVALID,
 
-    STATE_ERROR,
+    MEMCACHE_STATE_ERROR,
 };
 
 /*
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/command.c	Wed Aug 27 21:30:32 2008 +0300
@@ -0,0 +1,123 @@
+
+#include <assert.h>
+
+#include "command.h"
+#include "../common.h"
+
+static char *memcache_cmd_names[MEMCACHE_CMD_MAX] = {
+    NULL,       // MEMCACHE_CMD_INVALID
+    "get",      // MEMCACHE_CMD_FETCH_GET
+    "set",      // MEMCACHE_CMD_STORE_SET
+    "add",      // MEMCACHE_CMD_STORE_ADD
+    "replace",  // MEMCACHE_CMD_STORE_REPLACE
+    "append",   // MEMCACHE_CMD_STORE_APPEND
+    "prepend"   // MEMCACHE_CMD_STORE_PREPEND
+};
+
+/*
+static struct memcache_reply_info {
+    enum memcache_reply type;
+    char *name;
+    int has_data;
+
+} *memcache_cmd_replies[MEMCACHE_RPL_MAX] = {
+    MEMCACHE_RPL_INVALID,
+
+    MEMCACHE_RPL_ERROR,
+    MEMCACHE_RPL_CLIENT_ERROR,
+    MEMCACHE_RPL_SERVER_ERROR,
+    
+    // MEMCACHE_CMD_FETCH_*
+    MEMCACHE_RPL_VALUE,
+    MEMCACHE_RPL_END,
+    
+    // MEMCACHE_CMD_STORE_*
+    MEMCACHE_RPL_STORED,
+    MEMCACHE_RPL_NOT_STORED,
+    MEMCACHE_RPL_EXISTS,
+    MEMCACHE_RPL_NOT_FOUND,
+
+};
+
+*/
+
+int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj) {
+    // shouldn't already have a request header yet?
+    assert(cmd->req_header == NULL);
+
+    // allocate the request header
+    if ((cmd->req_header = evbuffer_new()) == NULL)
+        ERROR("evbuffer_new");
+
+    // format the command
+    if (memcache_cmd_format_header(cmd->req_header, cmd_type, key, obj))
+        goto error;
+
+    // XXX: prepare the rest
+
+    // success
+    return 0;
+
+error:    
+    if (cmd->req_header)
+        evbuffer_free(cmd->req_header);
+
+    return -1;
+}
+
+int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj) {
+    char *cmd_name;
+
+    // valid command
+    assert(cmd_type < MEMCACHE_CMD_MAX);
+
+    if (cmd_type == MEMCACHE_CMD_INVALID)
+        ERROR("invalid command");
+    
+    // map the command to a string
+    cmd_name = memcache_cmd_names[cmd_type];
+
+    // format the request header
+    switch (cmd_type) {
+        case MEMCACHE_CMD_FETCH_GET:
+            assert(key != NULL && obj == NULL);
+            assert(key->len > 0 && key->buf != NULL);
+
+            if (evbuffer_add_printf(buf, "%s %*s\r\n", cmd_name, (int) key->len, key->buf) == -1)
+                ERROR("evbuffer_add_printf");
+
+            break;
+
+        case MEMCACHE_CMD_STORE_SET:
+        case MEMCACHE_CMD_STORE_ADD:
+        case MEMCACHE_CMD_STORE_REPLACE:
+        case MEMCACHE_CMD_STORE_APPEND:
+        case MEMCACHE_CMD_STORE_PREPEND:
+            assert(key != NULL && obj != NULL);
+            assert(key->len > 0 && key->buf != NULL);
+            assert(obj->bytes > 0);
+
+            if (evbuffer_add_printf(buf, "%s %*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes))
+                ERROR("evbuffer_add_printf");
+
+            break;
+
+        case MEMCACHE_CMD_STORE_CAS:
+        default:
+            // XXX: not supported yet/invalid
+            assert(0);
+    };
+    
+    // success
+    return 0;
+
+error:
+    return -1;
+
+}
+
+int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data) {
+    // XXX: implement
+    assert(0);
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/command.h	Wed Aug 27 21:30:32 2008 +0300
@@ -0,0 +1,18 @@
+#ifndef MEMCACHE_COMMAND_H
+#define MEMCACHE_COMMAND_H
+
+#include <event2/util.h>
+#include <event2/buffer.h>
+
+#include "../memcache.h"
+
+struct memcache_cmd {
+    struct evbuffer *req_header;
+};
+
+int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj);
+
+int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj);
+int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data);
+
+#endif /* MEMCACHE_COMMAND_H */
Binary file memcache/command.o has changed
--- a/memcache/connection.c	Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/connection.c	Wed Aug 27 21:30:32 2008 +0300
@@ -2,13 +2,20 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
+#include <errno.h>
 #include <assert.h>
 
 #include "connection.h"
+#include "command.h"
+#include "request.h"
 #include "../socket.h"
 #include "../common.h"
 
 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
+static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
+static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
+static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
+static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
 
 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
     struct memcache_conn *conn = NULL;
@@ -18,7 +25,7 @@
     
     // remember the server
     conn->server = server;
-    
+
     // attempt connect
     if (memcache_conn_connect(conn))
         ERROR("failed to connect to server");
@@ -74,13 +81,49 @@
     // store the req
     conn->req = req;
 
-    // XXX: transmit it
+    // write the request header into our bufferevent's output buffer
+    if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj))
+        ERROR("failed to init the cmd");
+    
+    // tell our bufferevent to send it
+    if (bufferevent_enable(conn->bev, EV_WRITE))
+        PERROR("bufferevent_enable");
+    
+    // wait for that to complete
+    return 0;
 
 error:
     return -1;
 }
 
 /*
+ * Start writing out the request data
+ */
+void memcache_conn_send_req_data (struct memcache_conn *conn) {
+    // just fake a call to the event handler
+    _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
+}
+
+/*
+ * Start reading a reply from the connection
+ */
+void memcache_conn_handle_reply (struct memcache_conn *conn) {
+    // ensure that we either didn't have a command, or it has been sent
+    assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
+
+    // start reading on the bufferevent
+    if (bufferevent_enable(conn->bev, EV_READ))
+        PERROR("bufferevent_enable");
+
+    // ok, wait for the reply
+    return;
+
+error:
+    // XXX: error handling
+    assert(0);
+}
+
+/*
  * The connect() has finished
  */
 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
@@ -93,6 +136,15 @@
     if (error)
         ERROR("connect failed: %s", strerror(error));
 
+    // set up the bufferevent
+    if ((conn->bev = bufferevent_new(fd, 
+        &_memcache_conn_bev_read,
+        &_memcache_conn_bev_write,
+        &_memcache_conn_bev_error,
+        conn
+    )) == NULL)
+        ERROR("bufferevent_new");
+
     // mark us as succesfully connected
     conn->is_connected = 1;
 
@@ -107,6 +159,100 @@
     memcache_server_conn_dead(conn->server, conn);
 }
 
+/*
+ * The write buffer is empty, which means that we have written out a command header
+ */
+static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
+    struct memcache_conn *conn = arg;
+
+    // the command header has been sent
+    assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
+    
+    // does this request have some data to be included in the request?
+    if (conn->req->buf.data > 0) {
+        // we need to send the request data next
+        memcache_conn_send_req_data(conn);
+
+    } else {
+        // wait for a reply
+        memcache_conn_handle_reply(conn);
+    }
+}
+
+/*
+ * We have received some reply data, which should include the complete reply line at some point
+ */
+static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
+    struct memcache_conn *conn = arg;
+    struct evbuffer *in_buf = bufferevent_get_input(bev);
+    char *header_data;
+    enum memcache_reply reply_type;
+    int has_data;
+    
+    // ensure that we do indeed have some data
+    assert(evbuffer_get_length(in_buf) > 0);
+
+    // attempt to parse the response header
+    if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data))
+        ERROR("memcache_cmd_parse_header");
+
+    // XXX: read reply data
+
+error:
+    // XXX: error handling
+    return;
+}
+
+
+static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
+    // XXX: error handling
+    assert(0);
+}
+
+static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
+    struct memcache_conn *conn = arg;
+    struct memcache_buf *buf = &conn->req->buf;
+    int ret;
+
+    // correct event
+    assert(event == EV_WRITE);
+
+    // we do indeed have data to send
+    assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);
+    
+    // do the actual write()
+    if ((ret = write(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN)
+        PERROR("write");
+
+    // should never be the case... ?
+    if (ret == 0)
+        ERROR("write returned 0 !?!");
+    
+    // did we manage to write some data?
+    if (ret > 0) {
+        // update offset
+        buf->offset += ret;
+    }
+
+    // data left to write?
+    if (buf->offset < buf->len) {
+        // reschedule
+        if (event_add(&conn->ev, NULL))
+            PERROR("event_add");
+
+    } else {
+        // done! We can handle the reply now
+        memcache_conn_handle_reply(conn);
+    }
+
+    // success
+    return;
+
+error:
+    // XXX: error handling
+    assert(0);
+}
+
 void memcache_conn_free (struct memcache_conn *conn) {
     // ensure that the connection is not considered to be connected anymore
     assert(!conn->is_connected);
--- a/memcache/connection.h	Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/connection.h	Wed Aug 27 21:30:32 2008 +0300
@@ -5,8 +5,10 @@
 
 #include <event2/event.h>
 #include <event2/event_struct.h>
+#include <event2/bufferevent.h>
 
 #include "server.h"
+#include "command.h"
 
 struct memcache_conn {
     // the server we are connected to
@@ -21,11 +23,17 @@
     // socket event
     struct event ev;
 
+    // socket bufferevent
+    struct bufferevent *bev;
+
     // have we succesfully connected yet?
     int is_connected;
     
     // the request (if any) that we are currently processing - NULL for idle connections
     struct memcache_req *req;
+
+    // used to track our commands
+    struct memcache_cmd cmd;
 };
 
 /*
--- a/memcache/memcache.c	Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/memcache.c	Wed Aug 27 21:30:32 2008 +0300
@@ -3,6 +3,7 @@
 
 #include "memcache.h"
 #include "server.h"
+#include "request.h"
 #include "../common.h"
 
 struct memcache *memcache_alloc (memcache_cb cb_fn) {
@@ -54,7 +55,7 @@
     struct memcache_server *server = NULL;
     
     // alloc the request
-    if ((req = memcache_req_alloc(key, cb_arg)) == NULL)
+    if ((req = memcache_req_alloc(mc, MEMCACHE_CMD_FETCH_GET, key, cb_arg)) == NULL)
         ERROR("failed to allocate request");
 
     // pick a server
@@ -66,12 +67,12 @@
         ERROR("failed to hand over the request for processing");
 
     // success!
-    return 0;
+    return req;
 
 error:
     if (req)
         memcache_req_free(req);
 
-    return -1;
+    return NULL;
 }  
 
--- a/memcache/request.c	Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/request.c	Wed Aug 27 21:30:32 2008 +0300
@@ -6,7 +6,7 @@
 #include "memcache.h"
 #include "../common.h"
 
-struct memcache_req *memcache_req_alloc (struct memcache *mc, struct memcache_key *key, void *cb_arg) {
+struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg) {
     struct memcache_req *req = NULL;
     
     // allocate it
@@ -14,7 +14,7 @@
         ERROR("calloc");
     
     // state
-    req->state = STATE_INVALID;
+    req->state = MEMCACHE_STATE_INVALID;
 
     // copy the key
     if ((req->key.buf = malloc(key->len)) == NULL)
@@ -24,8 +24,9 @@
     memcpy(req->key.buf, key->buf, key->len);
     req->key.len = key->len;
 
-    // store the mc + callback argument
+    // store the other data
     req->mc = mc;
+    req->cmd_type = cmd_type;
     req->cb_arg = cb_arg;
 
     // success
@@ -49,7 +50,7 @@
     req->conn = NULL;
     
     // enter ERROR state
-    req->state = STATE_ERROR;
+    req->state = MEMCACHE_STATE_ERROR;
 
     // notify
     if (_memcache_req_notify(req))
@@ -59,7 +60,7 @@
 void memcache_req_free (struct memcache_req *req) {
     // must be unused
     assert(req->conn == NULL);
-    assert(req->state == STATE_INVALID || req->state == STATE_ERROR);
+    assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR);
 
     free(req->key.buf);
     free(req);
--- a/memcache/request.h	Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/request.h	Wed Aug 27 21:30:32 2008 +0300
@@ -10,9 +10,13 @@
     // the memcache context that we belong to
     struct memcache *mc;
 
+    // the command to execute
+    enum memcache_command cmd_type;
+
     // our key/obj
     struct memcache_key key;
-    struct memcache_obj *obj;
+    struct memcache_obj obj;
+    struct memcache_buf buf;
 
     // our state
     enum memcache_req_state state;
@@ -30,7 +34,7 @@
 /*
  * Allocate and return a new req, or NULL if unsuccesfull.
  */
-struct memcache_req *memcache_req_alloc (struct memcache *mc, struct memcache_key *key, void *cb_arg);
+struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg);
 
 
 /*