# HG changeset patch # User Tero Marttila # Date 1219861832 -10800 # Node ID 540737bf6bac2907b08eddbeadd237d313d42163 # Parent 9cecd22e643aad328334bf3de954d14f44793807 sending requests, and partial support for receiving -- incomplete, not tested diff -r 9cecd22e643a -r 540737bf6bac Makefile --- a/Makefile Wed Aug 27 10:13:38 2008 +0300 +++ b/Makefile Wed Aug 27 21:30:32 2008 +0300 @@ -16,7 +16,7 @@ web_main: web_main.o common.o config.o socket.o http.o render.o remote_node.o remote_pool.o render_remote.o tile.o static.o coro_test: coro_test.o common.o config.o socket.o cache_test: cache_test.o common.o cache/cache.o cache/req.o cache/op.o cache/engines/fs.o -memcache_test: memcache_test.o common.o memcache/memcache.o memcache/connection.o memcache/server.o memcache/request.o socket.o config.o +memcache_test: memcache_test.o common.o memcache/memcache.o memcache/connection.o memcache/server.o memcache/request.o memcache/command.o socket.o config.o clean : -rm *.o ${EXECS} diff -r 9cecd22e643a -r 540737bf6bac memcache.h --- a/memcache.h Wed Aug 27 10:13:38 2008 +0300 +++ b/memcache.h Wed Aug 27 21:30:32 2008 +0300 @@ -36,22 +36,55 @@ }; /* + * Object data + */ +struct memcache_buf { + char *data; + size_t len; + size_t offset; +}; + +/* * Available commands */ enum memcache_command { - FETCH_GET, - STORE_SET, - STORE_ADD, - STORE_REPLACE, - STORE_APPEND, - STORE_PREPEND, - STORE_CAS, + MEMCACHE_CMD_INVALID, + + MEMCACHE_CMD_FETCH_GET, + MEMCACHE_CMD_STORE_SET, + MEMCACHE_CMD_STORE_ADD, + MEMCACHE_CMD_STORE_REPLACE, + MEMCACHE_CMD_STORE_APPEND, + MEMCACHE_CMD_STORE_PREPEND, + MEMCACHE_CMD_STORE_CAS, + + MEMCACHE_CMD_MAX, +}; + +enum memcache_reply { + MEMCACHE_RPL_INVALID, + + MEMCACHE_RPL_ERROR, + MEMCACHE_RPL_CLIENT_ERROR, + MEMCACHE_RPL_SERVER_ERROR, + + // MEMCACHE_CMD_FETCH_* + MEMCACHE_RPL_VALUE, + MEMCACHE_RPL_END, + + // MEMCACHE_CMD_STORE_* + MEMCACHE_RPL_STORED, + MEMCACHE_RPL_NOT_STORED, + MEMCACHE_RPL_EXISTS, + MEMCACHE_RPL_NOT_FOUND, + + MEMCACHE_RPL_MAX, }; enum memcache_req_state { - STATE_INVALID, + MEMCACHE_STATE_INVALID, - STATE_ERROR, + MEMCACHE_STATE_ERROR, }; /* diff -r 9cecd22e643a -r 540737bf6bac memcache/command.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/memcache/command.c Wed Aug 27 21:30:32 2008 +0300 @@ -0,0 +1,123 @@ + +#include + +#include "command.h" +#include "../common.h" + +static char *memcache_cmd_names[MEMCACHE_CMD_MAX] = { + NULL, // MEMCACHE_CMD_INVALID + "get", // MEMCACHE_CMD_FETCH_GET + "set", // MEMCACHE_CMD_STORE_SET + "add", // MEMCACHE_CMD_STORE_ADD + "replace", // MEMCACHE_CMD_STORE_REPLACE + "append", // MEMCACHE_CMD_STORE_APPEND + "prepend" // MEMCACHE_CMD_STORE_PREPEND +}; + +/* +static struct memcache_reply_info { + enum memcache_reply type; + char *name; + int has_data; + +} *memcache_cmd_replies[MEMCACHE_RPL_MAX] = { + MEMCACHE_RPL_INVALID, + + MEMCACHE_RPL_ERROR, + MEMCACHE_RPL_CLIENT_ERROR, + MEMCACHE_RPL_SERVER_ERROR, + + // MEMCACHE_CMD_FETCH_* + MEMCACHE_RPL_VALUE, + MEMCACHE_RPL_END, + + // MEMCACHE_CMD_STORE_* + MEMCACHE_RPL_STORED, + MEMCACHE_RPL_NOT_STORED, + MEMCACHE_RPL_EXISTS, + MEMCACHE_RPL_NOT_FOUND, + +}; + +*/ + +int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj) { + // shouldn't already have a request header yet? + assert(cmd->req_header == NULL); + + // allocate the request header + if ((cmd->req_header = evbuffer_new()) == NULL) + ERROR("evbuffer_new"); + + // format the command + if (memcache_cmd_format_header(cmd->req_header, cmd_type, key, obj)) + goto error; + + // XXX: prepare the rest + + // success + return 0; + +error: + if (cmd->req_header) + evbuffer_free(cmd->req_header); + + return -1; +} + +int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj) { + char *cmd_name; + + // valid command + assert(cmd_type < MEMCACHE_CMD_MAX); + + if (cmd_type == MEMCACHE_CMD_INVALID) + ERROR("invalid command"); + + // map the command to a string + cmd_name = memcache_cmd_names[cmd_type]; + + // format the request header + switch (cmd_type) { + case MEMCACHE_CMD_FETCH_GET: + assert(key != NULL && obj == NULL); + assert(key->len > 0 && key->buf != NULL); + + if (evbuffer_add_printf(buf, "%s %*s\r\n", cmd_name, (int) key->len, key->buf) == -1) + ERROR("evbuffer_add_printf"); + + break; + + case MEMCACHE_CMD_STORE_SET: + case MEMCACHE_CMD_STORE_ADD: + case MEMCACHE_CMD_STORE_REPLACE: + case MEMCACHE_CMD_STORE_APPEND: + case MEMCACHE_CMD_STORE_PREPEND: + assert(key != NULL && obj != NULL); + assert(key->len > 0 && key->buf != NULL); + assert(obj->bytes > 0); + + if (evbuffer_add_printf(buf, "%s %*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes)) + ERROR("evbuffer_add_printf"); + + break; + + case MEMCACHE_CMD_STORE_CAS: + default: + // XXX: not supported yet/invalid + assert(0); + }; + + // success + return 0; + +error: + return -1; + +} + +int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data) { + // XXX: implement + assert(0); +} + diff -r 9cecd22e643a -r 540737bf6bac memcache/command.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/memcache/command.h Wed Aug 27 21:30:32 2008 +0300 @@ -0,0 +1,18 @@ +#ifndef MEMCACHE_COMMAND_H +#define MEMCACHE_COMMAND_H + +#include +#include + +#include "../memcache.h" + +struct memcache_cmd { + struct evbuffer *req_header; +}; + +int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj); + +int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj); +int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data); + +#endif /* MEMCACHE_COMMAND_H */ diff -r 9cecd22e643a -r 540737bf6bac memcache/command.o Binary file memcache/command.o has changed diff -r 9cecd22e643a -r 540737bf6bac memcache/connection.c --- a/memcache/connection.c Wed Aug 27 10:13:38 2008 +0300 +++ b/memcache/connection.c Wed Aug 27 21:30:32 2008 +0300 @@ -2,13 +2,20 @@ #include #include #include +#include #include #include "connection.h" +#include "command.h" +#include "request.h" #include "../socket.h" #include "../common.h" static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg); +static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg); +static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg); +static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg); +static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg); struct memcache_conn *memcache_conn_open (struct memcache_server *server) { struct memcache_conn *conn = NULL; @@ -18,7 +25,7 @@ // remember the server conn->server = server; - + // attempt connect if (memcache_conn_connect(conn)) ERROR("failed to connect to server"); @@ -74,13 +81,49 @@ // store the req conn->req = req; - // XXX: transmit it + // write the request header into our bufferevent's output buffer + if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) + ERROR("failed to init the cmd"); + + // tell our bufferevent to send it + if (bufferevent_enable(conn->bev, EV_WRITE)) + PERROR("bufferevent_enable"); + + // wait for that to complete + return 0; error: return -1; } /* + * Start writing out the request data + */ +void memcache_conn_send_req_data (struct memcache_conn *conn) { + // just fake a call to the event handler + _memcache_conn_ev_write(conn->fd, EV_WRITE, conn); +} + +/* + * Start reading a reply from the connection + */ +void memcache_conn_handle_reply (struct memcache_conn *conn) { + // ensure that we either didn't have a command, or it has been sent + assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len); + + // start reading on the bufferevent + if (bufferevent_enable(conn->bev, EV_READ)) + PERROR("bufferevent_enable"); + + // ok, wait for the reply + return; + +error: + // XXX: error handling + assert(0); +} + +/* * The connect() has finished */ static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) { @@ -93,6 +136,15 @@ if (error) ERROR("connect failed: %s", strerror(error)); + // set up the bufferevent + if ((conn->bev = bufferevent_new(fd, + &_memcache_conn_bev_read, + &_memcache_conn_bev_write, + &_memcache_conn_bev_error, + conn + )) == NULL) + ERROR("bufferevent_new"); + // mark us as succesfully connected conn->is_connected = 1; @@ -107,6 +159,100 @@ memcache_server_conn_dead(conn->server, conn); } +/* + * The write buffer is empty, which means that we have written out a command header + */ +static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) { + struct memcache_conn *conn = arg; + + // the command header has been sent + assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0); + + // does this request have some data to be included in the request? + if (conn->req->buf.data > 0) { + // we need to send the request data next + memcache_conn_send_req_data(conn); + + } else { + // wait for a reply + memcache_conn_handle_reply(conn); + } +} + +/* + * We have received some reply data, which should include the complete reply line at some point + */ +static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) { + struct memcache_conn *conn = arg; + struct evbuffer *in_buf = bufferevent_get_input(bev); + char *header_data; + enum memcache_reply reply_type; + int has_data; + + // ensure that we do indeed have some data + assert(evbuffer_get_length(in_buf) > 0); + + // attempt to parse the response header + if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data)) + ERROR("memcache_cmd_parse_header"); + + // XXX: read reply data + +error: + // XXX: error handling + return; +} + + +static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) { + // XXX: error handling + assert(0); +} + +static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) { + struct memcache_conn *conn = arg; + struct memcache_buf *buf = &conn->req->buf; + int ret; + + // correct event + assert(event == EV_WRITE); + + // we do indeed have data to send + assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len); + + // do the actual write() + if ((ret = write(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN) + PERROR("write"); + + // should never be the case... ? + if (ret == 0) + ERROR("write returned 0 !?!"); + + // did we manage to write some data? + if (ret > 0) { + // update offset + buf->offset += ret; + } + + // data left to write? + if (buf->offset < buf->len) { + // reschedule + if (event_add(&conn->ev, NULL)) + PERROR("event_add"); + + } else { + // done! We can handle the reply now + memcache_conn_handle_reply(conn); + } + + // success + return; + +error: + // XXX: error handling + assert(0); +} + void memcache_conn_free (struct memcache_conn *conn) { // ensure that the connection is not considered to be connected anymore assert(!conn->is_connected); diff -r 9cecd22e643a -r 540737bf6bac memcache/connection.h --- a/memcache/connection.h Wed Aug 27 10:13:38 2008 +0300 +++ b/memcache/connection.h Wed Aug 27 21:30:32 2008 +0300 @@ -5,8 +5,10 @@ #include #include +#include #include "server.h" +#include "command.h" struct memcache_conn { // the server we are connected to @@ -21,11 +23,17 @@ // socket event struct event ev; + // socket bufferevent + struct bufferevent *bev; + // have we succesfully connected yet? int is_connected; // the request (if any) that we are currently processing - NULL for idle connections struct memcache_req *req; + + // used to track our commands + struct memcache_cmd cmd; }; /* diff -r 9cecd22e643a -r 540737bf6bac memcache/memcache.c --- a/memcache/memcache.c Wed Aug 27 10:13:38 2008 +0300 +++ b/memcache/memcache.c Wed Aug 27 21:30:32 2008 +0300 @@ -3,6 +3,7 @@ #include "memcache.h" #include "server.h" +#include "request.h" #include "../common.h" struct memcache *memcache_alloc (memcache_cb cb_fn) { @@ -54,7 +55,7 @@ struct memcache_server *server = NULL; // alloc the request - if ((req = memcache_req_alloc(key, cb_arg)) == NULL) + if ((req = memcache_req_alloc(mc, MEMCACHE_CMD_FETCH_GET, key, cb_arg)) == NULL) ERROR("failed to allocate request"); // pick a server @@ -66,12 +67,12 @@ ERROR("failed to hand over the request for processing"); // success! - return 0; + return req; error: if (req) memcache_req_free(req); - return -1; + return NULL; } diff -r 9cecd22e643a -r 540737bf6bac memcache/request.c --- a/memcache/request.c Wed Aug 27 10:13:38 2008 +0300 +++ b/memcache/request.c Wed Aug 27 21:30:32 2008 +0300 @@ -6,7 +6,7 @@ #include "memcache.h" #include "../common.h" -struct memcache_req *memcache_req_alloc (struct memcache *mc, struct memcache_key *key, void *cb_arg) { +struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg) { struct memcache_req *req = NULL; // allocate it @@ -14,7 +14,7 @@ ERROR("calloc"); // state - req->state = STATE_INVALID; + req->state = MEMCACHE_STATE_INVALID; // copy the key if ((req->key.buf = malloc(key->len)) == NULL) @@ -24,8 +24,9 @@ memcpy(req->key.buf, key->buf, key->len); req->key.len = key->len; - // store the mc + callback argument + // store the other data req->mc = mc; + req->cmd_type = cmd_type; req->cb_arg = cb_arg; // success @@ -49,7 +50,7 @@ req->conn = NULL; // enter ERROR state - req->state = STATE_ERROR; + req->state = MEMCACHE_STATE_ERROR; // notify if (_memcache_req_notify(req)) @@ -59,7 +60,7 @@ void memcache_req_free (struct memcache_req *req) { // must be unused assert(req->conn == NULL); - assert(req->state == STATE_INVALID || req->state == STATE_ERROR); + assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR); free(req->key.buf); free(req); diff -r 9cecd22e643a -r 540737bf6bac memcache/request.h --- a/memcache/request.h Wed Aug 27 10:13:38 2008 +0300 +++ b/memcache/request.h Wed Aug 27 21:30:32 2008 +0300 @@ -10,9 +10,13 @@ // the memcache context that we belong to struct memcache *mc; + // the command to execute + enum memcache_command cmd_type; + // our key/obj struct memcache_key key; - struct memcache_obj *obj; + struct memcache_obj obj; + struct memcache_buf buf; // our state enum memcache_req_state state; @@ -30,7 +34,7 @@ /* * Allocate and return a new req, or NULL if unsuccesfull. */ -struct memcache_req *memcache_req_alloc (struct memcache *mc, struct memcache_key *key, void *cb_arg); +struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg); /*