autogenerate the memcache_test help output, and pipeline memcache requests default tip
author Tero Marttila <terom@fixme.fi>
Sat, 30 Aug 2008 19:13:15 +0300
changeset 49 10c7dce1a043
parent 48 1c67f512779b
autogenerate the memcache_test help output, and pipeline memcache requests
memcache.h
memcache/connection.c
memcache/connection.h
memcache/memcache.c
memcache/memcache.h
memcache/request.h
memcache/server.c
memcache/server.h
memcache_test.c
--- a/memcache.h	Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache.h	Sat Aug 30 19:13:15 2008 +0300
@@ -292,7 +292,7 @@
  *
  * The given callback function is used for all requests in this context.
  */
-struct memcache *memcache_alloc (memcache_cb cb_fn);
+struct memcache *memcache_alloc (memcache_cb cb_fn, char pipeline_requests);
 
 /*
  * Add a server to the pool of available servers.
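For reference, a minimal caller-side sketch of the changed allocation call. The memcache_cb signature used here is an assumption, taken from the benchmark_cb() callback in memcache_test.c further down; a non-zero pipeline_requests lets each connection queue several requests at once instead of waiting for a reply between them.

    #include "memcache.h"

    /* assumed callback signature, matching benchmark_cb() in memcache_test.c */
    static void my_cb (struct memcache_req *req, void *arg) {
        (void) req; (void) arg;   /* inspect the completed request here */
    }

    static struct memcache *setup_pipelined (void) {
        /* second argument is the new pipeline_requests flag */
        return memcache_alloc(&my_cb, 1);
    }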
--- a/memcache/connection.c	Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/connection.c	Sat Aug 30 19:13:15 2008 +0300
@@ -6,16 +6,20 @@
 #include <assert.h>
 
 #include "connection.h"
+#include "memcache.h"
 #include "command.h"
 #include "request.h"
 #include "../socket.h"
 #include "../common.h"
 
-
-void memcache_conn_send_req_data (struct memcache_conn *conn);
-void memcache_conn_finish_req_data (struct memcache_conn *conn);
-void memcache_conn_handle_reply (struct memcache_conn *conn);
-void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf);
+void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint);
+void memcache_conn_send_data (struct memcache_conn *conn);
+void memcache_conn_send_end (struct memcache_conn *conn);
+void memcache_conn_send_done (struct memcache_conn *conn);
+void memcache_conn_recv_next (struct memcache_conn *conn);
+void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf);
+void memcache_conn_recv_done (struct memcache_conn *conn);
+void memcache_conn_recv_end (struct memcache_conn *conn);
 
 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
 static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
@@ -25,9 +29,7 @@
 static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg);
 
 static void memcache_conn_error (struct memcache_conn *conn);
-static void memcache_conn_req_done (struct memcache_conn *conn);
-
-void memcache_conn_close (struct memcache_conn *conn);
+static void memcache_conn_close (struct memcache_conn *conn);
 
 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
     struct memcache_conn *conn = NULL;
@@ -38,6 +40,9 @@
     // remember the server
     conn->server = server;
 
+    // init the req queue
+    TAILQ_INIT(&conn->req_queue);
+
     // attempt connect
     if (memcache_conn_connect(conn))
         ERROR("failed to connect to server");
@@ -83,48 +88,92 @@
 }
 
 int memcache_conn_is_available (struct memcache_conn *conn) {
-    return (conn->fd > 0 && conn->is_connected && conn->req == NULL);
+    return (conn->fd > 0 && conn->is_connected && (TAILQ_EMPTY(&conn->req_queue) || conn->server->mc->pipeline_requests));
 }
 
 void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
-    assert(conn->fd > 0 && conn->is_connected);
-    assert(conn->req == NULL);
+    assert(memcache_conn_is_available(conn));
+    
+    // XXX: validate req
 
-    // write the request header into our bufferevent's output buffer
-    if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), 
-        memcache_req_cmd(req),
-        memcache_req_key(req),
-        memcache_req_obj(req)
-    )) {
-        ERROR("failed to init the cmd");
+    // stick the req into the req queue
+    TAILQ_INSERT_TAIL(&conn->req_queue, req, reqqueue_node);
+    
+    // if send is idle...
+    if (conn->send_req == NULL) 
+        memcache_conn_send_next(conn, req);
+}
+
+/*
+ * Send out the next request.
+ *
+ * If there is currently a send_req, it is considered done.
+ */
+void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint) {
+    struct memcache_req *req;
+    
+    // req will be either
+    //  * the next enqueued req after the one that was last written
+    //  * hint, if no req was being written (the req that was just enqueued)
+    if (conn->send_req) {
+        assert(!hint);
+    
+        // the next req
+        req = TAILQ_NEXT(conn->send_req, reqqueue_node);
+
+        // and reset the send_req to NULL...
+        conn->send_req = NULL;
+
+    } else {
+        assert(hint && TAILQ_LAST(&conn->req_queue, memcache_reqqueue_head) == hint);
+        
+        // the given req
+        req = hint;
+    }
+
+    // while we still have a request to process...
+    while (req) {
+        // try and write the request header into our bufferevent's output buffer
+        if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), 
+            memcache_req_cmd(req),
+            memcache_req_key(req),
+            memcache_req_obj(req)
+        )) {
+            WARNING("invalid request");
+            memcache_req_error(req);
+
+            // continue on to the next request
+            req = TAILQ_NEXT(req, reqqueue_node);
+
+            continue;
+        }
+
+        // send this one
+        conn->send_req = req;
+
+        // enable our bufferevent to send it
+        if (bufferevent_enable(conn->bev, EV_WRITE))
+            PERROR("bufferevent_enable");
+
+        // tell the req that it is underway
+        memcache_req_send(req);
+        
+        // done, we only want to process one
+        break;
     }
     
-    // store the req
-    conn->req = req;
-    
-    // tell our bufferevent to send it
-    if (bufferevent_enable(conn->bev, EV_WRITE))
-        PERROR("bufferevent_enable");
-    
-    // tell the req that it is underway
-    memcache_req_send(req);
-    
-    // success
+    // done, we either replaced send_req, or consumed them all
     return;
 
 error:
-    if (conn->req)
-        memcache_conn_error(conn);
-
-    else
-        memcache_req_error(req);
+    memcache_conn_error(conn);
 }
 
 /*
  * Start writing out the request data
  */
-void memcache_conn_send_req_data (struct memcache_conn *conn) {
-    if (conn->req->obj.bytes > 0) {
+void memcache_conn_send_data (struct memcache_conn *conn) {
+    if (conn->send_req->obj.bytes > 0) {
         // set up the ev_write
         event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn);
 
@@ -133,14 +182,15 @@
 
     } else {
         // just send the \r\n
-        memcache_conn_finish_req_data(conn);
+        memcache_conn_send_end(conn);
     }
 }
 
 /*
  * Write out the final \r\n to terminate the request data
  */
-void memcache_conn_finish_req_data (struct memcache_conn *conn) {
+void memcache_conn_send_end (struct memcache_conn *conn) {
+    // XXX: this will enable the bev by itself?
     if (bufferevent_write(conn->bev, "\r\n", 2))
         PERROR("bufferevent_write");
     
@@ -152,12 +202,23 @@
 }
 
 /*
+ * Finished sending the current send_req
+ */
+void memcache_conn_send_done (struct memcache_conn *conn) {
+    assert(conn->send_req != NULL);
+
+    // send the next req, if there is one
+    memcache_conn_send_next(conn, NULL);
+    
+    // if pipelining is on, it's a question of how many we've sent...
+    if (conn->server->mc->pipeline_requests)
+        memcache_server_conn_ready(conn->server, conn);
+}
+
+/*
  * Start reading a reply from the connection
  */
-void memcache_conn_handle_reply (struct memcache_conn *conn) {
-    // ensure that we either didn't have a command, or it has been sent
-    assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
-
+void memcache_conn_recv_next (struct memcache_conn *conn) {
     // start/continue reading on the bufferevent
     if (bufferevent_enable(conn->bev, EV_READ))
         PERROR("bufferevent_enable");
@@ -175,44 +236,45 @@
 /*
  * Start reading reply data from the connection
  */
-void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) {
+void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf) {
+    struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
     int ret;
 
     // check that the buf doesn't contain any data
-    assert(conn->req->buf.data == NULL);
+    assert(req->buf.data == NULL);
 
     // bytes *may* be zero if we have an empty cache entry
-    if (conn->req->obj.bytes > 0) {
+    if (req->obj.bytes > 0) {
         // XXX: memcache_req_make_buffer?
 
         // allocate a buffer for the reply data
-        if ((conn->req->buf.data = malloc(conn->req->obj.bytes)) == NULL)
+        if ((req->buf.data = malloc(req->obj.bytes)) == NULL)
             ERROR("malloc");
         
         // update the length
-        conn->req->buf.len = conn->req->obj.bytes;
+        req->buf.len = req->obj.bytes;
 
         // set offset to zero
-        conn->req->buf.offset = 0;
+        req->buf.offset = 0;
         
         // and note that it is present, and is ours
-        conn->req->have_buf = 1;
-        conn->req->is_buf_ours = 1;
+        req->have_buf = 1;
+        req->is_buf_ours = 1;
 
         // do we have any data in the buf that we need to copy?
         if (evbuffer_get_length(buf) > 0) {
             // read the data into the memcache_buf
-            ret = evbuffer_remove(buf, conn->req->buf.data, conn->req->buf.len);
+            ret = evbuffer_remove(buf, req->buf.data, req->buf.len);
             
             // sanity check...
-            assert(ret > 0 && ret <= conn->req->buf.len);
+            assert(ret > 0 && ret <= req->buf.len);
 
             // update offset
-            conn->req->buf.offset += ret;
+            req->buf.offset += ret;
         }
 
         // still need to receive more data?
-        if (conn->req->buf.offset < conn->req->buf.len) {
+        if (req->buf.offset < req->buf.len) {
         
             // disable the bufferevent while we read the data
             if (bufferevent_disable(conn->bev, EV_READ))
@@ -236,11 +298,8 @@
         
     }
     
-    // we kind of "recurse" to handle the MEMCACHE_RPL_END reply, that is, we activate the bufferevent for EV_READ
-    // again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line" after the data and then
-    // return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause req_done to be called.
-    // Elegant!
-    memcache_conn_handle_reply(conn);
+    // finish it off
+    memcache_conn_recv_end(conn);
 
     // ok
     return;
@@ -250,6 +309,38 @@
 }
 
 /*
+ * Receive the final bits of data following the reply data block
+ */ 
+void memcache_conn_recv_end (struct memcache_conn *conn) {
+    // we still need to receive the MEMCACHE_RPL_END. We kind of "recurse" to handle this, that is, we activate the
+    // bufferevent for EV_READ again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line"
+    // after the data and then return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause
+    // recv_done to be called.
+    // Elegant!
+    memcache_conn_recv_next(conn);
+}
+
+/*
+ * We have finished receiving our current request
+ */
+void memcache_conn_recv_done (struct memcache_conn *conn) {
+    struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
+    
+    // we can remove it from the list now
+    TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node);
+
+    // have the req detach
+    memcache_req_done(req);
+
+    // and prepare to recv the next one
+    memcache_conn_recv_next(conn);
+    
+    // if pipelining is off, it's a question of when we've received the reply...
+    if (!conn->server->mc->pipeline_requests)
+        memcache_server_conn_ready(conn->server, conn);
+}
+
+/*
  * The connect() has finished
  */
 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
@@ -273,6 +364,9 @@
 
     // mark us as successfully connected
     conn->is_connected = 1;
+    
+    // and prepare to recv any response headers
+    memcache_conn_recv_next(conn);
 
     // notify the server
     memcache_server_conn_ready(conn->server, conn);
@@ -290,18 +384,20 @@
 static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
     struct memcache_conn *conn = arg;
 
+    assert(conn->send_req != NULL);
+
     // the command header has been sent
     assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
     
     // does this request have some data to be included in the request?
     // if the data has already been sent (we handle the final \r\n as well), then skip this.
-    if (conn->req->have_buf && conn->req->buf.offset == 0) {
+    if (conn->send_req->have_buf && conn->send_req->buf.offset == 0) {
         // we need to send the request data next
-        memcache_conn_send_req_data(conn);
+        memcache_conn_send_data(conn);
 
     } else {
-        // wait for a reply
-        memcache_conn_handle_reply(conn);
+        // the request has now been sent, and we can send the next one
+        memcache_conn_send_done(conn);
     }
 }
 
@@ -311,6 +407,7 @@
 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
     struct memcache_conn *conn = arg;
     struct evbuffer *in_buf = bufferevent_get_input(bev);
+    struct memcache_req *req;
     struct memcache_key key;
     char *header_data;
     enum memcache_reply reply_type;
@@ -321,8 +418,11 @@
     
     // consume as much data as possible
     do {
+        // the req we are processing
+        req = TAILQ_FIRST(&conn->req_queue);
+
         // attempt to parse the response header
-        if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
+        if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &req->obj, &has_data))
             ERROR("memcache_cmd_parse_header");
         
         if (!header_data) {
@@ -330,25 +430,33 @@
             return;
         }
 
+        // no request waiting?
+        if (req == NULL)
+            ERROR("got a response without any request pending: %s", header_data);
+
         // if the reply contains data, check that the key is the same
-        if (has_data && (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0))
-            ERROR("got reply with wrong key !?!");
+        if (has_data && (key.len != req->key.len || memcmp(key.buf, req->key.buf, key.len) != 0))
+            ERROR("got reply with wrong key !?! '%.*s' vs. '%.*s'", (int) key.len, key.buf, (int) req->key.len, req->key.buf);
+
+        // check it's a FETCH request
+        if (has_data && req->cmd_type != MEMCACHE_CMD_FETCH_GET)
+            ERROR("a data reply for a non-CMD_FETCH_* command !?!");
 
         // notify the request (no reply data is ready for reading yet, though)
-        memcache_req_recv(conn->req, reply_type);
+        memcache_req_recv(req, reply_type);
         
         // does the reply include data?
         if (has_data) {
             // start reading the data (including whatever might be left over in the bufferevent buffer...)
-            memcache_conn_handle_reply_data(conn, in_buf);
+            memcache_conn_recv_data(conn, in_buf);
 
         } else {
             // the request is done with
-            memcache_conn_req_done(conn);
+            memcache_conn_recv_done(conn);
         }
 
-        // free the header data
-        free(header_data);
+        // free the header data, but not a second time on error exit
+        free(header_data); header_data = NULL;
     
     } while (evbuffer_get_length(in_buf) > 0);
     
@@ -372,7 +480,7 @@
 
 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
     struct memcache_conn *conn = arg;
-    struct memcache_buf *buf = &conn->req->buf;
+    struct memcache_buf *buf = &conn->send_req->buf;
     int ret;
 
     // correct event
@@ -403,7 +511,7 @@
 
     } else {
         // done! Send the terminating \r\n next
-        memcache_conn_finish_req_data(conn);
+        memcache_conn_send_end(conn);
     }
 
     // success
@@ -416,7 +524,8 @@
 
 static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg) {
     struct memcache_conn *conn = arg;
-    struct memcache_buf *buf = &conn->req->buf;
+    struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
+    struct memcache_buf *buf = &req->buf;
     int ret;
 
     // correct event
@@ -442,7 +551,7 @@
     // only notify the req if new data was received, and we won't be calling req_done next.
     if (ret > 0 && buf->offset < buf->len) {
         // notify the req
-        memcache_req_data(conn->req);
+        memcache_req_data(req);
     }
 
     // data left to read?
@@ -454,7 +563,7 @@
 
     } else {
         // done! We can let the bufferevent handle the rest of the reply now
-        memcache_conn_handle_reply(conn);
+        memcache_conn_recv_end(conn);
     }
 
     // success
@@ -465,20 +574,21 @@
     memcache_conn_error(conn);
 }
 
-// XXX: need to flush/disable buffers/events on errors
-
 /*
  * The entire connection failed
  */
 static void memcache_conn_error (struct memcache_conn *conn) {
-    // fail the request, if we have one
-    if (conn->req) {
+    struct memcache_req *req;
+
+    // fail all requests, if we have any
+    while ((req = TAILQ_FIRST(&conn->req_queue)) != NULL) {
         // error out the req
-        memcache_req_error(conn->req);
-        
-        // we are now available again
-        conn->req = NULL;
+        memcache_req_error(req);
+
+        TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node);
     }
+
+    conn->send_req = NULL;
     
     // close the connection
     memcache_conn_close(conn);
@@ -487,22 +597,6 @@
     memcache_server_conn_dead(conn->server, conn);
 }
 
-/*
- * Detach the request
- */
-static void memcache_conn_req_done (struct memcache_conn *conn) {
-    // ensure that we do currently have a req
-    assert(conn->req);
-    
-    // have the req detach
-    memcache_req_done(conn->req);
-
-    // we are now available again
-    conn->req = NULL;
-    
-    memcache_server_conn_ready(conn->server, conn);
-}
-
 void memcache_conn_close (struct memcache_conn *conn) {
     // close the fd if needed
     if (conn->fd > 0) {
@@ -529,8 +623,8 @@
 }
 
 void memcache_conn_free (struct memcache_conn *conn) {
-    // ensure we don't have a req bound to us
-    assert(conn->req == NULL);
+    // ensure we don't have any reqs bound to us
+    assert(TAILQ_EMPTY(&conn->req_queue));
     
     // ensure that the connection is not considered to be connected anymore
     assert(!conn->is_connected);
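The bookkeeping above reduces to a FIFO discipline on req_queue: memcache_conn_do_req() enqueues at the tail, memcache_conn_send_next() walks forward from the last request written, and replies are always matched against the head, which memcache_conn_recv_done() removes. A standalone sketch of that discipline with the sys/queue.h macros (the struct and field names here are illustrative, not the real ones):

    #include <stdio.h>
    #include <sys/queue.h>

    struct demo_req {
        int id;
        TAILQ_ENTRY(demo_req) queue_node;
    };

    TAILQ_HEAD(demo_queue, demo_req);

    int main (void) {
        struct demo_queue queue;
        struct demo_req reqs[3] = { { .id = 1 }, { .id = 2 }, { .id = 3 } };
        struct demo_req *req;
        int i;

        TAILQ_INIT(&queue);

        // sending: new requests go on the tail, as in memcache_conn_do_req()
        for (i = 0; i < 3; i++)
            TAILQ_INSERT_TAIL(&queue, &reqs[i], queue_node);

        // receiving: each reply is matched against the head and then dequeued,
        // as in memcache_conn_recv_done()
        while ((req = TAILQ_FIRST(&queue)) != NULL) {
            printf("reply for request %d\n", req->id);
            TAILQ_REMOVE(&queue, req, queue_node);
        }

        return 0;
    }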
--- a/memcache/connection.h	Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/connection.h	Sat Aug 30 19:13:15 2008 +0300
@@ -8,6 +8,7 @@
 #include <event2/bufferevent.h>
 
 #include "server.h"
+#include "request.h"
 #include "command.h"
 
 struct memcache_conn {
@@ -31,8 +32,14 @@
     // have we successfully connected yet?
     int is_connected;
     
-    // the request (if any) that we are currently processing - NULL for idle connections
-    struct memcache_req *req;
+    /*
+     * The list of requests that we are currently processing, and a pointer to the request that is currently being
+     * sent.
+     *  
+     * New requests are enqueued on the end of this list.
+     */
+    struct memcache_reqqueue_head req_queue;
+    struct memcache_req *send_req;
 
     // used to track our commands
     struct memcache_cmd cmd;
--- a/memcache/memcache.c	Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/memcache.c	Sat Aug 30 19:13:15 2008 +0300
@@ -6,14 +6,15 @@
 #include "request.h"
 #include "../common.h"
 
-struct memcache *memcache_alloc (memcache_cb cb_fn) {
+struct memcache *memcache_alloc (memcache_cb cb_fn, char pipeline_requests) {
     struct memcache *mc = NULL;
 
     if ((mc = calloc(1, sizeof(*mc))) == NULL)
         ERROR("calloc");
     
-    // store callback
+    // store attributes
     mc->cb_fn = cb_fn;
+    mc->pipeline_requests = pipeline_requests;
 
     // init server list
     LIST_INIT(&mc->server_list);
@@ -32,7 +33,7 @@
     struct memcache_server *server = NULL;
     
     // alloc the server
-    if ((server = memcache_server_alloc(endpoint, max_connections)) == NULL)
+    if ((server = memcache_server_alloc(mc, endpoint, max_connections)) == NULL)
         goto error;
 
     // enlist it
--- a/memcache/memcache.h	Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/memcache.h	Sat Aug 30 19:13:15 2008 +0300
@@ -11,6 +11,9 @@
 
     // list of servers
     LIST_HEAD(memcache_serverlist_head, memcache_server) server_list;
+
+    // should we "pipeline" requests?
+    char pipeline_requests;
 };
 
 struct memcache_server *memcache_choose_server (struct memcache *mc, const struct memcache_key *key);
--- a/memcache/request.h	Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/request.h	Sat Aug 30 19:13:15 2008 +0300
@@ -4,7 +4,8 @@
 #include <sys/queue.h>
 
 #include "../memcache.h"
-#include "connection.h"
+
+TAILQ_HEAD(memcache_reqqueue_head, memcache_req);
 
 struct memcache_req {
     // the memcache context that we belong to
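Replacing the connection.h include with a bare TAILQ_HEAD declaration works because the head type only stores pointers to struct memcache_req, so it can be declared before the struct itself is complete, which breaks the request.h/connection.h include cycle. A minimal sketch of the pattern (illustrative names):

    #include <sys/queue.h>

    struct item;                        /* only a forward declaration at this point */
    TAILQ_HEAD(item_queue, item);       /* fine: the head just holds struct item pointers */

    struct item {
        int value;
        TAILQ_ENTRY(item) queue_node;   /* the entry macro lives in the full definition */
    };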
--- a/memcache/server.c	Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/server.c	Sat Aug 30 19:13:15 2008 +0300
@@ -3,18 +3,24 @@
 #include <assert.h>
 
 #include "server.h"
+#include "memcache.h"
 #include "connection.h"
 #include "request.h"
 #include "../memcache.h"
 #include "../common.h"
 
-struct memcache_server *memcache_server_alloc (struct config_endpoint *endpoint, int max_connections) {
+struct memcache_server *memcache_server_alloc (struct memcache *mc, struct config_endpoint *endpoint, int max_connections) {
     struct memcache_server *server = NULL;
 
+    // warn if nonsensical arguments
+    if (max_connections > 1 && mc->pipeline_requests)
+        WARNING("only one connection will ever be used with pipeline_requests");
+
     if ((server = calloc(1, sizeof(*server))) == NULL)
         ERROR("calloc");
     
     // store the vars
+    server->mc = mc;
     server->endpoint = endpoint;
     server->max_connections = max_connections;
 
@@ -113,7 +119,8 @@
     
     // look for idle connections to service the request
     LIST_FOREACH(conn, &server->conn_list, connlist_node) {
-        if (memcache_conn_is_available(conn)) {
+        // handle pipelining as well
+        while (memcache_conn_is_available(conn)) {
             // remove the req from the queue and execute it
             TAILQ_REMOVE(&server->req_queue, req, reqqueue_node);
             
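The dispatch change hinges on the new memcache_conn_is_available() test above: without pipelining a connection only qualifies while its request queue is empty, with pipelining any connected socket does, so the while loop can keep handing queued requests to the same connection. The rule reduced to a plain predicate (connected, queue_empty and pipelining stand in for conn->is_connected, TAILQ_EMPTY(&conn->req_queue) and mc->pipeline_requests; the real check also requires conn->fd > 0):

    static int conn_available (int connected, int queue_empty, int pipelining) {
        // idle connections are always usable; busy ones only when pipelining
        return connected && (queue_empty || pipelining);
    }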
--- a/memcache/server.h	Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/server.h	Sat Aug 30 19:13:15 2008 +0300
@@ -4,9 +4,14 @@
 #include <sys/queue.h>
 
 #include "../memcache.h"
+#include "request.h"
 #include "../config.h"
 
 struct memcache_server {
+    // the memcache context we belong to
+    struct memcache *mc;
+    
+    // our endpoint
     struct config_endpoint *endpoint;
     
     // we are a member of struct memcache.server_list
@@ -16,7 +21,7 @@
     LIST_HEAD(memcache_connlist_head, memcache_conn) conn_list;
 
     // a list of enqueued requests waiting for a connection
-    TAILQ_HEAD(memcache_reqqueue_head, memcache_req) req_queue;
+    struct memcache_reqqueue_head req_queue;
 
     // how many connections we should have at most
     int max_connections;
@@ -25,7 +30,7 @@
 /*
  * Alloc and return a new memcache_server
  */
-struct memcache_server *memcache_server_alloc (struct config_endpoint *endpoint, int max_connections);
+struct memcache_server *memcache_server_alloc (struct memcache *mc, struct config_endpoint *endpoint, int max_connections);
 
 /*
  * Attempt to grow the connection pool by one connection. Doesn't do anything if we already have too many connections,
--- a/memcache_test.c	Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache_test.c	Sat Aug 30 19:13:15 2008 +0300
@@ -39,6 +39,7 @@
 
 struct common {
     unsigned int max_connections;
+    char pipeline_requests;
     
     struct timeval start;
 } common;
@@ -65,10 +66,18 @@
 void benchmark_cb (struct memcache_req *req, void *arg);
 void benchmark_fetch_fn (void);
 
+enum option_test {
+    TEST_INVALID,
+    TEST_COMMON,
+
+    TEST_BENCH_FETCH,
+};
+
 enum option_code {
     OPT_CODE_INVALID,
 
     COMMON_CONN_MAX,
+    COMMON_PIPELINE,
     BENCH_FETCH_REQ_CONCURRENCY,
     BENCH_FETCH_REQ_AMOUNT,
     BENCH_FETCH_KEY_PREFIX,
@@ -83,6 +92,7 @@
 
 enum option_type {
     OPT_TYPE_NONE,
+    OPT_TYPE_BOOL,
     OPT_TYPE_UINT,
     OPT_TYPE_STR,
 };
@@ -93,13 +103,19 @@
 
     void (*test_fn) (void);
 
+    enum option_test code;
+
+    char *descr;
+
 } test_list[] = {
-    { "benchmark_fetch",        &benchmark_cb,  &benchmark_fetch_fn },
-    { 0,                        0,              0,                  }
+    { "benchmark_fetch",        &benchmark_cb,  &benchmark_fetch_fn,    TEST_BENCH_FETCH,
+        "Measure the speed of fetch requests"                                               },
+    { 0,                        0,              0,                      0,  0               }
 };
 
 static struct option options[] = {
     { "conn-max",           required_argument,  NULL,   COMMON_CONN_MAX             },
+    { "pipeline",           required_argument,  NULL,   COMMON_PIPELINE             },
     { "req-concurrency",    required_argument,  NULL,   BENCH_FETCH_REQ_CONCURRENCY },
     { "req-amount",         required_argument,  NULL,   BENCH_FETCH_REQ_AMOUNT      },
     { "key-prefix",         required_argument,  NULL,   BENCH_FETCH_KEY_PREFIX      },
@@ -112,10 +128,16 @@
 };
 
 static struct opt {
+    enum option_test test;
     enum option_code code;
     enum option_type type;
     
     union opt_type_data {
+        struct  {
+            char *value;
+            char default_value;
+        } bool;
+
         struct {
             unsigned int *value;
             unsigned int default_value;
@@ -126,17 +148,41 @@
             const char *default_value;
         } str;
     } data;
+    
+    const char *name;
+    const char *descr;
 } option_info[OPT_CODE_MAX] = {
-    {   OPT_CODE_INVALID,               OPT_TYPE_NONE                                                           },
-    {   COMMON_CONN_MAX,                OPT_TYPE_UINT,  { .uint = { &common.max_connections,          1     }}  },
-    {   BENCH_FETCH_REQ_CONCURRENCY,    OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.concurrent_ops,  1     }}  },
-    {   BENCH_FETCH_REQ_AMOUNT,         OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.total_ops,       500   }}  },
-    {   BENCH_FETCH_KEY_PREFIX,         OPT_TYPE_STR,   { .str  = { &benchmark_fetch.key_prefix,      "bf_" }}  },
-    {   BENCH_FETCH_KEY_LEN_MIN,        OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.key_len_min,     8     }}  },
-    {   BENCH_FETCH_KEY_LEN_MAX,        OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.key_len_max,     8     }}  },
-    {   BENCH_FETCH_KEY_COUNT,          OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.key_count,       1     }}  },
-    {   BENCH_FETCH_DATA_LEN_MIN,       OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.data_len_min,    64    }}  },
-    {   BENCH_FETCH_DATA_LEN_MAX,       OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.data_len_max,    64    }}  },
+    {   TEST_INVALID,       OPT_CODE_INVALID,               OPT_TYPE_NONE                                                           },
+
+    {   TEST_COMMON,        COMMON_CONN_MAX,                OPT_TYPE_UINT,  { .uint = { &common.max_connections,          1     }},
+            "conn-max",             "number of connections to use"                                                                  },
+
+    {   TEST_COMMON,        COMMON_PIPELINE,                OPT_TYPE_BOOL,  { .bool = { &common.pipeline_requests,        1     }},  
+            "pipeline",             "pipeline requests"                                                                             },
+
+    {   TEST_BENCH_FETCH,   BENCH_FETCH_REQ_CONCURRENCY,    OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.concurrent_ops,  1     }},
+            "req-concurrency",      "number of requests to have running"                                                            },
+
+    {   TEST_BENCH_FETCH,   BENCH_FETCH_REQ_AMOUNT,         OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.total_ops,       500   }},
+            "req-amount",           "number of requests to issue"                                                                   },
+
+    {   TEST_BENCH_FETCH,   BENCH_FETCH_KEY_PREFIX,         OPT_TYPE_STR,   { .str  = { &benchmark_fetch.key_prefix,      "bf_" }},
+            "key-prefix",           "key prefix"                                                                                    },
+
+    {   TEST_BENCH_FETCH,   BENCH_FETCH_KEY_LEN_MIN,        OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.key_len_min,     8     }},  
+            "key-len-min",          "minimum key length"                                                                            },
+
+    {   TEST_BENCH_FETCH,   BENCH_FETCH_KEY_LEN_MAX,        OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.key_len_max,     8     }},
+            "key-len-max",          "maximum key length"                                                                            },
+
+    {   TEST_BENCH_FETCH,   BENCH_FETCH_KEY_COUNT,          OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.key_count,       1     }},
+            "key-count",            "how many keys to use"                                                                          },
+
+    {   TEST_BENCH_FETCH,   BENCH_FETCH_DATA_LEN_MIN,       OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.data_len_min,    64    }},
+            "data-len-min",         "minimum data length"                                                                           },
+
+    {   TEST_BENCH_FETCH,   BENCH_FETCH_DATA_LEN_MAX,       OPT_TYPE_UINT,  { .uint = { &benchmark_fetch.data_len_max,    64    }},
+            "data-len-max",         "maximum data length"                                                                            },
 };
 
 void time_reset () {
@@ -152,11 +198,15 @@
     return ((double) (time.tv_sec - common.start.tv_sec)) + ((double) (time.tv_usec - common.start.tv_usec)) / 1000000;
 }
 
+int ratelimit (int count, int total) {
+    return ((total / 10) ? (count % (total / 10) == 0) : 1);
+}
+
 
 
 void mc_init (int max_connections, memcache_cb cb_fn) {
     // memcache init    
-    if ((mc = memcache_alloc(cb_fn)) == NULL)
+    if ((mc = memcache_alloc(cb_fn, common.pipeline_requests)) == NULL)
         ERROR("memcache_alloc");
     
     // fix up the endpoint
@@ -166,10 +216,10 @@
         ERROR("config_endpoint_parse");
     
     // add the server
-    if (memcache_add_server(mc, &server_endpoint, max_connections))
+    if (memcache_add_server(mc, &server_endpoint, common.max_connections))
         ERROR("memcache_add_server");
     
-    INFO("[memcache] initialized with max_connections=%d", max_connections);
+    INFO("[memcache] initialized with pipeline_requests=%c max_connections=%d", common.pipeline_requests ? 't' : 'f', common.max_connections);
 
     return;
 
@@ -181,7 +231,7 @@
     const struct memcache_obj *obj;
     const struct memcache_buf *buf;
 
-    INFO("[%*s]: cmd=%s state=%s reply=%s",
+    INFO("[%.*s]: cmd=%s state=%s reply=%s",
         (int) memcache_req_key(req)->len, memcache_req_key(req)->buf,
         memcache_command_str(memcache_req_cmd(req)),
         memcache_state_str(memcache_req_state(req)),
@@ -288,8 +338,8 @@
         assert(memcache_fetch(mc, &benchmark_fetch.keys[random_value(0, benchmark_fetch.key_count)].key, NULL) != NULL);
 
         benchmark_fetch.cur_ops++;
-
-        if ((benchmark_fetch.op_count + benchmark_fetch.cur_ops) % (benchmark_fetch.total_ops / 10) == 0)
+        
+        if (ratelimit(benchmark_fetch.op_count + benchmark_fetch.cur_ops, benchmark_fetch.total_ops))
             INFO("[benchmark] %0.6f: %d+%d/%d requests", 
                 time_offset(),
                 benchmark_fetch.op_count, benchmark_fetch.cur_ops, benchmark_fetch.total_ops
@@ -357,8 +407,8 @@
             }
 
             benchmark_fetch.keys_stored++;
-
-            if (benchmark_fetch.keys_stored % (benchmark_fetch.key_count / 10) == 0)
+            
+            if (ratelimit(benchmark_fetch.keys_stored, benchmark_fetch.key_count))
                 INFO("[benchmark] %.6f: key %u/%u stored: %.*s", 
                     time_offset(), benchmark_fetch.keys_stored, benchmark_fetch.key_count, (int) memcache_req_key(req)->len, memcache_req_key(req)->buf
                 );
@@ -419,27 +469,51 @@
     }
 }
 
+void usage_opt (enum option_test test) {
+    struct opt *opt;
+
+    for (opt = option_info + 1; opt < option_info + OPT_CODE_MAX; opt++) {
+        if (opt->test == test) {
+            switch (opt->type) {
+                case OPT_TYPE_BOOL:
+                    INFO("\t%-20s %c     %s", opt->name, opt->data.bool.default_value ? 't' : 'f', opt->descr);
+                    break;
+
+                case OPT_TYPE_UINT:
+                    INFO("\t%-20s %-6d %s", opt->name, opt->data.uint.default_value, opt->descr);
+                    break;
+
+                case OPT_TYPE_STR:
+                    INFO("\t%-20s %-6s %s", opt->name, opt->data.str.default_value, opt->descr);
+                    break;
+                
+                default:
+                    assert(0);
+            }
+        }
+    }
+}
+
 void usage (char *cmd) {
-    INFO(
-        "Usage: %s <cmd> [<args> ... ]\n"
-        "\n"
-        "COMMANDS\n"
-        "\n"
-        "benchmark_fetch:\n"
-        "\tMeasure the speed of fetch requests\n"
-        "\t\n"
-        "\tconn-max         1       number of connections to use\n"
-        "\treq-concurrency  1       number of requests to have running\n"
-        "\treq-amount       500     number of requests to issue\n"
-        "\tkey-prefix       bf_     key prefix\n"
-        "\tkey-len-min      8       minimum key lengt\n"
-        "\tkey-len-max      8       maximum key length\n"
-        "\tkey-count        1       how many keys to use\n"
-        "\tdata-len-min     64      minimum data length\n"
-        "\tdata-len-max     64      maximum data length\n",
+    struct test *test;
 
-        cmd
-    );
+    INFO("Usage: %s <cmd> [<args> ... ]", cmd);
+
+    INFO("\nCOMMON OPTIONS\n");
+    
+    usage_opt(TEST_COMMON);
+
+    INFO("\nCOMMANDS\n");
+
+    for (test = test_list; test->name; test++) {
+        INFO("%s:", test->name);
+        INFO("\t%s", test->descr);
+        INFO("\t");
+        
+        usage_opt(test->code);
+
+        INFO("\t");
+    }
 
     exit(1);
 }
@@ -472,6 +546,11 @@
         switch (option_info[c].type) {
             case OPT_TYPE_NONE:
                 break;
+            
+            case OPT_TYPE_BOOL:
+                *option_info[c].data.bool.value = option_info[c].data.bool.default_value;
+
+                break;
 
             case OPT_TYPE_UINT:
                 *option_info[c].data.uint.value = option_info[c].data.uint.default_value;
@@ -491,8 +570,31 @@
     while ((c = getopt_long(argc, argv, "", options, &option_index)) != -1) {
         if (c <= OPT_CODE_INVALID || c >= OPT_CODE_MAX)
             FATAL("invalid argument %s", options[option_index].name);
+
+        if (option_info[c].test != test->code && option_info[c].test != TEST_COMMON)
+            FATAL("invalid option %s for test %s", options[option_index].name, test->name);
         
         switch (option_info[c].type) {
+            case OPT_TYPE_BOOL:
+                assert(optarg);
+
+                switch (optarg[0]) {
+                    case 't':
+                    case '1':
+                        *option_info[c].data.bool.value = 1;
+                        break;
+
+                    case 'f':
+                    case '0':
+                        *option_info[c].data.bool.value = 0;
+                        break;
+
+                    default:
+                        FATAL("invalid true/false value: %s: %s", options[option_index].name, optarg);
+                }
+
+                break;
+
             case OPT_TYPE_UINT:
                 assert(optarg);