# HG changeset patch # User Tero Marttila # Date 1220041877 -10800 # Node ID 1c67f512779bfdeddbe184c8d732ae031840cf1b # Parent a5c09677ca6fcde36019c73f17dffcbc74cc657e fix doc tpyos, rename some enums, fix printf format len for non-zero terminated strings (hg status), pass args to memcache_cmd_format_header via memcache_req_*, handle zero-length STORE requests, memcache_req is_buf_ours + free, other function name typos (keymemcache_req_key), fix req state behaviour re *_DATA_* for STORE requests and FETCH/END, better memcache_server connpool events/management, modular memcache_test with a working benchmark. This is a long commit message. diff -r a5c09677ca6f -r 1c67f512779b memcache.h --- a/memcache.h Thu Aug 28 03:14:07 2008 +0300 +++ b/memcache.h Fri Aug 29 23:31:17 2008 +0300 @@ -43,7 +43,7 @@ /* * The length of the key buffer in bytes. This does not include a NUL byte. * - * If this is given as zero, then the length will be calculsted from buf using strlen(). Empty keys are not + * If this is given as zero, then the length will be calculated from buf using strlen(). Empty keys are not * allowed, so this will result in an error if buf is an empty string. */ size_t len; @@ -54,7 +54,7 @@ */ struct memcache_obj { /* - * An arbitrary 16-bit (32-bit for >1.2.1) that the server stores transparently. + * An arbitrary 16-bit (32-bit for >1.2.1) integer that the server stores transparently. */ unsigned int flags; @@ -230,7 +230,7 @@ /* * Request states */ -enum memcache_req_state { +enum memcache_state { MEMCACHE_STATE_INVALID, /* @@ -269,7 +269,7 @@ * * req_reply, req_obj and req_buf will all return non-NULL values, and buf.offset will be equal to buf.len. */ - MEMCACHE_STATE_DATA_DONE, + MEMCACHE_STATE_DONE_DATA, /* * An error has occurred. @@ -312,10 +312,12 @@ * --------------------------------------------- * STATE_QUEUE ? * STATE_SEND - * STATE_REPLY RPL_VALUE + * STATE_REPLY RPL_VALUE * STATE_REPLY_DATA * RPL_VALUE - * STATE_DATA_DONE RPL_END + * STATE_REPLY_DATA RPL_END + * STATE_DONE_DATA RPL_END * + * STATE_REPLY RPL_END * STATE_DONE RPL_END * * STATE_ERROR RPL_{ERROR,CLIENT_ERROR,SERVER_ERROR} @@ -353,7 +355,7 @@ * * Should always return a valid value. */ -enum memcache_req_state memcache_req_state (struct memcache_req *req); +enum memcache_state memcache_req_state (struct memcache_req *req); /* * Request command. @@ -365,7 +367,7 @@ /* * Request reply. * - * Will return a valid value in the STATE_REPLY, STATE_REPLY_DATA, STATE_DONE and STATE_DATA_DONE states. + * Will return a valid value in the STATE_REPLY, STATE_REPLY_DATA, STATE_DONE and STATE_DONE_DATA states. */ enum memcache_reply memcache_req_reply (struct memcache_req *req); @@ -374,19 +376,19 @@ * * Will return a valid valuein all states */ -const struct memcache_key *keymemcache_req_key (struct memcache_req *req); +const struct memcache_key *memcache_req_key (struct memcache_req *req); /* * Request data. * - * Will return a valid value in the STATE_REPLY, STATE_REPLY_DATA, STATE_DONE and STATE_DATA_DONE states. + * Will return a valid value in the STATE_REPLY, STATE_REPLY_DATA, STATE_DONE and STATE_DONE_DATA states. */ const struct memcache_obj *memcache_req_obj (struct memcache_req *req); /* * Request buf. * - * Will return a valid value in the STATE_REPLY_DATA and STATE_DATA_DONE states. + * Will return a valid value in the STATE_REPLY_DATA and STATE_DONE_DATA states. * * Note that buf.offset may be less than buf.len in the STATE_REPLY_DATA state. */ @@ -402,7 +404,7 @@ void memcache_req_abort (struct memcache_req *req); /* - * Free a req that is in the STATE_DONE, STATE_DATA_DONE or STATE_ERROR state. + * Free a req that is in the STATE_DONE, STATE_DONE_DATA or STATE_ERROR state. */ void memcache_req_free (struct memcache_req *req); @@ -411,6 +413,6 @@ */ const char *memcache_command_str (enum memcache_command cmd); const char *memcache_reply_str (enum memcache_reply reply); -const char *memcache_state_str (enum memcache_req_state state); +const char *memcache_state_str (enum memcache_state state); #endif /* MEMCACHE_H */ diff -r a5c09677ca6f -r 1c67f512779b memcache/command.c --- a/memcache/command.c Thu Aug 28 03:14:07 2008 +0300 +++ b/memcache/command.c Fri Aug 29 23:31:17 2008 +0300 @@ -62,7 +62,7 @@ return -1; } -int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj) { +int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, const struct memcache_key *key, const struct memcache_obj *obj) { char *cmd_name; // valid command @@ -80,7 +80,7 @@ assert(key != NULL && obj == NULL); assert(key->len > 0 && key->buf != NULL); - if (evbuffer_add_printf(buf, "%s %*s\r\n", cmd_name, (int) key->len, key->buf) == -1) + if (evbuffer_add_printf(buf, "%s %.*s\r\n", cmd_name, (int) key->len, key->buf) == -1) ERROR("evbuffer_add_printf"); break; @@ -94,7 +94,7 @@ assert(key->len > 0 && key->buf != NULL); // XXX: ensure that we have a valid buf - if (evbuffer_add_printf(buf, "%s %*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes) == -1) + if (evbuffer_add_printf(buf, "%s %.*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes) == -1) ERROR("evbuffer_add_printf"); break; diff -r a5c09677ca6f -r 1c67f512779b memcache/command.h --- a/memcache/command.h Thu Aug 28 03:14:07 2008 +0300 +++ b/memcache/command.h Fri Aug 29 23:31:17 2008 +0300 @@ -17,7 +17,7 @@ * * This must be atomic, so if it fails, it must not modify buf. */ -int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj); +int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, const struct memcache_key *key, const struct memcache_obj *obj); /* * Attempt to parse a response line from the given buf. *header_data will be set to NULL if no complete response line diff -r a5c09677ca6f -r 1c67f512779b memcache/command.o Binary file memcache/command.o has changed diff -r a5c09677ca6f -r 1c67f512779b memcache/connection.c --- a/memcache/connection.c Thu Aug 28 03:14:07 2008 +0300 +++ b/memcache/connection.c Fri Aug 29 23:31:17 2008 +0300 @@ -11,6 +11,12 @@ #include "../socket.h" #include "../common.h" + +void memcache_conn_send_req_data (struct memcache_conn *conn); +void memcache_conn_finish_req_data (struct memcache_conn *conn); +void memcache_conn_handle_reply (struct memcache_conn *conn); +void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf); + static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg); static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg); static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg); @@ -85,7 +91,11 @@ assert(conn->req == NULL); // write the request header into our bufferevent's output buffer - if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) { + if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), + memcache_req_cmd(req), + memcache_req_key(req), + memcache_req_obj(req) + )) { ERROR("failed to init the cmd"); } @@ -114,11 +124,17 @@ * Start writing out the request data */ void memcache_conn_send_req_data (struct memcache_conn *conn) { - // set up the ev_write - event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn); + if (conn->req->obj.bytes > 0) { + // set up the ev_write + event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn); - // just fake a call to the event handler - _memcache_conn_ev_write(conn->fd, EV_WRITE, conn); + // just fake a call to the event handler + _memcache_conn_ev_write(conn->fd, EV_WRITE, conn); + + } else { + // just send the \r\n + memcache_conn_finish_req_data(conn); + } } /* @@ -167,6 +183,8 @@ // bytes *may* be zero if we have an empty cache entry if (conn->req->obj.bytes > 0) { + // XXX: memcache_req_make_buffer? + // allocate a buffer for the reply data if ((conn->req->buf.data = malloc(conn->req->obj.bytes)) == NULL) ERROR("malloc"); @@ -176,6 +194,10 @@ // set offset to zero conn->req->buf.offset = 0; + + // and note that it is present, and is ours + conn->req->have_buf = 1; + conn->req->is_buf_ours = 1; // do we have any data in the buf that we need to copy? if (evbuffer_get_length(buf) > 0) { @@ -453,7 +475,6 @@ if (conn->req) { // error out the req memcache_req_error(conn->req); - assert(conn->req->conn == NULL); // we are now available again conn->req = NULL; @@ -473,10 +494,9 @@ // ensure that we do currently have a req assert(conn->req); - // have the req detach and check it did so + // have the req detach memcache_req_done(conn->req); - assert(conn->req->conn == NULL); - + // we are now available again conn->req = NULL; diff -r a5c09677ca6f -r 1c67f512779b memcache/request.c --- a/memcache/request.c Thu Aug 28 03:14:07 2008 +0300 +++ b/memcache/request.c Fri Aug 29 23:31:17 2008 +0300 @@ -47,6 +47,7 @@ memcpy(&req->buf, buf, sizeof(req->buf)); req->have_buf = 1; + req->is_buf_ours = 0; // set offset to zero req->buf.offset = 0; @@ -70,7 +71,7 @@ } // accessors -enum memcache_req_state memcache_req_state (struct memcache_req *req) { +enum memcache_state memcache_req_state (struct memcache_req *req) { return req->state; } @@ -82,7 +83,7 @@ return req->reply_type; } -const struct memcache_key *keymemcache_req_key (struct memcache_req *req) { +const struct memcache_key *memcache_req_key (struct memcache_req *req) { return &req->key; } @@ -112,7 +113,8 @@ } void memcache_req_recv (struct memcache_req *req, enum memcache_reply reply_type) { - req->state = MEMCACHE_STATE_REPLY; + // set state to REPLY_DATA/REPLY based on have_buf/is_buf_ours + req->state = (req->have_buf && req->is_buf_ours) ? MEMCACHE_STATE_REPLY_DATA : MEMCACHE_STATE_REPLY; req->reply_type = reply_type; // we must surely have a valid obj now @@ -136,21 +138,8 @@ // make sure we are in the REPLY/REPLY_DATA state assert(req->state == MEMCACHE_STATE_REPLY || req->state == MEMCACHE_STATE_REPLY_DATA); - // are we supposed to have data? - if (req->buf.data) { - // make sure we really have the full data, if applicable - assert(req->buf.offset == req->buf.len); - - // yes... - req->have_buf = 1; - - // have data - req->state = MEMCACHE_STATE_DATA_DONE; - - } else { - // no data - req->state = MEMCACHE_STATE_DONE; - } + // set state to DONE_DATA/DONE based on have_buf/is_buf_ours + req->state = (req->have_buf && req->is_buf_ours) ? MEMCACHE_STATE_DONE_DATA : MEMCACHE_STATE_DONE; // forget the connection req->conn = NULL; @@ -170,7 +159,10 @@ void memcache_req_free (struct memcache_req *req) { // must be unused assert(req->conn == NULL); - assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR || req->state == MEMCACHE_STATE_DONE || req->state == MEMCACHE_STATE_DATA_DONE); + assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR || req->state == MEMCACHE_STATE_DONE || req->state == MEMCACHE_STATE_DONE_DATA); + + if (req->have_buf && req->is_buf_ours) + free(req->buf.data); free(req->key.buf); free(req); diff -r a5c09677ca6f -r 1c67f512779b memcache/request.h --- a/memcache/request.h Thu Aug 28 03:14:07 2008 +0300 +++ b/memcache/request.h Fri Aug 29 23:31:17 2008 +0300 @@ -21,11 +21,11 @@ struct memcache_obj obj; struct memcache_buf buf; - // flags to indicate if we have valid obj/buf data - char have_obj, have_buf; + // flags to indicate if we have valid obj/buf data, and if buf.data was allocated by us or supplied by the user + char have_obj, have_buf, is_buf_ours; // our state - enum memcache_req_state state; + enum memcache_state state; // our user callback argument void *cb_arg; @@ -81,12 +81,15 @@ * The request was sent and is now done, and is not associated with the connection anymore. * * This will be called before req_reply/req_data, but not before/after req_error. + * + * The request will not be accessed anymore after calling this. */ void memcache_req_done (struct memcache_req *req); /* - * An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued - * anymore, and the req should not be accessed by memcache_* code after this. + * An error occurred, and the request must be abandoned. The request is not active or enqueued anymore. + * + * The request will not be accessed anymore after calling this. */ void memcache_req_error (struct memcache_req *req); diff -r a5c09677ca6f -r 1c67f512779b memcache/server.c --- a/memcache/server.c Thu Aug 28 03:14:07 2008 +0300 +++ b/memcache/server.c Fri Aug 29 23:31:17 2008 +0300 @@ -23,8 +23,7 @@ TAILQ_INIT(&server->req_queue); // grow connpool so as to have a connection ready - if (memcache_server_grow_connpool(server)) - ERROR("error opening initial connection"); + memcache_server_grow_connpool(server); // success return server; @@ -35,34 +34,43 @@ return NULL; } -int memcache_server_grow_connpool (struct memcache_server *server) { - struct memcache_conn *conn; - int count; +void memcache_server_grow_connpool (struct memcache_server *server) { + struct memcache_conn *conn; + int count = 0; - // count connections - for (count = 0, conn = server->conn_list.lh_first; conn != NULL; conn = conn->connlist_node.le_next, count++) ; + // count connections + LIST_FOREACH(conn, &server->conn_list, connlist_node) { + count++; + } - // room for more? - if (count < server->max_connections) { + // room for more? + if (count < server->max_connections) { // create a new one if ((conn = memcache_conn_open(server)) == NULL) - return -1; - + ERROR("failed to grow the connpool"); + // enlist it LIST_INSERT_HEAD(&server->conn_list, conn, connlist_node); // the connection will call memcache_server_coon_ready once it's ready for use... - } - - // success - return 0; + } + + // ok + return; + +error: + // XXX: we might be deadlocked now... requests queued, but no connections! + if (LIST_EMPTY(&server->conn_list) && !TAILQ_EMPTY(&server->req_queue)) + FATAL("deadlock; requests queued, but no connections"); + + // XXX: harmless... but need some retry logic } int memcache_server_add_req (struct memcache_server *server, struct memcache_req *req) { struct memcache_conn *conn; // look for an idle connection - for (conn = server->conn_list.lh_first; conn != NULL; conn = conn->connlist_node.le_next) { + LIST_FOREACH(conn, &server->conn_list, connlist_node) { if (memcache_conn_is_available(conn)) { // we found an idle connection break; @@ -85,34 +93,60 @@ // notify the req memcache_req_queued(req); + // grow the connpool, as we apparently don't have enough connections + memcache_server_grow_connpool(server); + return 0; } } +/* + * We might have available connections, process any queued requests, or try and grow the connpool if non available + */ +void memache_server_dequeue (struct memcache_server *server) { + struct memcache_conn *conn; + struct memcache_req *req; + + // if no requests are queued, nothing needs doing + if ((req = TAILQ_FIRST(&server->req_queue)) == NULL) + return; + + // look for idle connections to service the request + LIST_FOREACH(conn, &server->conn_list, connlist_node) { + if (memcache_conn_is_available(conn)) { + // remove the req from the queue and execute it + TAILQ_REMOVE(&server->req_queue, req, reqqueue_node); + + // this will take care of any error handling by itself + memcache_conn_do_req(conn, req); + + // if that was the last req, return, otherwise continue + if ((req = TAILQ_FIRST(&server->req_queue)) == NULL) + return; + } + } + + // no idle connections remaining, try and grow if applicable + memcache_server_grow_connpool(server); +} + void memcache_server_conn_ready (struct memcache_server *server, struct memcache_conn *conn) { assert(server == conn->server); - - // do we have any queued requests waiting? - struct memcache_req *req = server->req_queue.tqh_first; - - if (req) { - // remove it from the queue and execute it - TAILQ_REMOVE(&server->req_queue, req, reqqueue_node); - - // this will take care of any error handling by itself - memcache_conn_do_req(conn, req); - } + + // grab the next queued request + memache_server_dequeue(server); } void memcache_server_conn_dead (struct memcache_server *server, struct memcache_conn *conn) { assert(server == conn->server); - // XXX: reconnect/error out requests? - // remove it from the list LIST_REMOVE(conn, connlist_node); // free it memcache_conn_free(conn); + + // this should grow the connpool back again + memache_server_dequeue(server); } diff -r a5c09677ca6f -r 1c67f512779b memcache/server.h --- a/memcache/server.h Thu Aug 28 03:14:07 2008 +0300 +++ b/memcache/server.h Fri Aug 29 23:31:17 2008 +0300 @@ -31,7 +31,7 @@ * Attempt to grow the connection pool by one connection. Doesn't do anything if we already have too many connections, * otherwise the new connection will be opened and added to the conn_list. */ -int memcache_server_grow_connpool (struct memcache_server *server); +void memcache_server_grow_connpool (struct memcache_server *server); /* * Process the given request on this server. diff -r a5c09677ca6f -r 1c67f512779b memcache/strings.c --- a/memcache/strings.c Thu Aug 28 03:14:07 2008 +0300 +++ b/memcache/strings.c Fri Aug 29 23:31:17 2008 +0300 @@ -34,7 +34,7 @@ } } -const char *memcache_state_str (enum memcache_req_state state) { +const char *memcache_state_str (enum memcache_state state) { switch (state) { CASE_STR(MEMCACHE_STATE_INVALID); CASE_STR(MEMCACHE_STATE_QUEUED); @@ -42,7 +42,7 @@ CASE_STR(MEMCACHE_STATE_REPLY); CASE_STR(MEMCACHE_STATE_REPLY_DATA); CASE_STR(MEMCACHE_STATE_DONE); - CASE_STR(MEMCACHE_STATE_DATA_DONE); + CASE_STR(MEMCACHE_STATE_DONE_DATA); CASE_STR(MEMCACHE_STATE_ERROR); CASE_DEFAULT(MEMCACHE_STATE); } diff -r a5c09677ca6f -r 1c67f512779b memcache_test.c --- a/memcache_test.c Thu Aug 28 03:14:07 2008 +0300 +++ b/memcache_test.c Fri Aug 29 23:31:17 2008 +0300 @@ -1,3 +1,11 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include #include #include @@ -7,6 +15,12 @@ #include "config.h" #include "common.h" +/* + * Test: + * * zero-length entries + */ + +static struct event_base *ev_base; static struct memcache *mc; static struct config_endpoint server_endpoint; static char *data_1 = "rei4quohV8Oocio1ua0co8ni4Ae1re4houcheixahchoh3ioghie0aShooShoh6Ahboequ9eiX5eashuu6Chu1quo" @@ -15,33 +29,134 @@ static char *data_2 = "iefaek7ighi5UpueThageish5ieshohyeil1raiceerahjahng5ui7vuzie9quu4dai5ar2aiXi5ieth4looweigi" "e3fo5ieri1queengaiphuaghaic1xahvoo9joo6baiNaig8puCootheowah4moocohDoiquoh3quieka5ao3aeNg9" "Aimei1soangu4Duch5pho5buu2ohzaich4chahz9iTh3Pei4beep1ongie6au1aafoosh2vierei5E"; - -void _memcache_cb (struct memcache_req *req, void *arg) { - char *key = arg; - const struct memcache_obj *obj; - const struct memcache_buf *buf; + +#define BENCHMARK_KEY "memcache_benchmark" +#define BENCHMARK_KEY_MAX 256 +#define BENCHMARK_DATA_MAX 1024 * 1024 - INFO("[%s]: cmd=%15s state=%15s reply=%15s", key, - memcache_command_str(memcache_req_cmd(req)), - memcache_state_str(memcache_req_state(req)), - memcache_reply_str(memcache_req_reply(req)) - ); +#define MIN(a, b) ((a) < (b) ? (a) : (b)) +#define MAX(a, b) ((a) > (b) ? (a) : (b)) + +struct common { + unsigned int max_connections; - if ((obj = memcache_req_obj(req))) - INFO("\tobj: flags=0x%04X exptime=%9zu bytes=%6zu cas=%llu", obj->flags, obj->exptime, obj->bytes, obj->cas); + struct timeval start; +} common; - if ((buf = memcache_req_buf(req))) - INFO("\tbuf: data=%p len=%6zu offset=%6zu", buf->data, buf->len, buf->offset); +struct benchmark_fetch { + unsigned int concurrent_ops; + unsigned int total_ops; - INFO("%s", ""); + const char *key_prefix; + unsigned int key_len_min, key_len_max, key_count; + unsigned int data_len_min, data_len_max; + + int keys_stored; + int cur_ops; + int op_count; + + struct key_buf { + char buf[BENCHMARK_KEY_MAX]; + struct memcache_key key; + } *keys; + +} benchmark_fetch; + +void benchmark_cb (struct memcache_req *req, void *arg); +void benchmark_fetch_fn (void); + +enum option_code { + OPT_CODE_INVALID, + + COMMON_CONN_MAX, + BENCH_FETCH_REQ_CONCURRENCY, + BENCH_FETCH_REQ_AMOUNT, + BENCH_FETCH_KEY_PREFIX, + BENCH_FETCH_KEY_LEN_MIN, + BENCH_FETCH_KEY_LEN_MAX, + BENCH_FETCH_KEY_COUNT, + BENCH_FETCH_DATA_LEN_MIN, + BENCH_FETCH_DATA_LEN_MAX, + + OPT_CODE_MAX, +}; + +enum option_type { + OPT_TYPE_NONE, + OPT_TYPE_UINT, + OPT_TYPE_STR, +}; + +static struct test { + char *name; + memcache_cb cb_fn; + + void (*test_fn) (void); + +} test_list[] = { + { "benchmark_fetch", &benchmark_cb, &benchmark_fetch_fn }, + { 0, 0, 0, } +}; + +static struct option options[] = { + { "conn-max", required_argument, NULL, COMMON_CONN_MAX }, + { "req-concurrency", required_argument, NULL, BENCH_FETCH_REQ_CONCURRENCY }, + { "req-amount", required_argument, NULL, BENCH_FETCH_REQ_AMOUNT }, + { "key-prefix", required_argument, NULL, BENCH_FETCH_KEY_PREFIX }, + { "key-len-min", required_argument, NULL, BENCH_FETCH_KEY_LEN_MIN }, + { "key-len-max", required_argument, NULL, BENCH_FETCH_KEY_LEN_MAX }, + { "key-count", required_argument, NULL, BENCH_FETCH_KEY_COUNT }, + { "data-len-min", required_argument, NULL, BENCH_FETCH_DATA_LEN_MIN }, + { "data-len-max", required_argument, NULL, BENCH_FETCH_DATA_LEN_MAX }, + { 0, 0, 0, 0 }, +}; + +static struct opt { + enum option_code code; + enum option_type type; + + union opt_type_data { + struct { + unsigned int *value; + unsigned int default_value; + } uint; + + struct { + const char **value; + const char *default_value; + } str; + } data; +} option_info[OPT_CODE_MAX] = { + { OPT_CODE_INVALID, OPT_TYPE_NONE }, + { COMMON_CONN_MAX, OPT_TYPE_UINT, { .uint = { &common.max_connections, 1 }} }, + { BENCH_FETCH_REQ_CONCURRENCY, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.concurrent_ops, 1 }} }, + { BENCH_FETCH_REQ_AMOUNT, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.total_ops, 500 }} }, + { BENCH_FETCH_KEY_PREFIX, OPT_TYPE_STR, { .str = { &benchmark_fetch.key_prefix, "bf_" }} }, + { BENCH_FETCH_KEY_LEN_MIN, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_len_min, 8 }} }, + { BENCH_FETCH_KEY_LEN_MAX, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_len_max, 8 }} }, + { BENCH_FETCH_KEY_COUNT, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.key_count, 1 }} }, + { BENCH_FETCH_DATA_LEN_MIN, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.data_len_min, 64 }} }, + { BENCH_FETCH_DATA_LEN_MAX, OPT_TYPE_UINT, { .uint = { &benchmark_fetch.data_len_max, 64 }} }, +}; + +void time_reset () { + // start timing + assert(gettimeofday(&common.start, NULL) == 0); } -void begin_test () { - struct memcache_key key_1, key_2; - struct memcache_obj obj_1, obj_2; - struct memcache_buf buf_1, buf_2; +double time_offset () { + struct timeval time; - if ((mc = memcache_alloc(&_memcache_cb)) == NULL) + assert(gettimeofday(&time, NULL) == 0); + + return ((double) (time.tv_sec - common.start.tv_sec)) + ((double) (time.tv_usec - common.start.tv_usec)) / 1000000; +} + + + +void mc_init (int max_connections, memcache_cb cb_fn) { + // memcache init + if ((mc = memcache_alloc(cb_fn)) == NULL) ERROR("memcache_alloc"); // fix up the endpoint @@ -51,9 +166,84 @@ ERROR("config_endpoint_parse"); // add the server - if (memcache_add_server(mc, &server_endpoint, 1)) + if (memcache_add_server(mc, &server_endpoint, max_connections)) ERROR("memcache_add_server"); - + + INFO("[memcache] initialized with max_connections=%d", max_connections); + + return; + +error: + assert(0); +} + +void dump_req (struct memcache_req *req, void *unused) { + const struct memcache_obj *obj; + const struct memcache_buf *buf; + + INFO("[%*s]: cmd=%s state=%s reply=%s", + (int) memcache_req_key(req)->len, memcache_req_key(req)->buf, + memcache_command_str(memcache_req_cmd(req)), + memcache_state_str(memcache_req_state(req)), + memcache_reply_str(memcache_req_reply(req)) + ); + + if ((obj = memcache_req_obj(req))) + INFO("\tobj: flags=0x%04X exptime=%zu bytes=%zu cas=%llu", obj->flags, obj->exptime, obj->bytes, obj->cas); + + if ((buf = memcache_req_buf(req))) + INFO("\tbuf: data=%p len=%zu offset=%zu", buf->data, buf->len, buf->offset); + + INFO("%s", ""); +} + +size_t random_value (size_t min, size_t max) { + return ((max == min) ? min : (random() % (max - min)) + min); +} + +size_t random_data (char *buf, size_t min, size_t max) { +#define CHAR_TABLE_MAX (('z' - 'a' + 1) + ('Z' - 'A' + 1) + ('9' - '0' + 1)) + + static char char_table[CHAR_TABLE_MAX] = { + 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', + 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' + }; + + assert(max >= min); + + size_t size = random_value(min, max), i; + + for (i = 0; i < size; i++) { + buf[i] = char_table[random_value(0, CHAR_TABLE_MAX)]; + + assert(isalnum(buf[i])); +/* + switch (MIN((size - i), 4)) { + case 4: * ((u_int32_t*) &buf[i]) = random(); i += 4; break; + case 3: + case 2: * ((u_int16_t*) &buf[i]) = random(); i += 2; break; + case 1: * ((u_int8_t*) &buf[i]) = random(); i += 1; break; + default: assert(0); + } +*/ + } + + return size; +} + +void test_cb (struct memcache_req *req, void *arg) { + dump_req(req, arg); +} + +void begin_test () { + struct memcache_key key_1, key_2; + struct memcache_obj obj_1, obj_2; + struct memcache_buf buf_1, buf_2; + struct memcache_req *req_1s, *req_2s, *req_1f, *req_2f; + + mc_init(1, &test_cb); + // add a request or two key_1.buf = "memcache_test_k1"; key_2.buf = "memcache_test_k2"; @@ -76,36 +266,280 @@ buf_2.len = strlen(data_2); buf_2.offset = buf_2.len; - if (memcache_store(mc, MEMCACHE_CMD_STORE_SET, &key_1, &obj_1, &buf_1, key_1.buf)) + if ((req_1s = memcache_store(mc, MEMCACHE_CMD_STORE_SET, &key_1, &obj_1, &buf_1, key_1.buf)) == NULL) ERROR("memcache_store: key_1"); - if (memcache_store(mc, MEMCACHE_CMD_STORE_ADD, &key_2, &obj_2, &buf_2, key_2.buf)) + if ((req_2s = memcache_store(mc, MEMCACHE_CMD_STORE_ADD, &key_2, &obj_2, &buf_2, key_2.buf)) == NULL) ERROR("memcache_store: key_2"); - if (memcache_fetch(mc, &key_1, key_1.buf)) + if ((req_1f = memcache_fetch(mc, &key_1, key_1.buf)) == NULL) ERROR("memcache_fetch: key_1"); - if (memcache_fetch(mc, &key_2, key_2.buf)) + if ((req_2f = memcache_fetch(mc, &key_2, key_2.buf)) == NULL) ERROR("memcache_fetch: key_2"); error: return; } +void benchmark_continue () { + while (benchmark_fetch.cur_ops < benchmark_fetch.concurrent_ops && (benchmark_fetch.op_count + benchmark_fetch.cur_ops) < benchmark_fetch.total_ops) { + // launch + assert(memcache_fetch(mc, &benchmark_fetch.keys[random_value(0, benchmark_fetch.key_count)].key, NULL) != NULL); + + benchmark_fetch.cur_ops++; + + if ((benchmark_fetch.op_count + benchmark_fetch.cur_ops) % (benchmark_fetch.total_ops / 10) == 0) + INFO("[benchmark] %0.6f: %d+%d/%d requests", + time_offset(), + benchmark_fetch.op_count, benchmark_fetch.cur_ops, benchmark_fetch.total_ops + ); + } + + if (benchmark_fetch.op_count == benchmark_fetch.total_ops) { + // done + assert(event_base_loopexit(ev_base, NULL) == 0); + + INFO("[benchmark] %.6f: %.6f req/s", + time_offset(), + benchmark_fetch.op_count / time_offset() + ); + } +} + +void benchmark_fetch_start () { + INFO( + "[benchmark] %0.6f: starting\n" + "\tconcurrent_ops = %u\n" + "\ttotal_ops = %u\n" + "\tkey_prefix = %s\n" + "\tkey_len_min = %u\n" + "\tkey_len_max = %u\n" + "\tkey_count = %u\n" + "\tdata_len_min = %u\n" + "\tdata_len_max = %u\n" + , time_offset(), + benchmark_fetch.concurrent_ops, benchmark_fetch.total_ops, + benchmark_fetch.key_prefix, benchmark_fetch.key_len_min, benchmark_fetch.key_len_max, benchmark_fetch.key_count, + benchmark_fetch.data_len_min, benchmark_fetch.data_len_max + ); + + time_reset(); + + benchmark_continue(); + + INFO("[benchmark] %0.6f: running", + time_offset() + ); +} + + +void benchmark_cb (struct memcache_req *req, void *arg) { + enum memcache_command cmd = memcache_req_cmd(req); + enum memcache_state state = memcache_req_state(req); + + if (state == MEMCACHE_STATE_ERROR) { + dump_req(req, arg); + FATAL("request failed"); + } + + if (state == MEMCACHE_STATE_DONE || state == MEMCACHE_STATE_DONE_DATA) { + if (cmd == MEMCACHE_CMD_FETCH_GET) { + benchmark_fetch.cur_ops--; + benchmark_fetch.op_count++; + + benchmark_continue(); + + } else if (cmd == MEMCACHE_CMD_STORE_SET) { + if (memcache_req_reply(req) != MEMCACHE_RPL_STORED) { + dump_req(req, arg); + WARNING("value was not stored"); + } + + benchmark_fetch.keys_stored++; + + if (benchmark_fetch.keys_stored % (benchmark_fetch.key_count / 10) == 0) + INFO("[benchmark] %.6f: key %u/%u stored: %.*s", + time_offset(), benchmark_fetch.keys_stored, benchmark_fetch.key_count, (int) memcache_req_key(req)->len, memcache_req_key(req)->buf + ); + + if (benchmark_fetch.keys_stored == benchmark_fetch.key_count) + benchmark_fetch_start(); + } + + memcache_req_free(req); + } +} + +/* + * Run ops in parrallel, until we have completed total_ops, at which point we shut down. + */ +void benchmark_fetch_fn () { + static char data[BENCHMARK_DATA_MAX]; + char key_postfix[BENCHMARK_KEY_MAX]; + int i, key_len, data_len; + struct memcache_obj obj; + struct memcache_buf buf; + + assert(benchmark_fetch.key_len_min > 0 && (strlen(benchmark_fetch.key_prefix) + benchmark_fetch.key_len_max) < BENCHMARK_KEY_MAX); + assert(benchmark_fetch.data_len_min > 0 && benchmark_fetch.data_len_max < BENCHMARK_DATA_MAX); + + benchmark_fetch.cur_ops = benchmark_fetch.op_count = benchmark_fetch.keys_stored = 0; + + if ((benchmark_fetch.keys = calloc(benchmark_fetch.key_count, sizeof(struct key_buf))) == NULL) + FATAL("calloc"); + + // pregenerate the data + data_len = random_data(data, benchmark_fetch.data_len_min, benchmark_fetch.data_len_max); + + obj.flags = 0x1234; + obj.exptime = 0; + obj.bytes = data_len; + buf.data = data; + buf.len = buf.offset = data_len; + + // insert keys + INFO("[benchmark] %0.6f: inserting %u keys with prefix=%s and len=(%u -> %u)", + time_offset(), + benchmark_fetch.key_count, benchmark_fetch.key_prefix, benchmark_fetch.key_len_min, benchmark_fetch.key_len_max + ); + + for (i = 0; i < benchmark_fetch.key_count; i++) { + struct key_buf *keybuf = &benchmark_fetch.keys[i]; + + key_len = random_data(key_postfix, benchmark_fetch.key_len_min, benchmark_fetch.key_len_max); + + key_postfix[key_len] = '\0'; + + assert((keybuf->key.len = snprintf(keybuf->buf, BENCHMARK_KEY_MAX, "%s%*s", benchmark_fetch.key_prefix, key_len, key_postfix)) < BENCHMARK_KEY_MAX); + + keybuf->key.buf = keybuf->buf; + + assert(memcache_store(mc, MEMCACHE_CMD_STORE_SET, &keybuf->key, &obj, &buf, NULL) != NULL); + } +} + +void usage (char *cmd) { + INFO( + "Usage: %s [ ... ]\n" + "\n" + "COMMANDS\n" + "\n" + "benchmark_fetch:\n" + "\tMeasure the speed of fetch requests\n" + "\t\n" + "\tconn-max 1 number of connections to use\n" + "\treq-concurrency 1 number of requests to have running\n" + "\treq-amount 500 number of requests to issue\n" + "\tkey-prefix bf_ key prefix\n" + "\tkey-len-min 8 minimum key lengt\n" + "\tkey-len-max 8 maximum key length\n" + "\tkey-count 1 how many keys to use\n" + "\tdata-len-min 64 minimum data length\n" + "\tdata-len-max 64 maximum data length\n", + + cmd + ); + + exit(1); +} + int main (int argc, char **argv) { - // libevent init - struct event_base *ev_base = event_init(); + char *name, *invalid; + struct test *test; + int c, option_index; + + // argument-parsing + if (argc < 2) { + WARNING("No command given"); + usage(argv[0]); + } + + // look up the test + name = argv[1]; + test = test_list; + + while (test->name && strcmp(test->name, name) != 0) + test++; + + if (!test->name) { + WARNING("Unknown cmd '%s'", name); + usage(argv[0]); + } + + // default values + for (c = OPT_CODE_INVALID; c < OPT_CODE_MAX; c++) { + switch (option_info[c].type) { + case OPT_TYPE_NONE: + break; + + case OPT_TYPE_UINT: + *option_info[c].data.uint.value = option_info[c].data.uint.default_value; + + break; + + case OPT_TYPE_STR: + *option_info[c].data.str.value = option_info[c].data.str.default_value; + + break; + + default: + assert(0); + } + } + + while ((c = getopt_long(argc, argv, "", options, &option_index)) != -1) { + if (c <= OPT_CODE_INVALID || c >= OPT_CODE_MAX) + FATAL("invalid argument %s", options[option_index].name); + + switch (option_info[c].type) { + case OPT_TYPE_UINT: + assert(optarg); + + *option_info[c].data.uint.value = strtol(optarg, &invalid, 10); + + if (*invalid) + FATAL("invalid argument value: %s: %s (%s)", options[option_index].name, optarg, invalid); + + break; + + case OPT_TYPE_STR: + assert(optarg); + + *option_info[c].data.str.value = optarg; + + break; + + case OPT_TYPE_NONE: + default: + FATAL("invalid argument type %s", options[option_index].name); + + break; + } + } + + // libevent init + ev_base = event_init(); if (!ev_base) FATAL("event_init"); - begin_test(); + // set up the memcache + mc_init(common.max_connections, test->cb_fn); + + // start timing + time_reset(); + + // start the test + test->test_fn(); + + INFO("[libevent] run"); // run the libevent mainloop if (event_base_dispatch(ev_base)) WARNING("event_dispatch"); - INFO("SHUTDOWN"); + INFO("[libevent] shutdown"); // clean up event_base_free(ev_base); diff -r a5c09677ca6f -r 1c67f512779b socket.c --- a/socket.c Thu Aug 28 03:14:07 2008 +0300 +++ b/socket.c Fri Aug 29 23:31:17 2008 +0300 @@ -101,6 +101,7 @@ static int _socket_do (struct config_endpoint *endpoint, int *sock, int sock_type, struct sockaddr_storage *addr, enum socket_op sockop) { struct addrinfo *res = NULL, *info, _fake_res; struct sockaddr_un _fake_addr_un; + int err = -1; if (endpoint->family == PF_UNIX) { // getaddrinfo doesn't handle PF_UNIX, so we need to build a fake result @@ -189,13 +190,14 @@ if (*sock == -1) ERROR("no working results from getaddrinfo: %s#%s", endpoint->af.inet.addr, endpoint->af.inet.port); - - return 0; + + // success + err = 0; error: if (res != 0 && res != &_fake_res) freeaddrinfo(res); - return -1; + return err; }