memcache connect error handling and req queuein
authorTero Marttila <terom@fixme.fi>
Wed, 27 Aug 2008 10:11:44 +0300
changeset 39 0e21a65074a6
parent 38 9894df13b779
child 40 9cecd22e643a
memcache connect error handling and req queuein
Makefile
memcache.h
memcache/connection.c
memcache/connection.h
memcache/memcache.c
memcache/memcache.h
memcache/request.h
memcache/server.c
memcache/server.h
socket.c
socket.h
--- a/Makefile	Tue Aug 26 01:30:53 2008 +0300
+++ b/Makefile	Wed Aug 27 10:11:44 2008 +0300
@@ -16,7 +16,7 @@
 web_main: web_main.o common.o config.o socket.o http.o render.o remote_node.o remote_pool.o render_remote.o tile.o static.o
 coro_test: coro_test.o common.o config.o socket.o
 cache_test: cache_test.o common.o cache/cache.o cache/req.o cache/op.o cache/engines/fs.o
-memcache_test: memcache_test.o common.o memcache/memcache.o memcache/connection.o memcache/server.o socket.o config.o
+memcache_test: memcache_test.o common.o memcache/memcache.o memcache/connection.o memcache/server.o memcache/request.o socket.o config.o
 
 clean :
 	-rm *.o ${EXECS}
--- a/memcache.h	Tue Aug 26 01:30:53 2008 +0300
+++ b/memcache.h	Wed Aug 27 10:11:44 2008 +0300
@@ -21,7 +21,7 @@
  * Keys used
  */
 struct memcache_key {
-    const char *buf;
+    char *buf;
     size_t len;
 };
 
@@ -48,7 +48,7 @@
     STORE_CAS,
 };
 
-enum memcache_state {
+enum memcache_req_state {
     STATE_INVALID,
 
     STATE_ERROR,
@@ -82,7 +82,7 @@
 /*
  * Request state
  */ 
-int memcache_req_state (struct memcache_req *req, enum memcache_state *state);
+enum memcache_req_state memcache_req_state (struct memcache_req *req);
 
 /*
  * Request key
--- a/memcache/connection.c	Tue Aug 26 01:30:53 2008 +0300
+++ b/memcache/connection.c	Wed Aug 27 10:11:44 2008 +0300
@@ -1,6 +1,7 @@
 
 #include <stdlib.h>
 #include <unistd.h>
+#include <string.h>
 #include <assert.h>
 
 #include "connection.h"
@@ -32,12 +33,15 @@
 }
 
 int memcache_conn_connect (struct memcache_conn *conn) {
-    assert(conn->fd <= 0);
+    assert(conn->fd <= 0 && !conn->is_connected);
     
     // begin connect
     if ((conn->fd = socket_connect_async(conn->server->endpoint, SOCK_STREAM)) == -1)
         goto error;
 
+    // fd 0 should be stdin...
+    assert(conn->fd > 0);
+
     // set up the connect event
     event_set(&conn->ev, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn);
 
@@ -59,8 +63,12 @@
     return -1;
 }
 
+int memcache_conn_is_available (struct memcache_conn *conn) {
+    return (conn->fd > 0 && conn->is_connected && conn->req == NULL);
+}
+
 int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
-    assert(conn->fd > 0);
+    assert(conn->fd > 0 && conn->is_connected);
     assert(conn->req == NULL);
 
     // store the req
@@ -77,10 +85,44 @@
  */
 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
     struct memcache_conn *conn = arg;
+    int error;
 
-   // XXX: check if the connect() failed
+    if (socket_check_error(fd, &error))
+        goto error;
+
+    if (error)
+        ERROR("connect failed: %s", strerror(error));
+
+    // mark us as succesfully connected
+    conn->is_connected = 1;
 
     // notify the server
     memcache_server_conn_ready(conn->server, conn);
+    
+    // good
+    return;
+
+error:
+    // notify the server
+    memcache_server_conn_dead(conn->server, conn);
 }
 
+void memcache_conn_free (struct memcache_conn *conn) {
+    // ensure that the connection is not considered to be connected anymore
+    assert(!conn->is_connected);
+    
+    // close the fd if needed
+    if (conn->fd > 0) {
+        if (close(conn->fd))
+            PWARNING("close");
+
+        conn->fd = 0;
+    }
+    
+    // ensure that the event is not pending anymore
+    assert(event_pending(&conn->ev, EV_READ|EV_WRITE|EV_TIMEOUT, NULL) == 0);
+    
+    // free it
+    free(conn);
+}
+
--- a/memcache/connection.h	Tue Aug 26 01:30:53 2008 +0300
+++ b/memcache/connection.h	Wed Aug 27 10:11:44 2008 +0300
@@ -20,6 +20,9 @@
 
     // socket event
     struct event ev;
+
+    // have we succesfully connected yet?
+    int is_connected;
     
     // the request (if any) that we are currently processing - NULL for idle connections
     struct memcache_req *req;
@@ -38,8 +41,18 @@
 int memcache_conn_connect (struct memcache_conn *conn);
 
 /*
+ * Check if the given connection is available, that is, connected and not busy
+ */
+int memcache_conn_is_available (struct memcache_conn *conn);
+
+/*
  * The given connection must be idle, whereupon the given request is processed.
  */
 int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req);
 
+/*
+ * Free all resources associated with a closed/failed connection
+ */
+void memcache_conn_free (struct memcache_conn *conn);
+
 #endif /* MEMCACHE_CONNECTION_H */
--- a/memcache/memcache.c	Tue Aug 26 01:30:53 2008 +0300
+++ b/memcache/memcache.c	Wed Aug 27 10:11:44 2008 +0300
@@ -44,3 +44,34 @@
     return -1;
 }
 
+struct memcache_server *memcache_choose_server (struct memcache *mc, const struct memcache_key *key) {
+    // XXX: support multiple servers
+    return mc->server_list.lh_first;
+}
+
+struct memcache_req *memcache_fetch (struct memcache *mc, const struct memcache_key *key, void *cb_arg) {
+    struct memcache_req *req = NULL;
+    struct memcache_server *server = NULL;
+    
+    // alloc the request
+    if ((req = memcache_req_alloc(key, cb_arg)) == NULL)
+        ERROR("failed to allocate request");
+
+    // pick a server
+    if ((server = memcache_choose_server(mc, key)) == NULL)
+        ERROR("failed to find a server to use");
+
+    // enqueue it!
+    if (memcache_server_add_req(server, req))
+        ERROR("failed to hand over the request for processing");
+
+    // success!
+    return 0;
+
+error:
+    if (req)
+        memcache_req_free(req);
+
+    return -1;
+}  
+
--- a/memcache/memcache.h	Tue Aug 26 01:30:53 2008 +0300
+++ b/memcache/memcache.h	Wed Aug 27 10:11:44 2008 +0300
@@ -13,4 +13,6 @@
     LIST_HEAD(memcache_serverlist_head, memcache_server) server_list;
 };
 
+struct memcache_server *memcache_choose_server (struct memcache *mc, const struct memcache_key *key);
+
 #endif /* MEMCACHE_MEMCACHE_H */
--- a/memcache/request.h	Tue Aug 26 01:30:53 2008 +0300
+++ b/memcache/request.h	Wed Aug 27 10:11:44 2008 +0300
@@ -7,14 +7,41 @@
 #include "connection.h"
 
 struct memcache_req {
-    struct memcached_connection *conn;
+    // the memcache context that we belong to
+    struct memcache *mc;
+
+    // our key/obj
+    struct memcache_key key;
+    struct memcache_obj *obj;
+
+    // our state
+    enum memcache_req_state state;
+
+    // our user callback argument
+    void *cb_arg;
+
+    // the conn 
+    struct memcache_connection *conn;
     
     // we are a member of struct memcache_server.req_queue
     TAILQ_ENTRY(memcache_req) reqqueue_node;
-    
-    // our key/obj
-    struct memcached_key *key;
-    struct memcached_obj *obj;
 };
 
+/*
+ * Allocate and return a new req, or NULL if unsuccesfull.
+ */
+struct memcache_req *memcache_req_alloc (struct memcache *mc, struct memcache_key *key, void *cb_arg);
+
+
+/*
+ * An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued
+ * anymore, and the req should not be accessed by memcache_* code after this.
+ */
+void memcache_req_error (struct memcache_req *req);
+
+/*
+ * Free an unused req. Should always be called by the user, not via internal means.
+ */
+void memcache_req_free (struct memcache_req *req);
+
 #endif /* MEMCACHE_REQ_H */
--- a/memcache/server.c	Tue Aug 26 01:30:53 2008 +0300
+++ b/memcache/server.c	Wed Aug 27 10:11:44 2008 +0300
@@ -59,7 +59,7 @@
     
     // look for an idle connection
     for (conn = server->conn_list.lh_first; conn != NULL; conn = conn->connlist_node.le_next) {
-        if (conn->req == NULL) {
+        if (memcache_conn_is_available(conn)) {
             // we found an idle connection
             break;
         }
@@ -91,9 +91,22 @@
         
         if (memcache_conn_do_req(conn, req)) {
             WARNING("processing enqueued request failed");
-
-            // XXX: error handling for req
+            
+            // notify the request/user, the req is now out of our hands
+            memcache_req_error(req);
         }
     }
 }
 
+void memcache_server_conn_dead (struct memcache_server *server, struct memcache_conn *conn) {
+    assert(server == conn->server);
+
+    // XXX: reconnect/error out requests?
+    
+    // remove it from the list
+    LIST_REMOVE(conn, connlist_node);
+
+    // free it
+    memcache_conn_free(conn);
+}
+
--- a/memcache/server.h	Tue Aug 26 01:30:53 2008 +0300
+++ b/memcache/server.h	Wed Aug 27 10:11:44 2008 +0300
@@ -43,4 +43,9 @@
  */
 void memcache_server_conn_ready (struct memcache_server *server, struct memcache_conn *conn);
 
+/*
+ * The given connection failed/died
+ */
+void memcache_server_conn_dead (struct memcache_server *server, struct memcache_conn *conn);
+
 #endif /* MEMCACHE_SERVER_H */
--- a/socket.c	Tue Aug 26 01:30:53 2008 +0300
+++ b/socket.c	Wed Aug 27 10:11:44 2008 +0300
@@ -77,6 +77,23 @@
     return sock;
 }
 
+/*
+ * Check if the given socket has an error condition set, mostly intended for use with socket_connect_async.
+ *
+ * Returns 0 and sets *error on success (zero = no error, nonzero = error), -1 on failure (invalid socket).
+ */
+int socket_check_error (int sock, int *error) {
+    socklen_t optlen = sizeof(*error);
+
+    if (getsockopt(sock, SOL_SOCKET, SO_ERROR, error, &optlen))
+        PERROR("getsockopt");
+
+    return 0;
+
+error:
+    return -1;
+}
+
 
 /*
  * Do something to apply an endpoint to a socket
--- a/socket.h	Tue Aug 26 01:30:53 2008 +0300
+++ b/socket.h	Wed Aug 27 10:11:44 2008 +0300
@@ -9,5 +9,6 @@
 
 int socket_listen (struct config_endpoint *endpoint, int sock_type);
 int socket_connect_async (struct config_endpoint *endpoint, int sock_type);
+int socket_check_error (int sock, int *error);
 
 #endif /* SOCKET_H */