fix doc tpyos, rename some enums, fix printf format len for non-zero terminated strings (hg status), pass args to memcache_cmd_format_header via memcache_req_*, handle zero-length STORE requests, memcache_req is_buf_ours + free, other function name typos (keymemcache_req_key), fix req state behaviour re *_DATA_* for STORE requests and FETCH/END, better memcache_server connpool events/management, modular memcache_test with a working benchmark. This is a long commit message.
authorTero Marttila <terom@fixme.fi>
Fri, 29 Aug 2008 23:31:17 +0300
changeset 48 1c67f512779b
parent 47 a5c09677ca6f
child 49 10c7dce1a043
fix doc tpyos, rename some enums, fix printf format len for non-zero terminated strings (hg status), pass args to memcache_cmd_format_header via memcache_req_*, handle zero-length STORE requests, memcache_req is_buf_ours + free, other function name typos (keymemcache_req_key), fix req state behaviour re *_DATA_* for STORE requests and FETCH/END, better memcache_server connpool events/management, modular memcache_test with a working benchmark. This is a long commit message.
memcache.h
memcache/command.c
memcache/command.h
memcache/command.o
memcache/connection.c
memcache/request.c
memcache/request.h
memcache/server.c
memcache/server.h
memcache/strings.c
memcache_test.c
socket.c
--- a/memcache.h	Thu Aug 28 03:14:07 2008 +0300
+++ b/memcache.h	Fri Aug 29 23:31:17 2008 +0300
@@ -43,7 +43,7 @@
     /*
      * The length of the key buffer in bytes. This does not include a NUL byte.
      *
-     * If this is given as zero, then the length will be calculsted from buf using strlen(). Empty keys are not
+     * If this is given as zero, then the length will be calculated from buf using strlen(). Empty keys are not
      * allowed, so this will result in an error if buf is an empty string.
      */
     size_t len;
@@ -54,7 +54,7 @@
  */
 struct memcache_obj {
     /*
-     * An arbitrary 16-bit (32-bit for >1.2.1) that the server stores transparently.
+     * An arbitrary 16-bit (32-bit for >1.2.1) integer that the server stores transparently.
      */
     unsigned int flags;
 
@@ -230,7 +230,7 @@
 /*
  * Request states
  */
-enum memcache_req_state {
+enum memcache_state {
     MEMCACHE_STATE_INVALID,
     
     /*
@@ -269,7 +269,7 @@
      *
      * req_reply, req_obj and req_buf will all return non-NULL values, and buf.offset will be equal to buf.len.
      */
-    MEMCACHE_STATE_DATA_DONE,
+    MEMCACHE_STATE_DONE_DATA,
     
     /*
      * An error has occurred.
@@ -312,10 +312,12 @@
  *  ---------------------------------------------
  *  STATE_QUEUE                 ?
  *  STATE_SEND
- *  STATE_REPLY                         RPL_VALUE
+ *      STATE_REPLY                     RPL_VALUE
  *      STATE_REPLY_DATA        *       RPL_VALUE
- *      STATE_DATA_DONE                 RPL_END
+ *      STATE_REPLY_DATA                RPL_END
+ *      STATE_DONE_DATA                 RPL_END
  *
+ *      STATE_REPLY                     RPL_END
  *      STATE_DONE                      RPL_END
  *
  *  STATE_ERROR                         RPL_{ERROR,CLIENT_ERROR,SERVER_ERROR}
@@ -353,7 +355,7 @@
  *
  * Should always return a valid value.
  */ 
-enum memcache_req_state memcache_req_state (struct memcache_req *req);
+enum memcache_state memcache_req_state (struct memcache_req *req);
 
 /*
  * Request command.
@@ -365,7 +367,7 @@
 /*
  * Request reply.
  *
- * Will return a valid value in the STATE_REPLY, STATE_REPLY_DATA, STATE_DONE and STATE_DATA_DONE states.
+ * Will return a valid value in the STATE_REPLY, STATE_REPLY_DATA, STATE_DONE and STATE_DONE_DATA states.
  */
 enum memcache_reply memcache_req_reply (struct memcache_req *req);
 
@@ -374,19 +376,19 @@
  *
  * Will return a valid valuein all states
  */
-const struct memcache_key *keymemcache_req_key (struct memcache_req *req);
+const struct memcache_key *memcache_req_key (struct memcache_req *req);
 
 /*
  * Request data.
  *
- * Will return a valid value in the STATE_REPLY, STATE_REPLY_DATA, STATE_DONE and STATE_DATA_DONE states.
+ * Will return a valid value in the STATE_REPLY, STATE_REPLY_DATA, STATE_DONE and STATE_DONE_DATA states.
  */
 const struct memcache_obj *memcache_req_obj (struct memcache_req *req);
 
 /*
  * Request buf.
  *
- * Will return a valid value in the STATE_REPLY_DATA and STATE_DATA_DONE states.
+ * Will return a valid value in the STATE_REPLY_DATA and STATE_DONE_DATA states.
  *
  * Note that buf.offset may be less than buf.len in the STATE_REPLY_DATA state.
  */
@@ -402,7 +404,7 @@
 void memcache_req_abort (struct memcache_req *req);
 
 /*
- * Free a req that is in the STATE_DONE, STATE_DATA_DONE or STATE_ERROR state.
+ * Free a req that is in the STATE_DONE, STATE_DONE_DATA or STATE_ERROR state.
  */
 void memcache_req_free (struct memcache_req *req);
 
@@ -411,6 +413,6 @@
  */
 const char *memcache_command_str (enum memcache_command cmd);
 const char *memcache_reply_str (enum memcache_reply reply);
-const char *memcache_state_str (enum memcache_req_state state);
+const char *memcache_state_str (enum memcache_state state);
 
 #endif /* MEMCACHE_H */
--- a/memcache/command.c	Thu Aug 28 03:14:07 2008 +0300
+++ b/memcache/command.c	Fri Aug 29 23:31:17 2008 +0300
@@ -62,7 +62,7 @@
     return -1;
 }
 
-int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj) {
+int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, const struct memcache_key *key, const struct memcache_obj *obj) {
     char *cmd_name;
 
     // valid command
@@ -80,7 +80,7 @@
             assert(key != NULL && obj == NULL);
             assert(key->len > 0 && key->buf != NULL);
 
-            if (evbuffer_add_printf(buf, "%s %*s\r\n", cmd_name, (int) key->len, key->buf) == -1)
+            if (evbuffer_add_printf(buf, "%s %.*s\r\n", cmd_name, (int) key->len, key->buf) == -1)
                 ERROR("evbuffer_add_printf");
 
             break;
@@ -94,7 +94,7 @@
             assert(key->len > 0 && key->buf != NULL);
             // XXX: ensure that we have a valid buf
 
-            if (evbuffer_add_printf(buf, "%s %*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes) == -1)
+            if (evbuffer_add_printf(buf, "%s %.*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes) == -1)
                 ERROR("evbuffer_add_printf");
 
             break;
--- a/memcache/command.h	Thu Aug 28 03:14:07 2008 +0300
+++ b/memcache/command.h	Fri Aug 29 23:31:17 2008 +0300
@@ -17,7 +17,7 @@
  *
  * This must be atomic, so if it fails, it must not modify buf.
  */
-int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj);
+int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, const struct memcache_key *key, const struct memcache_obj *obj);
 
 /*
  * Attempt to parse a response line from the given buf. *header_data will be set to NULL if no complete response line
Binary file memcache/command.o has changed
--- a/memcache/connection.c	Thu Aug 28 03:14:07 2008 +0300
+++ b/memcache/connection.c	Fri Aug 29 23:31:17 2008 +0300
@@ -11,6 +11,12 @@
 #include "../socket.h"
 #include "../common.h"
 
+
+void memcache_conn_send_req_data (struct memcache_conn *conn);
+void memcache_conn_finish_req_data (struct memcache_conn *conn);
+void memcache_conn_handle_reply (struct memcache_conn *conn);
+void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf);
+
 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
 static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
@@ -85,7 +91,11 @@
     assert(conn->req == NULL);
 
     // write the request header into our bufferevent's output buffer
-    if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) {
+    if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), 
+        memcache_req_cmd(req),
+        memcache_req_key(req),
+        memcache_req_obj(req)
+    )) {
         ERROR("failed to init the cmd");
     }
     
@@ -114,11 +124,17 @@
  * Start writing out the request data
  */
 void memcache_conn_send_req_data (struct memcache_conn *conn) {
-    // set up the ev_write
-    event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn);
+    if (conn->req->obj.bytes > 0) {
+        // set up the ev_write
+        event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn);
 
-    // just fake a call to the event handler
-    _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
+        // just fake a call to the event handler
+        _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
+
+    } else {
+        // just send the \r\n
+        memcache_conn_finish_req_data(conn);
+    }
 }
 
 /*
@@ -167,6 +183,8 @@
 
     // bytes *may* be zero if we have an empty cache entry
     if (conn->req->obj.bytes > 0) {
+        // XXX: memcache_req_make_buffer?
+
         // allocate a buffer for the reply data
         if ((conn->req->buf.data = malloc(conn->req->obj.bytes)) == NULL)
             ERROR("malloc");
@@ -176,6 +194,10 @@
 
         // set offset to zero
         conn->req->buf.offset = 0;
+        
+        // and note that it is present, and is ours
+        conn->req->have_buf = 1;
+        conn->req->is_buf_ours = 1;
 
         // do we have any data in the buf that we need to copy?
         if (evbuffer_get_length(buf) > 0) {
@@ -453,7 +475,6 @@
     if (conn->req) {
         // error out the req
         memcache_req_error(conn->req);
-        assert(conn->req->conn == NULL);
         
         // we are now available again
         conn->req = NULL;
@@ -473,10 +494,9 @@
     // ensure that we do currently have a req
     assert(conn->req);
     
-    // have the req detach and check it did so
+    // have the req detach
     memcache_req_done(conn->req);
-    assert(conn->req->conn == NULL);
-    
+
     // we are now available again
     conn->req = NULL;
     
--- a/memcache/request.c	Thu Aug 28 03:14:07 2008 +0300
+++ b/memcache/request.c	Fri Aug 29 23:31:17 2008 +0300
@@ -47,6 +47,7 @@
 
         memcpy(&req->buf, buf, sizeof(req->buf));
         req->have_buf = 1;
+        req->is_buf_ours = 0;
 
         // set offset to zero
         req->buf.offset = 0;
@@ -70,7 +71,7 @@
 }
 
 // accessors
-enum memcache_req_state memcache_req_state (struct memcache_req *req) {
+enum memcache_state memcache_req_state (struct memcache_req *req) {
     return req->state;
 }
 
@@ -82,7 +83,7 @@
     return req->reply_type;
 }
 
-const struct memcache_key *keymemcache_req_key (struct memcache_req *req) {
+const struct memcache_key *memcache_req_key (struct memcache_req *req) {
     return &req->key;
 }
 
@@ -112,7 +113,8 @@
 }
 
 void memcache_req_recv (struct memcache_req *req, enum memcache_reply reply_type) {
-    req->state = MEMCACHE_STATE_REPLY;
+    // set state to REPLY_DATA/REPLY based on have_buf/is_buf_ours
+    req->state = (req->have_buf && req->is_buf_ours) ? MEMCACHE_STATE_REPLY_DATA : MEMCACHE_STATE_REPLY;
     req->reply_type = reply_type;
 
     // we must surely have a valid obj now
@@ -136,21 +138,8 @@
     // make sure we are in the REPLY/REPLY_DATA state
     assert(req->state == MEMCACHE_STATE_REPLY || req->state == MEMCACHE_STATE_REPLY_DATA);
     
-    // are we supposed to have data?
-    if (req->buf.data) {
-        // make sure we really have the full data, if applicable
-        assert(req->buf.offset == req->buf.len);
-        
-        // yes...
-        req->have_buf = 1;
-
-        // have data
-        req->state = MEMCACHE_STATE_DATA_DONE;
-
-    } else {
-        // no data
-        req->state = MEMCACHE_STATE_DONE;
-    }
+    // set state to DONE_DATA/DONE based on have_buf/is_buf_ours
+    req->state = (req->have_buf && req->is_buf_ours) ? MEMCACHE_STATE_DONE_DATA : MEMCACHE_STATE_DONE;
 
     // forget the connection
     req->conn = NULL;
@@ -170,7 +159,10 @@
 void memcache_req_free (struct memcache_req *req) {
     // must be unused
     assert(req->conn == NULL);
-    assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR || req->state == MEMCACHE_STATE_DONE || req->state == MEMCACHE_STATE_DATA_DONE);
+    assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR || req->state == MEMCACHE_STATE_DONE || req->state == MEMCACHE_STATE_DONE_DATA);
+    
+    if (req->have_buf && req->is_buf_ours)
+        free(req->buf.data);
 
     free(req->key.buf);
     free(req);
--- a/memcache/request.h	Thu Aug 28 03:14:07 2008 +0300
+++ b/memcache/request.h	Fri Aug 29 23:31:17 2008 +0300
@@ -21,11 +21,11 @@
     struct memcache_obj obj;
     struct memcache_buf buf;
 
-    // flags to indicate if we have valid obj/buf data
-    char have_obj, have_buf;
+    // flags to indicate if we have valid obj/buf data, and if buf.data was allocated by us or supplied by the user
+    char have_obj, have_buf, is_buf_ours;
 
     // our state
-    enum memcache_req_state state;
+    enum memcache_state state;
 
     // our user callback argument
     void *cb_arg;
@@ -81,12 +81,15 @@
  * The request was sent and is now done, and is not associated with the connection anymore.
  *
  * This will be called before req_reply/req_data, but not before/after req_error.
+ *
+ * The request will not be accessed anymore after calling this.
  */
 void memcache_req_done (struct memcache_req *req);
 
 /*
- * An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued
- * anymore, and the req should not be accessed by memcache_* code after this.
+ * An error occurred, and the request must be abandoned. The request is not active or enqueued anymore.
+ *
+ * The request will not be accessed anymore after calling this.
  */
 void memcache_req_error (struct memcache_req *req);
 
--- a/memcache/server.c	Thu Aug 28 03:14:07 2008 +0300
+++ b/memcache/server.c	Fri Aug 29 23:31:17 2008 +0300
@@ -23,8 +23,7 @@
     TAILQ_INIT(&server->req_queue);
     
     // grow connpool so as to have a connection ready
-    if (memcache_server_grow_connpool(server))
-        ERROR("error opening initial connection");
+    memcache_server_grow_connpool(server);
 
     // success
     return server;
@@ -35,34 +34,43 @@
     return NULL;
 }
 
-int memcache_server_grow_connpool (struct memcache_server *server) {
-   struct memcache_conn *conn;
-   int count;
+void memcache_server_grow_connpool (struct memcache_server *server) {
+    struct memcache_conn *conn;
+    int count = 0;
 
-   // count connections
-   for (count = 0, conn = server->conn_list.lh_first; conn != NULL; conn = conn->connlist_node.le_next, count++) ;
+    // count connections
+    LIST_FOREACH(conn, &server->conn_list, connlist_node) {
+        count++;
+    }
 
-   // room for more?
-   if (count < server->max_connections) {
+    // room for more?
+    if (count < server->max_connections) {
         // create a new one
         if ((conn = memcache_conn_open(server)) == NULL)
-            return -1;
-        
+            ERROR("failed to grow the connpool");
+       
         // enlist it
         LIST_INSERT_HEAD(&server->conn_list, conn, connlist_node);
 
         // the connection will call memcache_server_coon_ready once it's ready for use...
-   }
-    
-   // success
-   return 0;
+    }
+
+    // ok
+    return;
+
+error:
+    // XXX: we might be deadlocked now... requests queued, but no connections!
+    if (LIST_EMPTY(&server->conn_list) && !TAILQ_EMPTY(&server->req_queue))
+        FATAL("deadlock; requests queued, but no connections");
+
+    // XXX: harmless... but need some retry logic
 }
 
 int memcache_server_add_req (struct memcache_server *server, struct memcache_req *req) {
     struct memcache_conn *conn;
     
     // look for an idle connection
-    for (conn = server->conn_list.lh_first; conn != NULL; conn = conn->connlist_node.le_next) {
+    LIST_FOREACH(conn, &server->conn_list, connlist_node) {
         if (memcache_conn_is_available(conn)) {
             // we found an idle connection
             break;
@@ -85,34 +93,60 @@
         // notify the req
         memcache_req_queued(req);
 
+        // grow the connpool, as we apparently don't have enough connections
+        memcache_server_grow_connpool(server);
+
         return 0;
     }
 }
 
+/*
+ * We might have available connections, process any queued requests, or try and grow the connpool if non available
+ */
+void memache_server_dequeue (struct memcache_server *server) {
+    struct memcache_conn *conn;
+    struct memcache_req *req;
+
+    // if no requests are queued, nothing needs doing
+    if ((req = TAILQ_FIRST(&server->req_queue)) == NULL)
+        return;
+    
+    // look for idle connections to service the request
+    LIST_FOREACH(conn, &server->conn_list, connlist_node) {
+        if (memcache_conn_is_available(conn)) {
+            // remove the req from the queue and execute it
+            TAILQ_REMOVE(&server->req_queue, req, reqqueue_node);
+            
+            // this will take care of any error handling by itself
+            memcache_conn_do_req(conn, req);
+            
+            // if that was the last req, return, otherwise continue
+            if ((req = TAILQ_FIRST(&server->req_queue)) == NULL)
+                return;
+        }
+    }
+    
+    // no idle connections remaining, try and grow if applicable
+    memcache_server_grow_connpool(server);
+}
+
 void memcache_server_conn_ready (struct memcache_server *server, struct memcache_conn *conn) {
     assert(server == conn->server);
-
-    // do we have any queued requests waiting?
-    struct memcache_req *req = server->req_queue.tqh_first;
-
-    if (req) {
-        // remove it from the queue and execute it
-        TAILQ_REMOVE(&server->req_queue, req, reqqueue_node);
-        
-        // this will take care of any error handling by itself
-        memcache_conn_do_req(conn, req);
-    }
+    
+    // grab the next queued request
+    memache_server_dequeue(server);
 }
 
 void memcache_server_conn_dead (struct memcache_server *server, struct memcache_conn *conn) {
     assert(server == conn->server);
 
-    // XXX: reconnect/error out requests?
-    
     // remove it from the list
     LIST_REMOVE(conn, connlist_node);
 
     // free it
     memcache_conn_free(conn);
+
+    // this should grow the connpool back again
+    memache_server_dequeue(server);
 }
 
--- a/memcache/server.h	Thu Aug 28 03:14:07 2008 +0300
+++ b/memcache/server.h	Fri Aug 29 23:31:17 2008 +0300
@@ -31,7 +31,7 @@
  * Attempt to grow the connection pool by one connection. Doesn't do anything if we already have too many connections,
  * otherwise the new connection will be opened and added to the conn_list.
  */
-int memcache_server_grow_connpool (struct memcache_server *server);
+void memcache_server_grow_connpool (struct memcache_server *server);
 
 /*
  * Process the given request on this server.
--- a/memcache/strings.c	Thu Aug 28 03:14:07 2008 +0300
+++ b/memcache/strings.c	Fri Aug 29 23:31:17 2008 +0300
@@ -34,7 +34,7 @@
     }
 }
 
-const char *memcache_state_str (enum memcache_req_state state) {
+const char *memcache_state_str (enum memcache_state state) {
     switch (state) {
         CASE_STR(MEMCACHE_STATE_INVALID);
         CASE_STR(MEMCACHE_STATE_QUEUED);
@@ -42,7 +42,7 @@
         CASE_STR(MEMCACHE_STATE_REPLY);
         CASE_STR(MEMCACHE_STATE_REPLY_DATA);
         CASE_STR(MEMCACHE_STATE_DONE);
-        CASE_STR(MEMCACHE_STATE_DATA_DONE);
+        CASE_STR(MEMCACHE_STATE_DONE_DATA);
         CASE_STR(MEMCACHE_STATE_ERROR);
         CASE_DEFAULT(MEMCACHE_STATE);
     }
--- a/memcache_test.c	Thu Aug 28 03:14:07 2008 +0300
+++ b/memcache_test.c	Fri Aug 29 23:31:17 2008 +0300
@@ -1,3 +1,11 @@
+#define _GNU_SOURCE
+#include <stdlib.h>
+#include <unistd.h>
+#include <getopt.h>
+#include <sys/time.h>
+#include <time.h>
+#include <ctype.h>
+#include <assert.h>
 
 #include <event2/event.h>
 #include <event2/event_compat.h>
@@ -7,6 +15,12 @@
 #include "config.h"
 #include "common.h"
 
+/*
+ * Test:
+ *  * zero-length entries
+ */
+
+static struct event_base *ev_base;
 static struct memcache *mc;
 static struct config_endpoint server_endpoint;
 static char *data_1 = "rei4quohV8Oocio1ua0co8ni4Ae1re4houcheixahchoh3ioghie0aShooShoh6Ahboequ9eiX5eashuu6Chu1quo"
@@ -15,33 +29,134 @@
 static char *data_2 = "iefaek7ighi5UpueThageish5ieshohyeil1raiceerahjahng5ui7vuzie9quu4dai5ar2aiXi5ieth4looweigi"
                             "e3fo5ieri1queengaiphuaghaic1xahvoo9joo6baiNaig8puCootheowah4moocohDoiquoh3quieka5ao3aeNg9"
                             "Aimei1soangu4Duch5pho5buu2ohzaich4chahz9iTh3Pei4beep1ongie6au1aafoosh2vierei5E";
-
-void _memcache_cb (struct memcache_req *req, void *arg) {
-    char *key = arg;
-    const struct memcache_obj *obj;
-    const struct memcache_buf *buf;
+                            
+#define BENCHMARK_KEY "memcache_benchmark"
+#define BENCHMARK_KEY_MAX 256
+#define BENCHMARK_DATA_MAX 1024 * 1024
 
-    INFO("[%s]: cmd=%15s state=%15s reply=%15s", key,
-        memcache_command_str(memcache_req_cmd(req)),
-        memcache_state_str(memcache_req_state(req)),
-        memcache_reply_str(memcache_req_reply(req))
-    );
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+#define MAX(a, b) ((a) > (b) ? (a) : (b))
+
+struct common {
+    unsigned int max_connections;
     
-    if ((obj = memcache_req_obj(req)))
-        INFO("\tobj: flags=0x%04X exptime=%9zu bytes=%6zu cas=%llu", obj->flags, obj->exptime, obj->bytes, obj->cas);
+    struct timeval start;
+} common;
 
-    if ((buf = memcache_req_buf(req)))
-        INFO("\tbuf: data=%p len=%6zu offset=%6zu", buf->data, buf->len, buf->offset);
+struct benchmark_fetch {
+    unsigned int concurrent_ops;
+    unsigned int total_ops;
     
-    INFO("%s", "");
+    const char *key_prefix;
+    unsigned int key_len_min, key_len_max, key_count;
+    unsigned int data_len_min, data_len_max;
+    
+    int keys_stored;
+    int cur_ops;
+    int op_count;
+    
+    struct key_buf {
+        char buf[BENCHMARK_KEY_MAX];
+        struct memcache_key key;
+    } *keys;
+
+} benchmark_fetch;
+
+void benchmark_cb (struct memcache_req *req, void *arg);
+void benchmark_fetch_fn (void);
+
+enum option_code {
+    OPT_CODE_INVALID,
+
+    COMMON_CONN_MAX,
+    BENCH_FETCH_REQ_CONCURRENCY,
+    BENCH_FETCH_REQ_AMOUNT,
+    BENCH_FETCH_KEY_PREFIX,
+    BENCH_FETCH_KEY_LEN_MIN,
+    BENCH_FETCH_KEY_LEN_MAX,
+    BENCH_FETCH_KEY_COUNT,
+    BENCH_FETCH_DATA_LEN_MIN,
+    BENCH_FETCH_DATA_LEN_MAX,
+
+    OPT_CODE_MAX,
+};
+
+enum option_type {
+    OPT_TYPE_NONE,
+    OPT_TYPE_UINT,
+    OPT_TYPE_STR,
+};
+
+static struct test {
+    char *name;
+    memcache_cb cb_fn;
+
+    void (*test_fn) (void);
+
+} test_list[] = {
+    { "benchmark_fetch",        &benchmark_cb,  &benchmark_fetch_fn },
+    { 0,                        0,              0,                  }
+};
+
+static struct option options[] = {
+    { "conn-max",           required_argument,  NULL,   COMMON_CONN_MAX             },
+    { "req-concurrency",    required_argument,  NULL,   BENCH_FETCH_REQ_CONCURRENCY },
+    { "req-amount",         required_argument,  NULL,   BENCH_FETCH_REQ_AMOUNT      },
+    { "key-prefix",         required_argument,  NULL,   BENCH_FETCH_KEY_PREFIX      },
+    { "key-len-min",        required_argument,  NULL,   BENCH_FETCH_KEY_LEN_MIN     },
+    { "key-len-max",        required_argument,  NULL,   BENCH_FETCH_KEY_LEN_MAX     },
+    { "key-count",          required_argument,  NULL,   BENCH_FETCH_KEY_COUNT       },
+    { "data-len-min",       required_argument,  NULL,   BENCH_FETCH_DATA_LEN_MIN    },
+    { "data-len-max",       required_argument,  NULL,   BENCH_FETCH_DATA_LEN_MAX    },
+    { 0,                    0,                  0,      0                           },
+};
+
+static struct opt {
+    enum option_code code;
+    enum option_type type;
+    
+    union opt_type_data {
+        struct {
+            unsigned int *value;
+            unsigned int default_value;
+        } uint;
+
+        struct {
+            const char **value;
+            const char *default_value;
+        } str;
+    } data;
+} option_info[OPT_CODE_MAX] = {
+    {   OPT_CODE_INVALID,               OPT_TYPE_NONE                                                           },
+    {   COMMON_CONN_MAX,                OPT_TYPE_UINT,  { .uint = { &common.max_connections,          1     }}  },
+    {   BENCH_FETCH_REQ_CONCURRENCY,    OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.concurrent_ops,  1     }}  },
+    {   BENCH_FETCH_REQ_AMOUNT,         OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.total_ops,       500   }}  },
+    {   BENCH_FETCH_KEY_PREFIX,         OPT_TYPE_STR,   { .str  = { &benchmark_fetch.key_prefix,      "bf_" }}  },
+    {   BENCH_FETCH_KEY_LEN_MIN,        OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.key_len_min,     8     }}  },
+    {   BENCH_FETCH_KEY_LEN_MAX,        OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.key_len_max,     8     }}  },
+    {   BENCH_FETCH_KEY_COUNT,          OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.key_count,       1     }}  },
+    {   BENCH_FETCH_DATA_LEN_MIN,       OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.data_len_min,    64    }}  },
+    {   BENCH_FETCH_DATA_LEN_MAX,       OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.data_len_max,    64    }}  },
+};
+
+void time_reset () {
+    // start timing
+    assert(gettimeofday(&common.start, NULL) == 0);
 }
 
-void begin_test () {
-    struct memcache_key key_1, key_2;
-    struct memcache_obj obj_1, obj_2;
-    struct memcache_buf buf_1, buf_2;
+double time_offset () {
+    struct timeval time;
 
-    if ((mc = memcache_alloc(&_memcache_cb)) == NULL)
+    assert(gettimeofday(&time, NULL) == 0);
+    
+    return ((double) (time.tv_sec - common.start.tv_sec)) + ((double) (time.tv_usec - common.start.tv_usec)) / 1000000;
+}
+
+
+
+void mc_init (int max_connections, memcache_cb cb_fn) {
+    // memcache init    
+    if ((mc = memcache_alloc(cb_fn)) == NULL)
         ERROR("memcache_alloc");
     
     // fix up the endpoint
@@ -51,9 +166,84 @@
         ERROR("config_endpoint_parse");
     
     // add the server
-    if (memcache_add_server(mc, &server_endpoint, 1))
+    if (memcache_add_server(mc, &server_endpoint, max_connections))
         ERROR("memcache_add_server");
-   
+    
+    INFO("[memcache] initialized with max_connections=%d", max_connections);
+
+    return;
+
+error:
+    assert(0);
+}
+
+void dump_req (struct memcache_req *req, void *unused) {
+    const struct memcache_obj *obj;
+    const struct memcache_buf *buf;
+
+    INFO("[%*s]: cmd=%s state=%s reply=%s",
+        (int) memcache_req_key(req)->len, memcache_req_key(req)->buf,
+        memcache_command_str(memcache_req_cmd(req)),
+        memcache_state_str(memcache_req_state(req)),
+        memcache_reply_str(memcache_req_reply(req))
+    );
+    
+    if ((obj = memcache_req_obj(req)))
+        INFO("\tobj: flags=0x%04X exptime=%zu bytes=%zu cas=%llu", obj->flags, obj->exptime, obj->bytes, obj->cas);
+
+    if ((buf = memcache_req_buf(req)))
+        INFO("\tbuf: data=%p len=%zu offset=%zu", buf->data, buf->len, buf->offset);
+    
+    INFO("%s", "");
+}
+
+size_t random_value (size_t min, size_t max) {
+    return ((max == min) ? min : (random() % (max - min)) + min);
+}
+
+size_t random_data (char *buf, size_t min, size_t max) {
+#define CHAR_TABLE_MAX (('z' - 'a' + 1) + ('Z' - 'A' + 1) + ('9' - '0' + 1))
+
+    static char char_table[CHAR_TABLE_MAX] = {
+        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
+        'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
+        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'
+    };
+
+    assert(max >= min);
+
+    size_t size = random_value(min, max), i;
+
+    for (i = 0; i < size; i++) {
+        buf[i] = char_table[random_value(0, CHAR_TABLE_MAX)];
+
+        assert(isalnum(buf[i]));
+/*    
+        switch (MIN((size - i), 4)) {
+            case 4: * ((u_int32_t*) &buf[i]) = random(); i += 4; break;
+            case 3:
+            case 2: * ((u_int16_t*) &buf[i]) = random(); i += 2; break;
+            case 1: * ((u_int8_t*)  &buf[i]) = random(); i += 1; break;
+            default: assert(0);
+        }
+*/        
+    }
+
+    return size;
+}
+
+void test_cb (struct memcache_req *req, void *arg) {
+    dump_req(req, arg);
+}
+
+void begin_test () {
+    struct memcache_key key_1, key_2;
+    struct memcache_obj obj_1, obj_2;
+    struct memcache_buf buf_1, buf_2;
+    struct memcache_req *req_1s, *req_2s, *req_1f, *req_2f;
+    
+    mc_init(1, &test_cb);
+
     // add a request or two
     key_1.buf = "memcache_test_k1";
     key_2.buf = "memcache_test_k2";
@@ -76,36 +266,280 @@
     buf_2.len = strlen(data_2);
     buf_2.offset = buf_2.len;
 
-    if (memcache_store(mc, MEMCACHE_CMD_STORE_SET, &key_1, &obj_1, &buf_1, key_1.buf))
+    if ((req_1s = memcache_store(mc, MEMCACHE_CMD_STORE_SET, &key_1, &obj_1, &buf_1, key_1.buf)) == NULL)
         ERROR("memcache_store: key_1");
     
-    if (memcache_store(mc, MEMCACHE_CMD_STORE_ADD, &key_2, &obj_2, &buf_2, key_2.buf))
+    if ((req_2s = memcache_store(mc, MEMCACHE_CMD_STORE_ADD, &key_2, &obj_2, &buf_2, key_2.buf)) == NULL)
         ERROR("memcache_store: key_2");
     
-    if (memcache_fetch(mc, &key_1, key_1.buf))
+    if ((req_1f = memcache_fetch(mc, &key_1, key_1.buf)) == NULL)
         ERROR("memcache_fetch: key_1");
     
-    if (memcache_fetch(mc, &key_2, key_2.buf))
+    if ((req_2f = memcache_fetch(mc, &key_2, key_2.buf)) == NULL)
         ERROR("memcache_fetch: key_2");
 
 error:
     return;
 }
 
+void benchmark_continue () {
+    while (benchmark_fetch.cur_ops < benchmark_fetch.concurrent_ops && (benchmark_fetch.op_count + benchmark_fetch.cur_ops) < benchmark_fetch.total_ops) {
+        // launch
+        assert(memcache_fetch(mc, &benchmark_fetch.keys[random_value(0, benchmark_fetch.key_count)].key, NULL) != NULL);
+
+        benchmark_fetch.cur_ops++;
+
+        if ((benchmark_fetch.op_count + benchmark_fetch.cur_ops) % (benchmark_fetch.total_ops / 10) == 0)
+            INFO("[benchmark] %0.6f: %d+%d/%d requests", 
+                time_offset(),
+                benchmark_fetch.op_count, benchmark_fetch.cur_ops, benchmark_fetch.total_ops
+            );
+    }
+
+    if (benchmark_fetch.op_count == benchmark_fetch.total_ops) {
+        // done
+        assert(event_base_loopexit(ev_base, NULL) == 0);
+
+        INFO("[benchmark] %.6f: %.6f req/s", 
+            time_offset(),
+            benchmark_fetch.op_count / time_offset()
+        );
+    }
+}
+
+void benchmark_fetch_start () {    
+    INFO(
+        "[benchmark] %0.6f: starting\n"
+        "\tconcurrent_ops   = %u\n"
+        "\ttotal_ops        = %u\n"
+        "\tkey_prefix       = %s\n"
+        "\tkey_len_min      = %u\n"
+        "\tkey_len_max      = %u\n"
+        "\tkey_count        = %u\n"
+        "\tdata_len_min     = %u\n"
+        "\tdata_len_max     = %u\n"
+        , time_offset(),
+        benchmark_fetch.concurrent_ops, benchmark_fetch.total_ops,
+        benchmark_fetch.key_prefix, benchmark_fetch.key_len_min, benchmark_fetch.key_len_max, benchmark_fetch.key_count,
+        benchmark_fetch.data_len_min, benchmark_fetch.data_len_max
+    );
+    
+    time_reset();
+
+    benchmark_continue();
+    
+    INFO("[benchmark] %0.6f: running", 
+        time_offset()
+    );
+}
+
+
+void benchmark_cb (struct memcache_req *req, void *arg) {
+    enum memcache_command cmd = memcache_req_cmd(req);
+    enum memcache_state state = memcache_req_state(req);
+
+    if (state == MEMCACHE_STATE_ERROR) {
+        dump_req(req, arg);
+        FATAL("request failed");
+    }
+
+    if (state == MEMCACHE_STATE_DONE || state == MEMCACHE_STATE_DONE_DATA) {
+        if (cmd == MEMCACHE_CMD_FETCH_GET) {
+            benchmark_fetch.cur_ops--;
+            benchmark_fetch.op_count++;
+
+            benchmark_continue();
+
+        } else if (cmd == MEMCACHE_CMD_STORE_SET) {
+            if (memcache_req_reply(req) != MEMCACHE_RPL_STORED) {
+                dump_req(req, arg);
+                WARNING("value was not stored");
+            }
+
+            benchmark_fetch.keys_stored++;
+
+            if (benchmark_fetch.keys_stored % (benchmark_fetch.key_count / 10) == 0)
+                INFO("[benchmark] %.6f: key %u/%u stored: %.*s", 
+                    time_offset(), benchmark_fetch.keys_stored, benchmark_fetch.key_count, (int) memcache_req_key(req)->len, memcache_req_key(req)->buf
+                );
+
+            if (benchmark_fetch.keys_stored == benchmark_fetch.key_count)
+                benchmark_fetch_start();
+        }
+        
+        memcache_req_free(req);
+    }
+}
+
+/*
+ * Run <concurrent_op> ops in parrallel, until we have completed total_ops, at which point we shut down.
+ */
+void benchmark_fetch_fn () {
+    static char data[BENCHMARK_DATA_MAX];
+    char key_postfix[BENCHMARK_KEY_MAX];
+    int i, key_len, data_len;
+    struct memcache_obj obj;
+    struct memcache_buf buf;
+
+    assert(benchmark_fetch.key_len_min > 0 && (strlen(benchmark_fetch.key_prefix) + benchmark_fetch.key_len_max) < BENCHMARK_KEY_MAX);
+    assert(benchmark_fetch.data_len_min > 0 && benchmark_fetch.data_len_max < BENCHMARK_DATA_MAX);
+
+    benchmark_fetch.cur_ops = benchmark_fetch.op_count = benchmark_fetch.keys_stored = 0;
+
+    if ((benchmark_fetch.keys = calloc(benchmark_fetch.key_count, sizeof(struct key_buf))) == NULL)
+        FATAL("calloc");
+
+    // pregenerate the data
+    data_len = random_data(data, benchmark_fetch.data_len_min, benchmark_fetch.data_len_max);
+
+    obj.flags = 0x1234;
+    obj.exptime = 0;
+    obj.bytes = data_len;
+    buf.data = data;
+    buf.len = buf.offset = data_len;
+
+    // insert keys
+    INFO("[benchmark] %0.6f: inserting %u keys with prefix=%s and len=(%u -> %u)",
+        time_offset(),
+        benchmark_fetch.key_count, benchmark_fetch.key_prefix, benchmark_fetch.key_len_min, benchmark_fetch.key_len_max
+    );
+    
+    for (i = 0; i < benchmark_fetch.key_count; i++) {
+        struct key_buf *keybuf = &benchmark_fetch.keys[i];
+
+        key_len = random_data(key_postfix, benchmark_fetch.key_len_min, benchmark_fetch.key_len_max);
+
+        key_postfix[key_len] = '\0';
+
+        assert((keybuf->key.len = snprintf(keybuf->buf, BENCHMARK_KEY_MAX, "%s%*s", benchmark_fetch.key_prefix, key_len, key_postfix)) < BENCHMARK_KEY_MAX);
+        
+        keybuf->key.buf = keybuf->buf;
+
+        assert(memcache_store(mc, MEMCACHE_CMD_STORE_SET, &keybuf->key, &obj, &buf, NULL) != NULL);
+    }
+}
+
+void usage (char *cmd) {
+    INFO(
+        "Usage: %s <cmd> [<args> ... ]\n"
+        "\n"
+        "COMMANDS\n"
+        "\n"
+        "benchmark_fetch:\n"
+        "\tMeasure the speed of fetch requests\n"
+        "\t\n"
+        "\tconn-max         1       number of connections to use\n"
+        "\treq-concurrency  1       number of requests to have running\n"
+        "\treq-amount       500     number of requests to issue\n"
+        "\tkey-prefix       bf_     key prefix\n"
+        "\tkey-len-min      8       minimum key lengt\n"
+        "\tkey-len-max      8       maximum key length\n"
+        "\tkey-count        1       how many keys to use\n"
+        "\tdata-len-min     64      minimum data length\n"
+        "\tdata-len-max     64      maximum data length\n",
+
+        cmd
+    );
+
+    exit(1);
+}
+
 int main (int argc, char **argv) {
-    // libevent init
-    struct event_base *ev_base = event_init();
+    char *name, *invalid;
+    struct test *test;
+    int c, option_index;
+
+    // argument-parsing
+    if (argc < 2) {
+        WARNING("No command given");
+        usage(argv[0]);
+    }
+    
+    // look up the test
+    name = argv[1];
+    test = test_list;
+    
+    while (test->name && strcmp(test->name, name) != 0)
+        test++;
+    
+    if (!test->name) {
+        WARNING("Unknown cmd '%s'", name);
+        usage(argv[0]);
+    }
+    
+    // default values
+    for (c = OPT_CODE_INVALID; c < OPT_CODE_MAX; c++) {
+        switch (option_info[c].type) {
+            case OPT_TYPE_NONE:
+                break;
+
+            case OPT_TYPE_UINT:
+                *option_info[c].data.uint.value = option_info[c].data.uint.default_value;
+
+                break;
+
+            case OPT_TYPE_STR:
+                *option_info[c].data.str.value = option_info[c].data.str.default_value;
+
+                break;
+
+            default:
+                assert(0);
+        }
+    }
+
+    while ((c = getopt_long(argc, argv, "", options, &option_index)) != -1) {
+        if (c <= OPT_CODE_INVALID || c >= OPT_CODE_MAX)
+            FATAL("invalid argument %s", options[option_index].name);
+        
+        switch (option_info[c].type) {
+            case OPT_TYPE_UINT:
+                assert(optarg);
+
+                *option_info[c].data.uint.value = strtol(optarg, &invalid, 10);
+
+                if (*invalid)
+                    FATAL("invalid argument value: %s: %s (%s)", options[option_index].name, optarg, invalid);
+                
+                break;
+
+            case OPT_TYPE_STR:
+                assert(optarg);
+
+                *option_info[c].data.str.value = optarg;
+
+                break;
+    
+            case OPT_TYPE_NONE:
+            default:
+                FATAL("invalid argument type %s", options[option_index].name);
+                
+                break;
+        }
+    }
+
+    // libevent init 
+    ev_base = event_init();
 
     if (!ev_base)
         FATAL("event_init");
     
-    begin_test();
+    // set up the memcache
+    mc_init(common.max_connections, test->cb_fn);
+
+    // start timing
+    time_reset();
+    
+    // start the test
+    test->test_fn();
+    
+    INFO("[libevent] run");
 
     // run the libevent mainloop
     if (event_base_dispatch(ev_base))
         WARNING("event_dispatch");
 
-    INFO("SHUTDOWN");
+    INFO("[libevent] shutdown");
     
     // clean up
     event_base_free(ev_base);
--- a/socket.c	Thu Aug 28 03:14:07 2008 +0300
+++ b/socket.c	Fri Aug 29 23:31:17 2008 +0300
@@ -101,6 +101,7 @@
 static int _socket_do (struct config_endpoint *endpoint, int *sock, int sock_type, struct sockaddr_storage *addr, enum socket_op sockop) {
     struct addrinfo *res = NULL, *info, _fake_res;
     struct sockaddr_un _fake_addr_un;
+    int err = -1;
 
     if (endpoint->family == PF_UNIX) {
         // getaddrinfo doesn't handle PF_UNIX, so we need to build a fake result
@@ -189,13 +190,14 @@
 
     if (*sock == -1)
         ERROR("no working results from getaddrinfo: %s#%s", endpoint->af.inet.addr, endpoint->af.inet.port);
-
-    return 0;
+    
+    // success
+    err = 0;
 
 error:
     if (res != 0 && res != &_fake_res)
         freeaddrinfo(res);
 
-    return -1;
+    return err;
 }