# HG changeset patch # User Tero Marttila # Date 1220112795 -10800 # Node ID 10c7dce1a04343d4a7756c1f39274934f4a80059 # Parent 1c67f512779bfdeddbe184c8d732ae031840cf1b autogenerate the memcache_test help output, and pipeline memcache requests diff -r 1c67f512779b -r 10c7dce1a043 memcache.h --- a/memcache.h Fri Aug 29 23:31:17 2008 +0300 +++ b/memcache.h Sat Aug 30 19:13:15 2008 +0300 @@ -292,7 +292,7 @@ * * The given callback function is used for all requests in this context. */ -struct memcache *memcache_alloc (memcache_cb cb_fn); +struct memcache *memcache_alloc (memcache_cb cb_fn, char pipeline_requests); /* * Add a server to the pool of available servers. diff -r 1c67f512779b -r 10c7dce1a043 memcache/connection.c --- a/memcache/connection.c Fri Aug 29 23:31:17 2008 +0300 +++ b/memcache/connection.c Sat Aug 30 19:13:15 2008 +0300 @@ -6,16 +6,20 @@ #include #include "connection.h" +#include "memcache.h" #include "command.h" #include "request.h" #include "../socket.h" #include "../common.h" - -void memcache_conn_send_req_data (struct memcache_conn *conn); -void memcache_conn_finish_req_data (struct memcache_conn *conn); -void memcache_conn_handle_reply (struct memcache_conn *conn); -void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf); +void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint); +void memcache_conn_send_data (struct memcache_conn *conn); +void memcache_conn_send_end (struct memcache_conn *conn); +void memcache_conn_send_done (struct memcache_conn *conn); +void memcache_conn_recv_next (struct memcache_conn *conn); +void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf); +void memcache_conn_recv_done (struct memcache_conn *conn); +void memcache_conn_recv_end (struct memcache_conn *conn); static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg); static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg); @@ -25,9 +29,7 @@ static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg); static void memcache_conn_error (struct memcache_conn *conn); -static void memcache_conn_req_done (struct memcache_conn *conn); - -void memcache_conn_close (struct memcache_conn *conn); +static void memcache_conn_close (struct memcache_conn *conn); struct memcache_conn *memcache_conn_open (struct memcache_server *server) { struct memcache_conn *conn = NULL; @@ -38,6 +40,9 @@ // remember the server conn->server = server; + // init the req queue + TAILQ_INIT(&conn->req_queue); + // attempt connect if (memcache_conn_connect(conn)) ERROR("failed to connect to server"); @@ -83,48 +88,92 @@ } int memcache_conn_is_available (struct memcache_conn *conn) { - return (conn->fd > 0 && conn->is_connected && conn->req == NULL); + return (conn->fd > 0 && conn->is_connected && (TAILQ_EMPTY(&conn->req_queue) || conn->server->mc->pipeline_requests)); } void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) { - assert(conn->fd > 0 && conn->is_connected); - assert(conn->req == NULL); + assert(memcache_conn_is_available(conn)); + + // XXX: validate req - // write the request header into our bufferevent's output buffer - if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), - memcache_req_cmd(req), - memcache_req_key(req), - memcache_req_obj(req) - )) { - ERROR("failed to init the cmd"); + // stick the req into the req queue + TAILQ_INSERT_TAIL(&conn->req_queue, req, reqqueue_node); + + // if send is idle... + if (conn->send_req == NULL) + memcache_conn_send_next(conn, req); +} + +/* + * Send out the next request. + * + * If there is currently a send_req, it is considered as done. + */ +void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint) { + struct memcache_req *req; + + // req will be either + // * the next enqueued req after the one that was last written + // * hint, if no req was being written (the req that was just enqueued) + if (conn->send_req) { + assert(!hint); + + // the nex req + req = TAILQ_NEXT(conn->send_req, reqqueue_node); + + // and reset the send_req to NULL... + conn->send_req = NULL; + + } else { + assert(hint && TAILQ_LAST(&conn->req_queue, memcache_reqqueue_head) == hint); + + // the given req + req = hint; + } + + // while we still have a request to process... + while (req) { + // try and write the request header into our bufferevent's output buffer + if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), + memcache_req_cmd(req), + memcache_req_key(req), + memcache_req_obj(req) + )) { + WARNING("invalid request"); + memcache_req_error(req); + + // continue on to the next request + req = TAILQ_NEXT(req, reqqueue_node); + + continue; + } + + // send this one + conn->send_req = req; + + // enable our bufferevent to send it + if (bufferevent_enable(conn->bev, EV_WRITE)) + PERROR("bufferevent_enable"); + + // tell the req that it is underway + memcache_req_send(req); + + // done, we only want to process one + break; } - // store the req - conn->req = req; - - // tell our bufferevent to send it - if (bufferevent_enable(conn->bev, EV_WRITE)) - PERROR("bufferevent_enable"); - - // tell the req that it is underway - memcache_req_send(req); - - // success + // done, we either replaced send_req, or consumed them all return; error: - if (conn->req) - memcache_conn_error(conn); - - else - memcache_req_error(req); + memcache_conn_error(conn); } /* * Start writing out the request data */ -void memcache_conn_send_req_data (struct memcache_conn *conn) { - if (conn->req->obj.bytes > 0) { +void memcache_conn_send_data (struct memcache_conn *conn) { + if (conn->send_req->obj.bytes > 0) { // set up the ev_write event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn); @@ -133,14 +182,15 @@ } else { // just send the \r\n - memcache_conn_finish_req_data(conn); + memcache_conn_send_end(conn); } } /* * Write out the final \r\n to terminate the request data */ -void memcache_conn_finish_req_data (struct memcache_conn *conn) { +void memcache_conn_send_end (struct memcache_conn *conn) { + // XXX: this will enable the bev by itself? if (bufferevent_write(conn->bev, "\r\n", 2)) PERROR("bufferevent_write"); @@ -152,12 +202,23 @@ } /* + * Finished sending the current send_req + */ +void memcache_conn_send_done (struct memcache_conn *conn) { + assert(conn->send_req != NULL); + + // send the next req, if there is one + memcache_conn_send_next(conn, NULL); + + // if pipelining is on, it's a question of how many we've sent... + if (conn->server->mc->pipeline_requests) + memcache_server_conn_ready(conn->server, conn); +} + +/* * Start reading a reply from the connection */ -void memcache_conn_handle_reply (struct memcache_conn *conn) { - // ensure that we either didn't have a command, or it has been sent - assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len); - +void memcache_conn_recv_next (struct memcache_conn *conn) { // start/continue reading on the bufferevent if (bufferevent_enable(conn->bev, EV_READ)) PERROR("bufferevent_enable"); @@ -175,44 +236,45 @@ /* * Start reading reply data from the connection */ -void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) { +void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf) { + struct memcache_req *req = TAILQ_FIRST(&conn->req_queue); int ret; // check that the buf doesn't contain any data - assert(conn->req->buf.data == NULL); + assert(req->buf.data == NULL); // bytes *may* be zero if we have an empty cache entry - if (conn->req->obj.bytes > 0) { + if (req->obj.bytes > 0) { // XXX: memcache_req_make_buffer? // allocate a buffer for the reply data - if ((conn->req->buf.data = malloc(conn->req->obj.bytes)) == NULL) + if ((req->buf.data = malloc(req->obj.bytes)) == NULL) ERROR("malloc"); // update the length - conn->req->buf.len = conn->req->obj.bytes; + req->buf.len = req->obj.bytes; // set offset to zero - conn->req->buf.offset = 0; + req->buf.offset = 0; // and note that it is present, and is ours - conn->req->have_buf = 1; - conn->req->is_buf_ours = 1; + req->have_buf = 1; + req->is_buf_ours = 1; // do we have any data in the buf that we need to copy? if (evbuffer_get_length(buf) > 0) { // read the data into the memcache_buf - ret = evbuffer_remove(buf, conn->req->buf.data, conn->req->buf.len); + ret = evbuffer_remove(buf, req->buf.data, req->buf.len); // sanity check... - assert(ret > 0 && ret <= conn->req->buf.len); + assert(ret > 0 && ret <= req->buf.len); // update offset - conn->req->buf.offset += ret; + req->buf.offset += ret; } // still need to receive more data? - if (conn->req->buf.offset < conn->req->buf.len) { + if (req->buf.offset < req->buf.len) { // disable the bufferevent while we read the data if (bufferevent_disable(conn->bev, EV_READ)) @@ -236,11 +298,8 @@ } - // we kind of "recurse" to handle the MEMCACHE_RPL_END reply, that is, we activate the bufferevent for EV_READ - // again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line" after the data and then - // return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause req_done to be called. - // Elegant! - memcache_conn_handle_reply(conn); + // finish it off + memcache_conn_recv_end(conn); // ok return; @@ -250,6 +309,38 @@ } /* + * Receive the final bits of data following the reply data block + */ +void memcache_conn_recv_end (struct memcache_conn *conn) { + // we still need to receive the MEMCACHE_RPL_END. We kind of "recurse" to handle this, that is, we activate the + // bufferevent for EV_READ again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line" + // after the data and then return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause + // recv_done to be called. + // Elegant! + memcache_conn_recv_next(conn); +} + +/* + * We have finished receiving our current request + */ +void memcache_conn_recv_done (struct memcache_conn *conn) { + struct memcache_req *req = TAILQ_FIRST(&conn->req_queue); + + // we can remove it from the list now + TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node); + + // have the req detach + memcache_req_done(req); + + // and prepare to recv the next one + memcache_conn_recv_next(conn); + + // if pipelining is off, it's a question of when we've recevied the reply... + if (!conn->server->mc->pipeline_requests) + memcache_server_conn_ready(conn->server, conn); +} + +/* * The connect() has finished */ static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) { @@ -273,6 +364,9 @@ // mark us as succesfully connected conn->is_connected = 1; + + // and prepare to recv any response headers + memcache_conn_recv_next(conn); // notify the server memcache_server_conn_ready(conn->server, conn); @@ -290,18 +384,20 @@ static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) { struct memcache_conn *conn = arg; + assert(conn->send_req != NULL); + // the command header has been sent assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0); // does this request have some data to be included in the request? // if the data has already been sent (we handle the final \r\n as well), then skip this. - if (conn->req->have_buf && conn->req->buf.offset == 0) { + if (conn->send_req->have_buf && conn->send_req->buf.offset == 0) { // we need to send the request data next - memcache_conn_send_req_data(conn); + memcache_conn_send_data(conn); } else { - // wait for a reply - memcache_conn_handle_reply(conn); + // the request has now been sent, and se can send the next one + memcache_conn_send_done(conn); } } @@ -311,6 +407,7 @@ static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) { struct memcache_conn *conn = arg; struct evbuffer *in_buf = bufferevent_get_input(bev); + struct memcache_req *req; struct memcache_key key; char *header_data; enum memcache_reply reply_type; @@ -321,8 +418,11 @@ // consume as much data as possible do { + // the req we are processing + req = TAILQ_FIRST(&conn->req_queue); + // attempt to parse the response header - if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data)) + if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &req->obj, &has_data)) ERROR("memcache_cmd_parse_header"); if (!header_data) { @@ -330,25 +430,33 @@ return; } + // no request waiting? + if (req == NULL) + ERROR("got a response without any request pending: %s", header_data); + // if the reply contains data, check that they key is the same - if (has_data && (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0)) - ERROR("got reply with wrong key !?!"); + if (has_data && (key.len != req->key.len || memcmp(key.buf, req->key.buf, key.len) != 0)) + ERROR("got reply with wrong key !?! '%.*s' vs. '%.*s'", (int) key.len, key.buf, (int) req->key.len, req->key.buf); + + // check it's a FETCH request + if (has_data && req->cmd_type != MEMCACHE_CMD_FETCH_GET) + ERROR("a data reply for a non-CMD_FETCH_* command !?!"); // notify the request (no reply data is ready for reading yet, though) - memcache_req_recv(conn->req, reply_type); + memcache_req_recv(req, reply_type); // does the reply include data? if (has_data) { // start reading the data (including whatever might be left over in the bufferevent buffer...) - memcache_conn_handle_reply_data(conn, in_buf); + memcache_conn_recv_data(conn, in_buf); } else { // the request is done with - memcache_conn_req_done(conn); + memcache_conn_recv_done(conn); } - // free the header data - free(header_data); + // free the header data, but not a second time on error exit + free(header_data); header_data = NULL; } while (evbuffer_get_length(in_buf) > 0); @@ -372,7 +480,7 @@ static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) { struct memcache_conn *conn = arg; - struct memcache_buf *buf = &conn->req->buf; + struct memcache_buf *buf = &conn->send_req->buf; int ret; // correct event @@ -403,7 +511,7 @@ } else { // done! Send the terminating \r\n next - memcache_conn_finish_req_data(conn); + memcache_conn_send_end(conn); } // success @@ -416,7 +524,8 @@ static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg) { struct memcache_conn *conn = arg; - struct memcache_buf *buf = &conn->req->buf; + struct memcache_req *req = TAILQ_FIRST(&conn->req_queue); + struct memcache_buf *buf = &req->buf; int ret; // correct event @@ -442,7 +551,7 @@ // only notify the req if new data was received, and we won't be calling req_done next. if (ret > 0 && buf->offset < buf->len) { // notify the req - memcache_req_data(conn->req); + memcache_req_data(req); } // data left to read? @@ -454,7 +563,7 @@ } else { // done! We can let the bufferenvet handle the rest of the reply now - memcache_conn_handle_reply(conn); + memcache_conn_recv_end(conn); } // success @@ -465,20 +574,21 @@ memcache_conn_error(conn); } -// XXX: need to flush/disable buffers/events on errors - /* * The entire connection failed */ static void memcache_conn_error (struct memcache_conn *conn) { - // fail the request, if we have one - if (conn->req) { + struct memcache_req *req; + + // fail all requests, if we have any + TAILQ_FOREACH (req, &conn->req_queue, reqqueue_node) { // error out the req - memcache_req_error(conn->req); - - // we are now available again - conn->req = NULL; + memcache_req_error(req); + + TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node); } + + conn->send_req = NULL; // close the connection memcache_conn_close(conn); @@ -487,22 +597,6 @@ memcache_server_conn_dead(conn->server, conn); } -/* - * Detach the request - */ -static void memcache_conn_req_done (struct memcache_conn *conn) { - // ensure that we do currently have a req - assert(conn->req); - - // have the req detach - memcache_req_done(conn->req); - - // we are now available again - conn->req = NULL; - - memcache_server_conn_ready(conn->server, conn); -} - void memcache_conn_close (struct memcache_conn *conn) { // close the fd if needed if (conn->fd > 0) { @@ -529,8 +623,8 @@ } void memcache_conn_free (struct memcache_conn *conn) { - // ensure we don't have a req bound to us - assert(conn->req == NULL); + // ensure we don't have any reqs bound to us + assert(TAILQ_EMPTY(&conn->req_queue)); // ensure that the connection is not considered to be connected anymore assert(!conn->is_connected); diff -r 1c67f512779b -r 10c7dce1a043 memcache/connection.h --- a/memcache/connection.h Fri Aug 29 23:31:17 2008 +0300 +++ b/memcache/connection.h Sat Aug 30 19:13:15 2008 +0300 @@ -8,6 +8,7 @@ #include #include "server.h" +#include "request.h" #include "command.h" struct memcache_conn { @@ -31,8 +32,14 @@ // have we succesfully connected yet? int is_connected; - // the request (if any) that we are currently processing - NULL for idle connections - struct memcache_req *req; + /* + * The list of requests that we are currently processing, and a pointer to the request that is currently being + * sent. + * + * New requests are enqueued on the end of this list. + */ + struct memcache_reqqueue_head req_queue; + struct memcache_req *send_req; // used to track our commands struct memcache_cmd cmd; diff -r 1c67f512779b -r 10c7dce1a043 memcache/memcache.c --- a/memcache/memcache.c Fri Aug 29 23:31:17 2008 +0300 +++ b/memcache/memcache.c Sat Aug 30 19:13:15 2008 +0300 @@ -6,14 +6,15 @@ #include "request.h" #include "../common.h" -struct memcache *memcache_alloc (memcache_cb cb_fn) { +struct memcache *memcache_alloc (memcache_cb cb_fn, char pipeline_requests) { struct memcache *mc = NULL; if ((mc = calloc(1, sizeof(*mc))) == NULL) ERROR("calloc"); - // store callback + // store attributes mc->cb_fn = cb_fn; + mc->pipeline_requests = pipeline_requests; // init server list LIST_INIT(&mc->server_list); @@ -32,7 +33,7 @@ struct memcache_server *server = NULL; // alloc the server - if ((server = memcache_server_alloc(endpoint, max_connections)) == NULL) + if ((server = memcache_server_alloc(mc, endpoint, max_connections)) == NULL) goto error; // enlist it diff -r 1c67f512779b -r 10c7dce1a043 memcache/memcache.h --- a/memcache/memcache.h Fri Aug 29 23:31:17 2008 +0300 +++ b/memcache/memcache.h Sat Aug 30 19:13:15 2008 +0300 @@ -11,6 +11,9 @@ // list of servers LIST_HEAD(memcache_serverlist_head, memcache_server) server_list; + + // should we "pipeline" requests? + char pipeline_requests; }; struct memcache_server *memcache_choose_server (struct memcache *mc, const struct memcache_key *key); diff -r 1c67f512779b -r 10c7dce1a043 memcache/request.h --- a/memcache/request.h Fri Aug 29 23:31:17 2008 +0300 +++ b/memcache/request.h Sat Aug 30 19:13:15 2008 +0300 @@ -4,7 +4,8 @@ #include #include "../memcache.h" -#include "connection.h" + +TAILQ_HEAD(memcache_reqqueue_head, memcache_req); struct memcache_req { // the memcache context that we belong to diff -r 1c67f512779b -r 10c7dce1a043 memcache/server.c --- a/memcache/server.c Fri Aug 29 23:31:17 2008 +0300 +++ b/memcache/server.c Sat Aug 30 19:13:15 2008 +0300 @@ -3,18 +3,24 @@ #include #include "server.h" +#include "memcache.h" #include "connection.h" #include "request.h" #include "../memcache.h" #include "../common.h" -struct memcache_server *memcache_server_alloc (struct config_endpoint *endpoint, int max_connections) { +struct memcache_server *memcache_server_alloc (struct memcache *mc, struct config_endpoint *endpoint, int max_connections) { struct memcache_server *server = NULL; + // warn if nonsensical arguments + if (max_connections > 1 && mc->pipeline_requests) + WARNING("only one connection will ever be used with pipeline_requests"); + if ((server = calloc(1, sizeof(*server))) == NULL) ERROR("calloc"); // store the vars + server->mc = mc; server->endpoint = endpoint; server->max_connections = max_connections; @@ -113,7 +119,8 @@ // look for idle connections to service the request LIST_FOREACH(conn, &server->conn_list, connlist_node) { - if (memcache_conn_is_available(conn)) { + // handle pipelining as well + while (memcache_conn_is_available(conn)) { // remove the req from the queue and execute it TAILQ_REMOVE(&server->req_queue, req, reqqueue_node); diff -r 1c67f512779b -r 10c7dce1a043 memcache/server.h --- a/memcache/server.h Fri Aug 29 23:31:17 2008 +0300 +++ b/memcache/server.h Sat Aug 30 19:13:15 2008 +0300 @@ -4,9 +4,14 @@ #include #include "../memcache.h" +#include "request.h" #include "../config.h" struct memcache_server { + // the memcache context we belong to + struct memcache *mc; + + // our endpoint struct config_endpoint *endpoint; // we are a member of struct memcache.server_list @@ -16,7 +21,7 @@ LIST_HEAD(memcache_connlist_head, memcache_conn) conn_list; // a list of enqueued requests waiting for a connection - TAILQ_HEAD(memcache_reqqueue_head, memcache_req) req_queue; + struct memcache_reqqueue_head req_queue; // how many connections we should have at most int max_connections; @@ -25,7 +30,7 @@ /* * Alloc and return a new memcache_server */ -struct memcache_server *memcache_server_alloc (struct config_endpoint *endpoint, int max_connections); +struct memcache_server *memcache_server_alloc (struct memcache *mc, struct config_endpoint *endpoint, int max_connections); /* * Attempt to grow the connection pool by one connection. Doesn't do anything if we already have too many connections, diff -r 1c67f512779b -r 10c7dce1a043 memcache_test.c --- a/memcache_test.c Fri Aug 29 23:31:17 2008 +0300 +++ b/memcache_test.c Sat Aug 30 19:13:15 2008 +0300 @@ -39,6 +39,7 @@ struct common { unsigned int max_connections; + char pipeline_requests; struct timeval start; } common; @@ -65,10 +66,18 @@ void benchmark_cb (struct memcache_req *req, void *arg); void benchmark_fetch_fn (void); +enum option_test { + TEST_INVALID, + TEST_COMMON, + + TEST_BENCH_FETCH, +}; + enum option_code { OPT_CODE_INVALID, COMMON_CONN_MAX, + COMMON_PIPELINE, BENCH_FETCH_REQ_CONCURRENCY, BENCH_FETCH_REQ_AMOUNT, BENCH_FETCH_KEY_PREFIX, @@ -83,6 +92,7 @@ enum option_type { OPT_TYPE_NONE, + OPT_TYPE_BOOL, OPT_TYPE_UINT, OPT_TYPE_STR, }; @@ -93,13 +103,19 @@ void (*test_fn) (void); + enum option_test code; + + char *descr; + } test_list[] = { - { "benchmark_fetch", &benchmark_cb, &benchmark_fetch_fn }, - { 0, 0, 0, } + { "benchmark_fetch", &benchmark_cb, &benchmark_fetch_fn, TEST_BENCH_FETCH, + "Measure the speed of fetch requests" }, + { 0, 0, 0, 0, 0 } }; static struct option options[] = { { "conn-max", required_argument, NULL, COMMON_CONN_MAX }, + { "pipeline", required_argument, NULL, COMMON_PIPELINE }, { "req-concurrency", required_argument, NULL, BENCH_FETCH_REQ_CONCURRENCY }, { "req-amount", required_argument, NULL, BENCH_FETCH_REQ_AMOUNT }, { "key-prefix", required_argument, NULL, BENCH_FETCH_KEY_PREFIX }, @@ -112,10 +128,16 @@ }; static struct opt { + enum option_test test; enum option_code code; enum option_type type; union opt_type_data { + struct { + char *value; + char default_value; + } bool; + struct { unsigned int *value; unsigned int default_value; @@ -126,17 +148,41 @@ const char *default_value; } str; } data; + + const char *name; + const char *descr; } option_info[OPT_CODE_MAX] = { - { OPT_CODE_INVALID, OPT_TYPE_NONE }, - { COMMON_CONN_MAX, OPT_TYPE_UINT, { .uint = { &common.max_connections, 1 }} }, - { BENCH_FETCH_REQ_CONCURRENCY, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.concurrent_ops, 1 }} }, - { BENCH_FETCH_REQ_AMOUNT, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.total_ops, 500 }} }, - { BENCH_FETCH_KEY_PREFIX, OPT_TYPE_STR, { .str = { &benchmark_fetch.key_prefix, "bf_" }} }, - { BENCH_FETCH_KEY_LEN_MIN, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_len_min, 8 }} }, - { BENCH_FETCH_KEY_LEN_MAX, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_len_max, 8 }} }, - { BENCH_FETCH_KEY_COUNT, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_count, 1 }} }, - { BENCH_FETCH_DATA_LEN_MIN, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.data_len_min, 64 }} }, - { BENCH_FETCH_DATA_LEN_MAX, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.data_len_max, 64 }} }, + { TEST_INVALID, OPT_CODE_INVALID, OPT_TYPE_NONE }, + + { TEST_COMMON, COMMON_CONN_MAX, OPT_TYPE_UINT, { .uint = { &common.max_connections, 1 }}, + "conn-max", "number of connections to use" }, + + { TEST_COMMON, COMMON_PIPELINE, OPT_TYPE_BOOL, { .bool = { &common.pipeline_requests, 1 }}, + "pipeline", "pipeline requests" }, + + { TEST_BENCH_FETCH, BENCH_FETCH_REQ_CONCURRENCY, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.concurrent_ops, 1 }}, + "req-concurrency", "number of requests to have running" }, + + { TEST_BENCH_FETCH, BENCH_FETCH_REQ_AMOUNT, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.total_ops, 500 }}, + "req-amount", "number of requests to issue" }, + + { TEST_BENCH_FETCH, BENCH_FETCH_KEY_PREFIX, OPT_TYPE_STR, { .str = { &benchmark_fetch.key_prefix, "bf_" }}, + "key-prefix", "key prefix" }, + + { TEST_BENCH_FETCH, BENCH_FETCH_KEY_LEN_MIN, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_len_min, 8 }}, + "key-len-min", "minimum key length" }, + + { TEST_BENCH_FETCH, BENCH_FETCH_KEY_LEN_MAX, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_len_max, 8 }}, + "key-len-max", "maximum key length" }, + + { TEST_BENCH_FETCH, BENCH_FETCH_KEY_COUNT, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_count, 1 }}, + "key-count", "how many keys to use" }, + + { TEST_BENCH_FETCH, BENCH_FETCH_DATA_LEN_MIN, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.data_len_min, 64 }}, + "data-len-min", "minimum data length" }, + + { TEST_BENCH_FETCH, BENCH_FETCH_DATA_LEN_MAX, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.data_len_max, 64 }}, + "data-len-max", "maximum data length" }, }; void time_reset () { @@ -152,11 +198,15 @@ return ((double) (time.tv_sec - common.start.tv_sec)) + ((double) (time.tv_usec - common.start.tv_usec)) / 1000000; } +int ratelimit (int count, int total) { + return ((total / 10) ? (count % (total / 10) == 0) : 1); +} + void mc_init (int max_connections, memcache_cb cb_fn) { // memcache init - if ((mc = memcache_alloc(cb_fn)) == NULL) + if ((mc = memcache_alloc(cb_fn, common.pipeline_requests)) == NULL) ERROR("memcache_alloc"); // fix up the endpoint @@ -166,10 +216,10 @@ ERROR("config_endpoint_parse"); // add the server - if (memcache_add_server(mc, &server_endpoint, max_connections)) + if (memcache_add_server(mc, &server_endpoint, common.max_connections)) ERROR("memcache_add_server"); - INFO("[memcache] initialized with max_connections=%d", max_connections); + INFO("[memcache] initialized with pipeline_requests=%c max_connections=%d", common.pipeline_requests ? 't' : 'f', common.max_connections); return; @@ -181,7 +231,7 @@ const struct memcache_obj *obj; const struct memcache_buf *buf; - INFO("[%*s]: cmd=%s state=%s reply=%s", + INFO("[%.*s]: cmd=%s state=%s reply=%s", (int) memcache_req_key(req)->len, memcache_req_key(req)->buf, memcache_command_str(memcache_req_cmd(req)), memcache_state_str(memcache_req_state(req)), @@ -288,8 +338,8 @@ assert(memcache_fetch(mc, &benchmark_fetch.keys[random_value(0, benchmark_fetch.key_count)].key, NULL) != NULL); benchmark_fetch.cur_ops++; - - if ((benchmark_fetch.op_count + benchmark_fetch.cur_ops) % (benchmark_fetch.total_ops / 10) == 0) + + if (ratelimit(benchmark_fetch.op_count + benchmark_fetch.cur_ops, benchmark_fetch.total_ops)) INFO("[benchmark] %0.6f: %d+%d/%d requests", time_offset(), benchmark_fetch.op_count, benchmark_fetch.cur_ops, benchmark_fetch.total_ops @@ -357,8 +407,8 @@ } benchmark_fetch.keys_stored++; - - if (benchmark_fetch.keys_stored % (benchmark_fetch.key_count / 10) == 0) + + if (ratelimit(benchmark_fetch.keys_stored, benchmark_fetch.key_count)) INFO("[benchmark] %.6f: key %u/%u stored: %.*s", time_offset(), benchmark_fetch.keys_stored, benchmark_fetch.key_count, (int) memcache_req_key(req)->len, memcache_req_key(req)->buf ); @@ -419,27 +469,51 @@ } } +void usage_opt (enum option_test test) { + struct opt *opt; + + for (opt = option_info + 1; opt->code; opt++) { + if (opt->test == test) { + switch (opt->type) { + case OPT_TYPE_BOOL: + INFO("\t%-20s %c %s", opt->name, opt->data.bool.default_value ? 't' : 'f', opt->descr); + break; + + case OPT_TYPE_UINT: + INFO("\t%-20s %-6d %s", opt->name, opt->data.uint.default_value, opt->descr); + break; + + case OPT_TYPE_STR: + INFO("\t%-20s %-6s %s", opt->name, opt->data.str.default_value, opt->descr); + break; + + default: + assert(0); + } + } + } +} + void usage (char *cmd) { - INFO( - "Usage: %s [ ... ]\n" - "\n" - "COMMANDS\n" - "\n" - "benchmark_fetch:\n" - "\tMeasure the speed of fetch requests\n" - "\t\n" - "\tconn-max 1 number of connections to use\n" - "\treq-concurrency 1 number of requests to have running\n" - "\treq-amount 500 number of requests to issue\n" - "\tkey-prefix bf_ key prefix\n" - "\tkey-len-min 8 minimum key lengt\n" - "\tkey-len-max 8 maximum key length\n" - "\tkey-count 1 how many keys to use\n" - "\tdata-len-min 64 minimum data length\n" - "\tdata-len-max 64 maximum data length\n", + struct test *test; - cmd - ); + INFO("Usage: %s [ ... ]", cmd); + + INFO("\nCOMMON OPTIONS\n"); + + usage_opt(TEST_COMMON); + + INFO("\nCOMMANDS\n"); + + for (test = test_list; test->name; test++) { + INFO("%s:", test->name); + INFO("\t%s", test->descr); + INFO("\t"); + + usage_opt(test->code); + + INFO("\t"); + } exit(1); } @@ -472,6 +546,11 @@ switch (option_info[c].type) { case OPT_TYPE_NONE: break; + + case OPT_TYPE_BOOL: + *option_info[c].data.bool.value = option_info[c].data.bool.default_value; + + break; case OPT_TYPE_UINT: *option_info[c].data.uint.value = option_info[c].data.uint.default_value; @@ -491,8 +570,31 @@ while ((c = getopt_long(argc, argv, "", options, &option_index)) != -1) { if (c <= OPT_CODE_INVALID || c >= OPT_CODE_MAX) FATAL("invalid argument %s", options[option_index].name); + + if (option_info[c].test != test->code && option_info[c].test != TEST_COMMON) + FATAL("invalid option %s for test %s", options[option_index].name, test->name); switch (option_info[c].type) { + case OPT_TYPE_BOOL: + assert(optarg); + + switch (optarg[0]) { + case 't': + case '1': + *option_info[c].data.bool.value = 1; + break; + + case 'f': + case '0': + *option_info[c].data.bool.value = 0; + break; + + default: + FATAL("invalid true/false value: %s: %s", options[option_index].name, optarg); + } + + break; + case OPT_TYPE_UINT: assert(optarg);