added the beginnings of the memcache client module (only up to connect() yet)
authorTero Marttila <terom@fixme.fi>
Tue, 26 Aug 2008 01:30:53 +0300
changeset 38 9894df13b779
parent 37 f0188b445c84
child 39 0e21a65074a6
added the beginnings of the memcache client module (only up to connect() yet)
Makefile
memcache.h
memcache/connection.c
memcache/connection.h
memcache/memcache.c
memcache/memcache.h
memcache/request.h
memcache/server.c
memcache/server.h
memcache_test.c
socket.h
--- a/Makefile	Sat Aug 09 20:11:59 2008 +0300
+++ b/Makefile	Tue Aug 26 01:30:53 2008 +0300
@@ -7,7 +7,7 @@
 
 SRCS = *.c cache/*.c cache/engines/*.c
 OBJS = *.o
-EXECS = file_main web_main node_main cache_test
+EXECS = file_main web_main node_main cache_test memcache_test
 
 all: depend ${EXECS}
 
@@ -16,6 +16,7 @@
 web_main: web_main.o common.o config.o socket.o http.o render.o remote_node.o remote_pool.o render_remote.o tile.o static.o
 coro_test: coro_test.o common.o config.o socket.o
 cache_test: cache_test.o common.o cache/cache.o cache/req.o cache/op.o cache/engines/fs.o
+memcache_test: memcache_test.o common.o memcache/memcache.o memcache/connection.o memcache/server.o socket.o config.o
 
 clean :
 	-rm *.o ${EXECS}
@@ -40,6 +41,12 @@
 http.o: lib/libevent-dev/include/event-config.h
 http.o: lib/libevent-dev/include/event2/util.h http.h
 http.o: lib/libevent-dev/include/event2/http.h common.h
+memcache_test.o: lib/libevent-dev/include/event2/event.h
+memcache_test.o: lib/libevent-dev/include/event-config.h
+memcache_test.o: lib/libevent-dev/include/event2/util.h
+memcache_test.o: lib/libevent-dev/include/event2/event_compat.h
+memcache_test.o: lib/libevent-dev/include/event2/event_struct.h memcache.h
+memcache_test.o: config.h common.h
 node_main.o: lib/libevent-dev/include/event2/event.h
 node_main.o: lib/libevent-dev/include/event-config.h
 node_main.o: lib/libevent-dev/include/event2/util.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache.h	Tue Aug 26 01:30:53 2008 +0300
@@ -0,0 +1,97 @@
+#ifndef MEMCACHE_H
+#define MEMCACHE_H
+
+/*
+ * A libevent based memcached client that aims for high performance, concurrency and low latency.
+ */
+
+#include "config.h"
+
+/*
+ * Used to store the global information for a memcache context. A context contains both servers and active connections.
+ */
+struct memcache;
+
+/*
+ * A transaction.
+ */
+struct memcache_req;
+
+/*
+ * Keys used
+ */
+struct memcache_key {
+    const char *buf;
+    size_t len;
+};
+
+/*
+ * Object attributes
+ */
+struct memcache_obj {
+    unsigned int flags;
+    time_t exptime;
+    size_t bytes;
+    unsigned long long cas;
+};
+
+/*
+ * Available commands
+ */
+enum memcache_command {
+    FETCH_GET,
+    STORE_SET,
+    STORE_ADD,
+    STORE_REPLACE,
+    STORE_APPEND,
+    STORE_PREPEND,
+    STORE_CAS,
+};
+
+enum memcache_state {
+    STATE_INVALID,
+
+    STATE_ERROR,
+};
+
+/*
+ * Callback used
+ */
+typedef int (*memcache_cb) (struct memcache_req *, void*);
+
+/*
+ * Allocate a new memcache context for use with other methods.
+ */
+struct memcache *memcache_alloc (memcache_cb cb_fn);
+
+/*
+ * Add a server to the pool of available servers.
+ */
+int memcache_add_server (struct memcache *mc, struct config_endpoint *endpoint, int max_connections);
+
+/*
+ * Attempt to fetch a key from the cache.
+ */
+struct memcache_req *memcache_fetch (struct memcache *mc, const struct memcache_key *key, void *cb_arg);
+
+/*
+ * Attempt to store a key into the cache
+ */
+struct memcache_req *memcache_store (struct memcache *mc, enum memcache_command cmd, const struct memcache_key *key, const struct memcache_obj *obj, void *cb_arg);
+
+/*
+ * Request state
+ */ 
+int memcache_req_state (struct memcache_req *req, enum memcache_state *state);
+
+/*
+ * Request key
+ */
+int memcache_req_key (struct memcache_req *req, const struct memcache_key *key);
+
+/*
+ * Request data
+ */
+int memcache_req_obj (struct memcache_req *req, const struct memcache_obj *obj);
+
+#endif /* MEMCACHE_H */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/connection.c	Tue Aug 26 01:30:53 2008 +0300
@@ -0,0 +1,86 @@
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <assert.h>
+
+#include "connection.h"
+#include "../socket.h"
+#include "../common.h"
+
+static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
+
+struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
+    struct memcache_conn *conn = NULL;
+
+    if ((conn = calloc(1, sizeof(*conn))) == NULL)
+        ERROR("calloc");
+    
+    // remember the server
+    conn->server = server;
+    
+    // attempt connect
+    if (memcache_conn_connect(conn))
+        ERROR("failed to connect to server");
+    
+    // success
+    return conn;
+
+error:
+    free(conn);
+
+    return NULL;
+}
+
+int memcache_conn_connect (struct memcache_conn *conn) {
+    assert(conn->fd <= 0);
+    
+    // begin connect
+    if ((conn->fd = socket_connect_async(conn->server->endpoint, SOCK_STREAM)) == -1)
+        goto error;
+
+    // set up the connect event
+    event_set(&conn->ev, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn);
+
+    // add it
+    if (event_add(&conn->ev, NULL))
+        PERROR("event_add");
+    
+    // success
+    return 0;
+
+error:
+    if (conn->fd > 0) {
+        if (close(conn->fd))
+            PWARNING("close %d", conn->fd);
+        
+        conn->fd = -1;
+    }
+   
+    return -1;
+}
+
+int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
+    assert(conn->fd > 0);
+    assert(conn->req == NULL);
+
+    // store the req
+    conn->req = req;
+
+    // XXX: transmit it
+
+error:
+    return -1;
+}
+
+/*
+ * The connect() has finished
+ */
+static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
+    struct memcache_conn *conn = arg;
+
+   // XXX: check if the connect() failed
+
+    // notify the server
+    memcache_server_conn_ready(conn->server, conn);
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/connection.h	Tue Aug 26 01:30:53 2008 +0300
@@ -0,0 +1,45 @@
+#ifndef MEMCACHE_CONNECTION_H
+#define MEMCACHE_CONNECTION_H
+
+#include <sys/queue.h>
+
+#include <event2/event.h>
+#include <event2/event_struct.h>
+
+#include "server.h"
+
+struct memcache_conn {
+    // the server we are connected to
+    struct memcache_server *server;
+
+    // we are a member of struct memcache_server.conn_list
+    LIST_ENTRY(memcache_conn) connlist_node;
+    
+    // our socket fd
+    int fd;
+
+    // socket event
+    struct event ev;
+    
+    // the request (if any) that we are currently processing - NULL for idle connections
+    struct memcache_req *req;
+};
+
+/*
+ * Allocate a memcache_conn struct, and initiate the connect
+ */
+struct memcache_conn *memcache_conn_open (struct memcache_server *server);
+
+/*
+ * Attempt to connect to the server.
+ *
+ * Assumes we are not already connecting/connected
+ */
+int memcache_conn_connect (struct memcache_conn *conn);
+
+/*
+ * The given connection must be idle, whereupon the given request is processed.
+ */
+int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req);
+
+#endif /* MEMCACHE_CONNECTION_H */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/memcache.c	Tue Aug 26 01:30:53 2008 +0300
@@ -0,0 +1,46 @@
+
+#include <stdlib.h>
+
+#include "memcache.h"
+#include "server.h"
+#include "../common.h"
+
+struct memcache *memcache_alloc (memcache_cb cb_fn) {
+    struct memcache *mc = NULL;
+
+    if ((mc = calloc(1, sizeof(*mc))) == NULL)
+        ERROR("calloc");
+    
+    // store callback
+    mc->cb_fn = cb_fn;
+
+    // init server list
+    LIST_INIT(&mc->server_list);
+    
+    // success
+    return mc;
+
+error:
+    if (mc)
+        free(mc);
+    
+    return NULL;
+}
+
+int memcache_add_server (struct memcache *mc, struct config_endpoint *endpoint, int max_connections) {
+    struct memcache_server *server = NULL;
+    
+    // alloc the server
+    if ((server = memcache_server_alloc(endpoint, max_connections)) == NULL)
+        goto error;
+
+    // enlist it
+    LIST_INSERT_HEAD(&mc->server_list, server, serverlist_node);
+
+    // done
+    return 0;
+
+error:
+    return -1;
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/memcache.h	Tue Aug 26 01:30:53 2008 +0300
@@ -0,0 +1,16 @@
+#ifndef MEMCACHE_MEMCACHE_H
+#define MEMCACHE_MEMCACHE_H
+
+#include <sys/queue.h>
+
+#include "../memcache.h"
+
+struct memcache {
+    // user callback
+    memcache_cb cb_fn;
+
+    // list of servers
+    LIST_HEAD(memcache_serverlist_head, memcache_server) server_list;
+};
+
+#endif /* MEMCACHE_MEMCACHE_H */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/request.h	Tue Aug 26 01:30:53 2008 +0300
@@ -0,0 +1,20 @@
+#ifndef MEMCACHE_REQ_H
+#define MEMCACHE_REQ_H
+
+#include <sys/queue.h>
+
+#include "../memcache.h"
+#include "connection.h"
+
+struct memcache_req {
+    struct memcached_connection *conn;
+    
+    // we are a member of struct memcache_server.req_queue
+    TAILQ_ENTRY(memcache_req) reqqueue_node;
+    
+    // our key/obj
+    struct memcached_key *key;
+    struct memcached_obj *obj;
+};
+
+#endif /* MEMCACHE_REQ_H */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/server.c	Tue Aug 26 01:30:53 2008 +0300
@@ -0,0 +1,99 @@
+
+#include <stdlib.h>
+#include <assert.h>
+
+#include "server.h"
+#include "connection.h"
+#include "request.h"
+#include "../memcache.h"
+#include "../common.h"
+
+struct memcache_server *memcache_server_alloc (struct config_endpoint *endpoint, int max_connections) {
+    struct memcache_server *server = NULL;
+
+    if ((server = calloc(1, sizeof(*server))) == NULL)
+        ERROR("calloc");
+    
+    // store the vars
+    server->endpoint = endpoint;
+    server->max_connections = max_connections;
+    
+    // grow connpool so as to have a connection ready
+    if (memcache_server_grow_connpool(server))
+        ERROR("error opening initial connection");
+
+    // success
+    return server;
+
+error:
+    free(server);
+
+    return NULL;
+}
+
+int memcache_server_grow_connpool (struct memcache_server *server) {
+   struct memcache_conn *conn;
+   int count;
+
+   // count connections
+   for (count = 0, conn = server->conn_list.lh_first; conn != NULL; conn = conn->connlist_node.le_next, count++) ;
+
+   // room for more?
+   if (count < server->max_connections) {
+        // create a new one
+        if ((conn = memcache_conn_open(server)) == NULL)
+            return -1;
+        
+        // enlist it
+        LIST_INSERT_HEAD(&server->conn_list, conn, connlist_node);
+
+        // the connection will call memcache_server_coon_ready once it's ready for use...
+   }
+    
+   // success
+   return 0;
+}
+
+int memcache_server_add_req (struct memcache_server *server, struct memcache_req *req) {
+    struct memcache_conn *conn;
+    
+    // look for an idle connection
+    for (conn = server->conn_list.lh_first; conn != NULL; conn = conn->connlist_node.le_next) {
+        if (conn->req == NULL) {
+            // we found an idle connection
+            break;
+        }
+    }
+
+    if (conn != NULL) {
+        // we found an available connection
+        return memcache_conn_do_req(conn, req);
+
+    } else {
+        // enqueue the request until a connection is available
+        // XXX: queue size limits
+
+        TAILQ_INSERT_TAIL(&server->req_queue, req, reqqueue_node);
+
+        return 0;
+    }
+}
+
+void memcache_server_conn_ready (struct memcache_server *server, struct memcache_conn *conn) {
+    assert(server == conn->server);
+
+    // do we have any queued requests waiting?
+    struct memcache_req *req = server->req_queue.tqh_first;
+
+    if (req) {
+        // remove it from the queue and execute it
+        TAILQ_REMOVE(&server->req_queue, req, reqqueue_node);
+        
+        if (memcache_conn_do_req(conn, req)) {
+            WARNING("processing enqueued request failed");
+
+            // XXX: error handling for req
+        }
+    }
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache/server.h	Tue Aug 26 01:30:53 2008 +0300
@@ -0,0 +1,46 @@
+#ifndef MEMCACHE_SERVER_H
+#define MEMCACHE_SERVER_H
+
+#include <sys/queue.h>
+
+#include "../memcache.h"
+#include "../config.h"
+
+struct memcache_server {
+    struct config_endpoint *endpoint;
+    
+    // we are a member of struct memcache.server_list
+    LIST_ENTRY(memcache_server) serverlist_node;
+
+    // a list of connections for this server
+    LIST_HEAD(memcache_connlist_head, memcache_conn) conn_list;
+
+    // a list of enqueued requests waiting for a connection
+    TAILQ_HEAD(memcache_reqqueue_head, memcache_req) req_queue;
+
+    // how many connections we should have at most
+    int max_connections;
+};
+
+/*
+ * Alloc and return a new memcache_server
+ */
+struct memcache_server *memcache_server_alloc (struct config_endpoint *endpoint, int max_connections);
+
+/*
+ * Attempt to grow the connection pool by one connection. Doesn't do anything if we already have too many connections,
+ * otherwise the new connection will be opened and added to the conn_list.
+ */
+int memcache_server_grow_connpool (struct memcache_server *server);
+
+/*
+ * Process the given request on this server.
+ */
+int memcache_server_add_req (struct memcache_server *server, struct memcache_req *req);
+
+/*
+ * The given connection is ready for use
+ */
+void memcache_server_conn_ready (struct memcache_server *server, struct memcache_conn *conn);
+
+#endif /* MEMCACHE_SERVER_H */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/memcache_test.c	Tue Aug 26 01:30:53 2008 +0300
@@ -0,0 +1,55 @@
+
+#include <event2/event.h>
+#include <event2/event_compat.h>
+#include <event2/event_struct.h>
+
+#include "memcache.h"
+#include "config.h"
+#include "common.h"
+
+static struct memcache *mc;
+static struct config_endpoint server_endpoint;
+
+int _memcache_cb (struct memcache_req *req, void *arg) {
+    return 0;
+}
+
+void begin_test () {
+    if ((mc = memcache_alloc(&_memcache_cb)) == NULL)
+        ERROR("memcache_alloc");
+    
+    endpoint_init(&server_endpoint, 11211);
+    
+    if (endpoint_parse(&server_endpoint, "localhost"))
+        ERROR("config_endpoint_parse");
+
+    if (memcache_add_server(mc, &server_endpoint, 1))
+        ERROR("memcache_add_server");
+    
+    // XXX: we should have a connect() running now
+
+error:
+    return;
+}
+
+int main (int argc, char **argv) {
+    // libevent init
+    struct event_base *ev_base = event_init();
+
+    if (!ev_base)
+        FATAL("event_init");
+    
+    begin_test();
+
+    // run the libevent mainloop
+    if (event_base_dispatch(ev_base))
+        WARNING("event_dispatch");
+
+    INFO("SHUTDOWN");
+    
+    // clean up
+    event_base_free(ev_base);
+    
+    // successfull exit
+    return 0;
+}
--- a/socket.h	Sat Aug 09 20:11:59 2008 +0300
+++ b/socket.h	Tue Aug 26 01:30:53 2008 +0300
@@ -1,6 +1,8 @@
 #ifndef SOCKET_H
 #define SOCKET_H
 
+#include <sys/socket.h>
+
 #include "config.h"
 
 #define SOCKET_LISTEN_BACKLOG 128