--- a/memcache.h Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache.h Sat Aug 30 19:13:15 2008 +0300
@@ -292,7 +292,7 @@
*
* The given callback function is used for all requests in this context.
*/
-struct memcache *memcache_alloc (memcache_cb cb_fn);
+struct memcache *memcache_alloc (memcache_cb cb_fn, char pipeline_requests);
/*
* Add a server to the pool of available servers.
--- a/memcache/connection.c Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/connection.c Sat Aug 30 19:13:15 2008 +0300
@@ -6,16 +6,20 @@
#include <assert.h>
#include "connection.h"
+#include "memcache.h"
#include "command.h"
#include "request.h"
#include "../socket.h"
#include "../common.h"
-
-void memcache_conn_send_req_data (struct memcache_conn *conn);
-void memcache_conn_finish_req_data (struct memcache_conn *conn);
-void memcache_conn_handle_reply (struct memcache_conn *conn);
-void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf);
+void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint);
+void memcache_conn_send_data (struct memcache_conn *conn);
+void memcache_conn_send_end (struct memcache_conn *conn);
+void memcache_conn_send_done (struct memcache_conn *conn);
+void memcache_conn_recv_next (struct memcache_conn *conn);
+void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf);
+void memcache_conn_recv_done (struct memcache_conn *conn);
+void memcache_conn_recv_end (struct memcache_conn *conn);
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
@@ -25,9 +29,7 @@
static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg);
static void memcache_conn_error (struct memcache_conn *conn);
-static void memcache_conn_req_done (struct memcache_conn *conn);
-
-void memcache_conn_close (struct memcache_conn *conn);
+static void memcache_conn_close (struct memcache_conn *conn);
struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
struct memcache_conn *conn = NULL;
@@ -38,6 +40,9 @@
// remember the server
conn->server = server;
+ // init the req queue
+ TAILQ_INIT(&conn->req_queue);
+
// attempt connect
if (memcache_conn_connect(conn))
ERROR("failed to connect to server");
@@ -83,48 +88,92 @@
}
int memcache_conn_is_available (struct memcache_conn *conn) {
- return (conn->fd > 0 && conn->is_connected && conn->req == NULL);
+ return (conn->fd > 0 && conn->is_connected && (TAILQ_EMPTY(&conn->req_queue) || conn->server->mc->pipeline_requests));
}
void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
- assert(conn->fd > 0 && conn->is_connected);
- assert(conn->req == NULL);
+ assert(memcache_conn_is_available(conn));
+
+ // XXX: validate req
- // write the request header into our bufferevent's output buffer
- if (memcache_cmd_format_header(bufferevent_get_output(conn->bev),
- memcache_req_cmd(req),
- memcache_req_key(req),
- memcache_req_obj(req)
- )) {
- ERROR("failed to init the cmd");
+ // stick the req into the req queue
+ TAILQ_INSERT_TAIL(&conn->req_queue, req, reqqueue_node);
+
+ // if send is idle...
+ if (conn->send_req == NULL)
+ memcache_conn_send_next(conn, req);
+}
+
+/*
+ * Send out the next request.
+ *
+ * If there is currently a send_req, it is considered as done.
+ */
+void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint) {
+ struct memcache_req *req;
+
+ // req will be either
+ // * the next enqueued req after the one that was last written
+ // * hint, if no req was being written (the req that was just enqueued)
+ if (conn->send_req) {
+ assert(!hint);
+
+ // the nex req
+ req = TAILQ_NEXT(conn->send_req, reqqueue_node);
+
+ // and reset the send_req to NULL...
+ conn->send_req = NULL;
+
+ } else {
+ assert(hint && TAILQ_LAST(&conn->req_queue, memcache_reqqueue_head) == hint);
+
+ // the given req
+ req = hint;
+ }
+
+ // while we still have a request to process...
+ while (req) {
+ // try and write the request header into our bufferevent's output buffer
+ if (memcache_cmd_format_header(bufferevent_get_output(conn->bev),
+ memcache_req_cmd(req),
+ memcache_req_key(req),
+ memcache_req_obj(req)
+ )) {
+ WARNING("invalid request");
+ memcache_req_error(req);
+
+ // continue on to the next request
+ req = TAILQ_NEXT(req, reqqueue_node);
+
+ continue;
+ }
+
+ // send this one
+ conn->send_req = req;
+
+ // enable our bufferevent to send it
+ if (bufferevent_enable(conn->bev, EV_WRITE))
+ PERROR("bufferevent_enable");
+
+ // tell the req that it is underway
+ memcache_req_send(req);
+
+ // done, we only want to process one
+ break;
}
- // store the req
- conn->req = req;
-
- // tell our bufferevent to send it
- if (bufferevent_enable(conn->bev, EV_WRITE))
- PERROR("bufferevent_enable");
-
- // tell the req that it is underway
- memcache_req_send(req);
-
- // success
+ // done, we either replaced send_req, or consumed them all
return;
error:
- if (conn->req)
- memcache_conn_error(conn);
-
- else
- memcache_req_error(req);
+ memcache_conn_error(conn);
}
/*
* Start writing out the request data
*/
-void memcache_conn_send_req_data (struct memcache_conn *conn) {
- if (conn->req->obj.bytes > 0) {
+void memcache_conn_send_data (struct memcache_conn *conn) {
+ if (conn->send_req->obj.bytes > 0) {
// set up the ev_write
event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn);
@@ -133,14 +182,15 @@
} else {
// just send the \r\n
- memcache_conn_finish_req_data(conn);
+ memcache_conn_send_end(conn);
}
}
/*
* Write out the final \r\n to terminate the request data
*/
-void memcache_conn_finish_req_data (struct memcache_conn *conn) {
+void memcache_conn_send_end (struct memcache_conn *conn) {
+ // XXX: this will enable the bev by itself?
if (bufferevent_write(conn->bev, "\r\n", 2))
PERROR("bufferevent_write");
@@ -152,12 +202,23 @@
}
/*
+ * Finished sending the current send_req
+ */
+void memcache_conn_send_done (struct memcache_conn *conn) {
+ assert(conn->send_req != NULL);
+
+ // send the next req, if there is one
+ memcache_conn_send_next(conn, NULL);
+
+ // if pipelining is on, it's a question of how many we've sent...
+ if (conn->server->mc->pipeline_requests)
+ memcache_server_conn_ready(conn->server, conn);
+}
+
+/*
* Start reading a reply from the connection
*/
-void memcache_conn_handle_reply (struct memcache_conn *conn) {
- // ensure that we either didn't have a command, or it has been sent
- assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
-
+void memcache_conn_recv_next (struct memcache_conn *conn) {
// start/continue reading on the bufferevent
if (bufferevent_enable(conn->bev, EV_READ))
PERROR("bufferevent_enable");
@@ -175,44 +236,45 @@
/*
* Start reading reply data from the connection
*/
-void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) {
+void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf) {
+ struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
int ret;
// check that the buf doesn't contain any data
- assert(conn->req->buf.data == NULL);
+ assert(req->buf.data == NULL);
// bytes *may* be zero if we have an empty cache entry
- if (conn->req->obj.bytes > 0) {
+ if (req->obj.bytes > 0) {
// XXX: memcache_req_make_buffer?
// allocate a buffer for the reply data
- if ((conn->req->buf.data = malloc(conn->req->obj.bytes)) == NULL)
+ if ((req->buf.data = malloc(req->obj.bytes)) == NULL)
ERROR("malloc");
// update the length
- conn->req->buf.len = conn->req->obj.bytes;
+ req->buf.len = req->obj.bytes;
// set offset to zero
- conn->req->buf.offset = 0;
+ req->buf.offset = 0;
// and note that it is present, and is ours
- conn->req->have_buf = 1;
- conn->req->is_buf_ours = 1;
+ req->have_buf = 1;
+ req->is_buf_ours = 1;
// do we have any data in the buf that we need to copy?
if (evbuffer_get_length(buf) > 0) {
// read the data into the memcache_buf
- ret = evbuffer_remove(buf, conn->req->buf.data, conn->req->buf.len);
+ ret = evbuffer_remove(buf, req->buf.data, req->buf.len);
// sanity check...
- assert(ret > 0 && ret <= conn->req->buf.len);
+ assert(ret > 0 && ret <= req->buf.len);
// update offset
- conn->req->buf.offset += ret;
+ req->buf.offset += ret;
}
// still need to receive more data?
- if (conn->req->buf.offset < conn->req->buf.len) {
+ if (req->buf.offset < req->buf.len) {
// disable the bufferevent while we read the data
if (bufferevent_disable(conn->bev, EV_READ))
@@ -236,11 +298,8 @@
}
- // we kind of "recurse" to handle the MEMCACHE_RPL_END reply, that is, we activate the bufferevent for EV_READ
- // again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line" after the data and then
- // return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause req_done to be called.
- // Elegant!
- memcache_conn_handle_reply(conn);
+ // finish it off
+ memcache_conn_recv_end(conn);
// ok
return;
@@ -250,6 +309,38 @@
}
/*
+ * Receive the final bits of data following the reply data block
+ */
+void memcache_conn_recv_end (struct memcache_conn *conn) {
+ // we still need to receive the MEMCACHE_RPL_END. We kind of "recurse" to handle this, that is, we activate the
+ // bufferevent for EV_READ again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line"
+ // after the data and then return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause
+ // recv_done to be called.
+ // Elegant!
+ memcache_conn_recv_next(conn);
+}
+
+/*
+ * We have finished receiving our current request
+ */
+void memcache_conn_recv_done (struct memcache_conn *conn) {
+ struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
+
+ // we can remove it from the list now
+ TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node);
+
+ // have the req detach
+ memcache_req_done(req);
+
+ // and prepare to recv the next one
+ memcache_conn_recv_next(conn);
+
+ // if pipelining is off, it's a question of when we've recevied the reply...
+ if (!conn->server->mc->pipeline_requests)
+ memcache_server_conn_ready(conn->server, conn);
+}
+
+/*
* The connect() has finished
*/
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
@@ -273,6 +364,9 @@
// mark us as succesfully connected
conn->is_connected = 1;
+
+ // and prepare to recv any response headers
+ memcache_conn_recv_next(conn);
// notify the server
memcache_server_conn_ready(conn->server, conn);
@@ -290,18 +384,20 @@
static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
struct memcache_conn *conn = arg;
+ assert(conn->send_req != NULL);
+
// the command header has been sent
assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
// does this request have some data to be included in the request?
// if the data has already been sent (we handle the final \r\n as well), then skip this.
- if (conn->req->have_buf && conn->req->buf.offset == 0) {
+ if (conn->send_req->have_buf && conn->send_req->buf.offset == 0) {
// we need to send the request data next
- memcache_conn_send_req_data(conn);
+ memcache_conn_send_data(conn);
} else {
- // wait for a reply
- memcache_conn_handle_reply(conn);
+ // the request has now been sent, and se can send the next one
+ memcache_conn_send_done(conn);
}
}
@@ -311,6 +407,7 @@
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
struct memcache_conn *conn = arg;
struct evbuffer *in_buf = bufferevent_get_input(bev);
+ struct memcache_req *req;
struct memcache_key key;
char *header_data;
enum memcache_reply reply_type;
@@ -321,8 +418,11 @@
// consume as much data as possible
do {
+ // the req we are processing
+ req = TAILQ_FIRST(&conn->req_queue);
+
// attempt to parse the response header
- if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
+ if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &req->obj, &has_data))
ERROR("memcache_cmd_parse_header");
if (!header_data) {
@@ -330,25 +430,33 @@
return;
}
+ // no request waiting?
+ if (req == NULL)
+ ERROR("got a response without any request pending: %s", header_data);
+
// if the reply contains data, check that they key is the same
- if (has_data && (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0))
- ERROR("got reply with wrong key !?!");
+ if (has_data && (key.len != req->key.len || memcmp(key.buf, req->key.buf, key.len) != 0))
+ ERROR("got reply with wrong key !?! '%.*s' vs. '%.*s'", (int) key.len, key.buf, (int) req->key.len, req->key.buf);
+
+ // check it's a FETCH request
+ if (has_data && req->cmd_type != MEMCACHE_CMD_FETCH_GET)
+ ERROR("a data reply for a non-CMD_FETCH_* command !?!");
// notify the request (no reply data is ready for reading yet, though)
- memcache_req_recv(conn->req, reply_type);
+ memcache_req_recv(req, reply_type);
// does the reply include data?
if (has_data) {
// start reading the data (including whatever might be left over in the bufferevent buffer...)
- memcache_conn_handle_reply_data(conn, in_buf);
+ memcache_conn_recv_data(conn, in_buf);
} else {
// the request is done with
- memcache_conn_req_done(conn);
+ memcache_conn_recv_done(conn);
}
- // free the header data
- free(header_data);
+ // free the header data, but not a second time on error exit
+ free(header_data); header_data = NULL;
} while (evbuffer_get_length(in_buf) > 0);
@@ -372,7 +480,7 @@
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
struct memcache_conn *conn = arg;
- struct memcache_buf *buf = &conn->req->buf;
+ struct memcache_buf *buf = &conn->send_req->buf;
int ret;
// correct event
@@ -403,7 +511,7 @@
} else {
// done! Send the terminating \r\n next
- memcache_conn_finish_req_data(conn);
+ memcache_conn_send_end(conn);
}
// success
@@ -416,7 +524,8 @@
static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg) {
struct memcache_conn *conn = arg;
- struct memcache_buf *buf = &conn->req->buf;
+ struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
+ struct memcache_buf *buf = &req->buf;
int ret;
// correct event
@@ -442,7 +551,7 @@
// only notify the req if new data was received, and we won't be calling req_done next.
if (ret > 0 && buf->offset < buf->len) {
// notify the req
- memcache_req_data(conn->req);
+ memcache_req_data(req);
}
// data left to read?
@@ -454,7 +563,7 @@
} else {
// done! We can let the bufferenvet handle the rest of the reply now
- memcache_conn_handle_reply(conn);
+ memcache_conn_recv_end(conn);
}
// success
@@ -465,20 +574,21 @@
memcache_conn_error(conn);
}
-// XXX: need to flush/disable buffers/events on errors
-
/*
* The entire connection failed
*/
static void memcache_conn_error (struct memcache_conn *conn) {
- // fail the request, if we have one
- if (conn->req) {
+ struct memcache_req *req;
+
+ // fail all requests, if we have any
+ TAILQ_FOREACH (req, &conn->req_queue, reqqueue_node) {
// error out the req
- memcache_req_error(conn->req);
-
- // we are now available again
- conn->req = NULL;
+ memcache_req_error(req);
+
+ TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node);
}
+
+ conn->send_req = NULL;
// close the connection
memcache_conn_close(conn);
@@ -487,22 +597,6 @@
memcache_server_conn_dead(conn->server, conn);
}
-/*
- * Detach the request
- */
-static void memcache_conn_req_done (struct memcache_conn *conn) {
- // ensure that we do currently have a req
- assert(conn->req);
-
- // have the req detach
- memcache_req_done(conn->req);
-
- // we are now available again
- conn->req = NULL;
-
- memcache_server_conn_ready(conn->server, conn);
-}
-
void memcache_conn_close (struct memcache_conn *conn) {
// close the fd if needed
if (conn->fd > 0) {
@@ -529,8 +623,8 @@
}
void memcache_conn_free (struct memcache_conn *conn) {
- // ensure we don't have a req bound to us
- assert(conn->req == NULL);
+ // ensure we don't have any reqs bound to us
+ assert(TAILQ_EMPTY(&conn->req_queue));
// ensure that the connection is not considered to be connected anymore
assert(!conn->is_connected);
--- a/memcache/connection.h Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/connection.h Sat Aug 30 19:13:15 2008 +0300
@@ -8,6 +8,7 @@
#include <event2/bufferevent.h>
#include "server.h"
+#include "request.h"
#include "command.h"
struct memcache_conn {
@@ -31,8 +32,14 @@
// have we succesfully connected yet?
int is_connected;
- // the request (if any) that we are currently processing - NULL for idle connections
- struct memcache_req *req;
+ /*
+ * The list of requests that we are currently processing, and a pointer to the request that is currently being
+ * sent.
+ *
+ * New requests are enqueued on the end of this list.
+ */
+ struct memcache_reqqueue_head req_queue;
+ struct memcache_req *send_req;
// used to track our commands
struct memcache_cmd cmd;
--- a/memcache/memcache.c Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/memcache.c Sat Aug 30 19:13:15 2008 +0300
@@ -6,14 +6,15 @@
#include "request.h"
#include "../common.h"
-struct memcache *memcache_alloc (memcache_cb cb_fn) {
+struct memcache *memcache_alloc (memcache_cb cb_fn, char pipeline_requests) {
struct memcache *mc = NULL;
if ((mc = calloc(1, sizeof(*mc))) == NULL)
ERROR("calloc");
- // store callback
+ // store attributes
mc->cb_fn = cb_fn;
+ mc->pipeline_requests = pipeline_requests;
// init server list
LIST_INIT(&mc->server_list);
@@ -32,7 +33,7 @@
struct memcache_server *server = NULL;
// alloc the server
- if ((server = memcache_server_alloc(endpoint, max_connections)) == NULL)
+ if ((server = memcache_server_alloc(mc, endpoint, max_connections)) == NULL)
goto error;
// enlist it
--- a/memcache/memcache.h Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/memcache.h Sat Aug 30 19:13:15 2008 +0300
@@ -11,6 +11,9 @@
// list of servers
LIST_HEAD(memcache_serverlist_head, memcache_server) server_list;
+
+ // should we "pipeline" requests?
+ char pipeline_requests;
};
struct memcache_server *memcache_choose_server (struct memcache *mc, const struct memcache_key *key);
--- a/memcache/request.h Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/request.h Sat Aug 30 19:13:15 2008 +0300
@@ -4,7 +4,8 @@
#include <sys/queue.h>
#include "../memcache.h"
-#include "connection.h"
+
+TAILQ_HEAD(memcache_reqqueue_head, memcache_req);
struct memcache_req {
// the memcache context that we belong to
--- a/memcache/server.c Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/server.c Sat Aug 30 19:13:15 2008 +0300
@@ -3,18 +3,24 @@
#include <assert.h>
#include "server.h"
+#include "memcache.h"
#include "connection.h"
#include "request.h"
#include "../memcache.h"
#include "../common.h"
-struct memcache_server *memcache_server_alloc (struct config_endpoint *endpoint, int max_connections) {
+struct memcache_server *memcache_server_alloc (struct memcache *mc, struct config_endpoint *endpoint, int max_connections) {
struct memcache_server *server = NULL;
+ // warn if nonsensical arguments
+ if (max_connections > 1 && mc->pipeline_requests)
+ WARNING("only one connection will ever be used with pipeline_requests");
+
if ((server = calloc(1, sizeof(*server))) == NULL)
ERROR("calloc");
// store the vars
+ server->mc = mc;
server->endpoint = endpoint;
server->max_connections = max_connections;
@@ -113,7 +119,8 @@
// look for idle connections to service the request
LIST_FOREACH(conn, &server->conn_list, connlist_node) {
- if (memcache_conn_is_available(conn)) {
+ // handle pipelining as well
+ while (memcache_conn_is_available(conn)) {
// remove the req from the queue and execute it
TAILQ_REMOVE(&server->req_queue, req, reqqueue_node);
--- a/memcache/server.h Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache/server.h Sat Aug 30 19:13:15 2008 +0300
@@ -4,9 +4,14 @@
#include <sys/queue.h>
#include "../memcache.h"
+#include "request.h"
#include "../config.h"
struct memcache_server {
+ // the memcache context we belong to
+ struct memcache *mc;
+
+ // our endpoint
struct config_endpoint *endpoint;
// we are a member of struct memcache.server_list
@@ -16,7 +21,7 @@
LIST_HEAD(memcache_connlist_head, memcache_conn) conn_list;
// a list of enqueued requests waiting for a connection
- TAILQ_HEAD(memcache_reqqueue_head, memcache_req) req_queue;
+ struct memcache_reqqueue_head req_queue;
// how many connections we should have at most
int max_connections;
@@ -25,7 +30,7 @@
/*
* Alloc and return a new memcache_server
*/
-struct memcache_server *memcache_server_alloc (struct config_endpoint *endpoint, int max_connections);
+struct memcache_server *memcache_server_alloc (struct memcache *mc, struct config_endpoint *endpoint, int max_connections);
/*
* Attempt to grow the connection pool by one connection. Doesn't do anything if we already have too many connections,
--- a/memcache_test.c Fri Aug 29 23:31:17 2008 +0300
+++ b/memcache_test.c Sat Aug 30 19:13:15 2008 +0300
@@ -39,6 +39,7 @@
struct common {
unsigned int max_connections;
+ char pipeline_requests;
struct timeval start;
} common;
@@ -65,10 +66,18 @@
void benchmark_cb (struct memcache_req *req, void *arg);
void benchmark_fetch_fn (void);
+enum option_test {
+ TEST_INVALID,
+ TEST_COMMON,
+
+ TEST_BENCH_FETCH,
+};
+
enum option_code {
OPT_CODE_INVALID,
COMMON_CONN_MAX,
+ COMMON_PIPELINE,
BENCH_FETCH_REQ_CONCURRENCY,
BENCH_FETCH_REQ_AMOUNT,
BENCH_FETCH_KEY_PREFIX,
@@ -83,6 +92,7 @@
enum option_type {
OPT_TYPE_NONE,
+ OPT_TYPE_BOOL,
OPT_TYPE_UINT,
OPT_TYPE_STR,
};
@@ -93,13 +103,19 @@
void (*test_fn) (void);
+ enum option_test code;
+
+ char *descr;
+
} test_list[] = {
- { "benchmark_fetch", &benchmark_cb, &benchmark_fetch_fn },
- { 0, 0, 0, }
+ { "benchmark_fetch", &benchmark_cb, &benchmark_fetch_fn, TEST_BENCH_FETCH,
+ "Measure the speed of fetch requests" },
+ { 0, 0, 0, 0, 0 }
};
static struct option options[] = {
{ "conn-max", required_argument, NULL, COMMON_CONN_MAX },
+ { "pipeline", required_argument, NULL, COMMON_PIPELINE },
{ "req-concurrency", required_argument, NULL, BENCH_FETCH_REQ_CONCURRENCY },
{ "req-amount", required_argument, NULL, BENCH_FETCH_REQ_AMOUNT },
{ "key-prefix", required_argument, NULL, BENCH_FETCH_KEY_PREFIX },
@@ -112,10 +128,16 @@
};
static struct opt {
+ enum option_test test;
enum option_code code;
enum option_type type;
union opt_type_data {
+ struct {
+ char *value;
+ char default_value;
+ } bool;
+
struct {
unsigned int *value;
unsigned int default_value;
@@ -126,17 +148,41 @@
const char *default_value;
} str;
} data;
+
+ const char *name;
+ const char *descr;
} option_info[OPT_CODE_MAX] = {
- { OPT_CODE_INVALID, OPT_TYPE_NONE },
- { COMMON_CONN_MAX, OPT_TYPE_UINT, { .uint = { &common.max_connections, 1 }} },
- { BENCH_FETCH_REQ_CONCURRENCY, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.concurrent_ops, 1 }} },
- { BENCH_FETCH_REQ_AMOUNT, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.total_ops, 500 }} },
- { BENCH_FETCH_KEY_PREFIX, OPT_TYPE_STR, { .str = { &benchmark_fetch.key_prefix, "bf_" }} },
- { BENCH_FETCH_KEY_LEN_MIN, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_len_min, 8 }} },
- { BENCH_FETCH_KEY_LEN_MAX, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_len_max, 8 }} },
- { BENCH_FETCH_KEY_COUNT, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_count, 1 }} },
- { BENCH_FETCH_DATA_LEN_MIN, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.data_len_min, 64 }} },
- { BENCH_FETCH_DATA_LEN_MAX, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.data_len_max, 64 }} },
+ { TEST_INVALID, OPT_CODE_INVALID, OPT_TYPE_NONE },
+
+ { TEST_COMMON, COMMON_CONN_MAX, OPT_TYPE_UINT, { .uint = { &common.max_connections, 1 }},
+ "conn-max", "number of connections to use" },
+
+ { TEST_COMMON, COMMON_PIPELINE, OPT_TYPE_BOOL, { .bool = { &common.pipeline_requests, 1 }},
+ "pipeline", "pipeline requests" },
+
+ { TEST_BENCH_FETCH, BENCH_FETCH_REQ_CONCURRENCY, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.concurrent_ops, 1 }},
+ "req-concurrency", "number of requests to have running" },
+
+ { TEST_BENCH_FETCH, BENCH_FETCH_REQ_AMOUNT, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.total_ops, 500 }},
+ "req-amount", "number of requests to issue" },
+
+ { TEST_BENCH_FETCH, BENCH_FETCH_KEY_PREFIX, OPT_TYPE_STR, { .str = { &benchmark_fetch.key_prefix, "bf_" }},
+ "key-prefix", "key prefix" },
+
+ { TEST_BENCH_FETCH, BENCH_FETCH_KEY_LEN_MIN, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_len_min, 8 }},
+ "key-len-min", "minimum key length" },
+
+ { TEST_BENCH_FETCH, BENCH_FETCH_KEY_LEN_MAX, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_len_max, 8 }},
+ "key-len-max", "maximum key length" },
+
+ { TEST_BENCH_FETCH, BENCH_FETCH_KEY_COUNT, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_count, 1 }},
+ "key-count", "how many keys to use" },
+
+ { TEST_BENCH_FETCH, BENCH_FETCH_DATA_LEN_MIN, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.data_len_min, 64 }},
+ "data-len-min", "minimum data length" },
+
+ { TEST_BENCH_FETCH, BENCH_FETCH_DATA_LEN_MAX, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.data_len_max, 64 }},
+ "data-len-max", "maximum data length" },
};
void time_reset () {
@@ -152,11 +198,15 @@
return ((double) (time.tv_sec - common.start.tv_sec)) + ((double) (time.tv_usec - common.start.tv_usec)) / 1000000;
}
+int ratelimit (int count, int total) {
+ return ((total / 10) ? (count % (total / 10) == 0) : 1);
+}
+
void mc_init (int max_connections, memcache_cb cb_fn) {
// memcache init
- if ((mc = memcache_alloc(cb_fn)) == NULL)
+ if ((mc = memcache_alloc(cb_fn, common.pipeline_requests)) == NULL)
ERROR("memcache_alloc");
// fix up the endpoint
@@ -166,10 +216,10 @@
ERROR("config_endpoint_parse");
// add the server
- if (memcache_add_server(mc, &server_endpoint, max_connections))
+ if (memcache_add_server(mc, &server_endpoint, common.max_connections))
ERROR("memcache_add_server");
- INFO("[memcache] initialized with max_connections=%d", max_connections);
+ INFO("[memcache] initialized with pipeline_requests=%c max_connections=%d", common.pipeline_requests ? 't' : 'f', common.max_connections);
return;
@@ -181,7 +231,7 @@
const struct memcache_obj *obj;
const struct memcache_buf *buf;
- INFO("[%*s]: cmd=%s state=%s reply=%s",
+ INFO("[%.*s]: cmd=%s state=%s reply=%s",
(int) memcache_req_key(req)->len, memcache_req_key(req)->buf,
memcache_command_str(memcache_req_cmd(req)),
memcache_state_str(memcache_req_state(req)),
@@ -288,8 +338,8 @@
assert(memcache_fetch(mc, &benchmark_fetch.keys[random_value(0, benchmark_fetch.key_count)].key, NULL) != NULL);
benchmark_fetch.cur_ops++;
-
- if ((benchmark_fetch.op_count + benchmark_fetch.cur_ops) % (benchmark_fetch.total_ops / 10) == 0)
+
+ if (ratelimit(benchmark_fetch.op_count + benchmark_fetch.cur_ops, benchmark_fetch.total_ops))
INFO("[benchmark] %0.6f: %d+%d/%d requests",
time_offset(),
benchmark_fetch.op_count, benchmark_fetch.cur_ops, benchmark_fetch.total_ops
@@ -357,8 +407,8 @@
}
benchmark_fetch.keys_stored++;
-
- if (benchmark_fetch.keys_stored % (benchmark_fetch.key_count / 10) == 0)
+
+ if (ratelimit(benchmark_fetch.keys_stored, benchmark_fetch.key_count))
INFO("[benchmark] %.6f: key %u/%u stored: %.*s",
time_offset(), benchmark_fetch.keys_stored, benchmark_fetch.key_count, (int) memcache_req_key(req)->len, memcache_req_key(req)->buf
);
@@ -419,27 +469,51 @@
}
}
+void usage_opt (enum option_test test) {
+ struct opt *opt;
+
+ for (opt = option_info + 1; opt->code; opt++) {
+ if (opt->test == test) {
+ switch (opt->type) {
+ case OPT_TYPE_BOOL:
+ INFO("\t%-20s %c %s", opt->name, opt->data.bool.default_value ? 't' : 'f', opt->descr);
+ break;
+
+ case OPT_TYPE_UINT:
+ INFO("\t%-20s %-6d %s", opt->name, opt->data.uint.default_value, opt->descr);
+ break;
+
+ case OPT_TYPE_STR:
+ INFO("\t%-20s %-6s %s", opt->name, opt->data.str.default_value, opt->descr);
+ break;
+
+ default:
+ assert(0);
+ }
+ }
+ }
+}
+
void usage (char *cmd) {
- INFO(
- "Usage: %s <cmd> [<args> ... ]\n"
- "\n"
- "COMMANDS\n"
- "\n"
- "benchmark_fetch:\n"
- "\tMeasure the speed of fetch requests\n"
- "\t\n"
- "\tconn-max 1 number of connections to use\n"
- "\treq-concurrency 1 number of requests to have running\n"
- "\treq-amount 500 number of requests to issue\n"
- "\tkey-prefix bf_ key prefix\n"
- "\tkey-len-min 8 minimum key lengt\n"
- "\tkey-len-max 8 maximum key length\n"
- "\tkey-count 1 how many keys to use\n"
- "\tdata-len-min 64 minimum data length\n"
- "\tdata-len-max 64 maximum data length\n",
+ struct test *test;
- cmd
- );
+ INFO("Usage: %s <cmd> [<args> ... ]", cmd);
+
+ INFO("\nCOMMON OPTIONS\n");
+
+ usage_opt(TEST_COMMON);
+
+ INFO("\nCOMMANDS\n");
+
+ for (test = test_list; test->name; test++) {
+ INFO("%s:", test->name);
+ INFO("\t%s", test->descr);
+ INFO("\t");
+
+ usage_opt(test->code);
+
+ INFO("\t");
+ }
exit(1);
}
@@ -472,6 +546,11 @@
switch (option_info[c].type) {
case OPT_TYPE_NONE:
break;
+
+ case OPT_TYPE_BOOL:
+ *option_info[c].data.bool.value = option_info[c].data.bool.default_value;
+
+ break;
case OPT_TYPE_UINT:
*option_info[c].data.uint.value = option_info[c].data.uint.default_value;
@@ -491,8 +570,31 @@
while ((c = getopt_long(argc, argv, "", options, &option_index)) != -1) {
if (c <= OPT_CODE_INVALID || c >= OPT_CODE_MAX)
FATAL("invalid argument %s", options[option_index].name);
+
+ if (option_info[c].test != test->code && option_info[c].test != TEST_COMMON)
+ FATAL("invalid option %s for test %s", options[option_index].name, test->name);
switch (option_info[c].type) {
+ case OPT_TYPE_BOOL:
+ assert(optarg);
+
+ switch (optarg[0]) {
+ case 't':
+ case '1':
+ *option_info[c].data.bool.value = 1;
+ break;
+
+ case 'f':
+ case '0':
+ *option_info[c].data.bool.value = 0;
+ break;
+
+ default:
+ FATAL("invalid true/false value: %s: %s", options[option_index].name, optarg);
+ }
+
+ break;
+
case OPT_TYPE_UINT:
assert(optarg);