more reply-receiving code, but still incomplete
authorTero Marttila <terom@fixme.fi>
Wed, 27 Aug 2008 22:42:27 +0300
changeset 42 0e503189af2f
parent 41 540737bf6bac
child 43 e5b714190dee
more reply-receiving code, but still incomplete
memcache.h
memcache/command.c
memcache/command.o
memcache/connection.c
memcache/request.c
memcache/request.h
memcache/server.c
--- a/memcache.h	Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache.h	Wed Aug 27 22:42:27 2008 +0300
@@ -84,6 +84,10 @@
 enum memcache_req_state {
     MEMCACHE_STATE_INVALID,
 
+    MEMCACHE_STATE_QUEUED,
+    MEMCACHE_STATE_SEND,
+    MEMCACHE_STATE_REPLY,
+
     MEMCACHE_STATE_ERROR,
 };
 
--- a/memcache/command.c	Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/command.c	Wed Aug 27 22:42:27 2008 +0300
@@ -1,4 +1,5 @@
-
+#include <string.h>
+#include <stdlib.h>
 #include <assert.h>
 
 #include "command.h"
@@ -14,32 +15,28 @@
     "prepend"   // MEMCACHE_CMD_STORE_PREPEND
 };
 
-/*
 static struct memcache_reply_info {
     enum memcache_reply type;
     char *name;
-    int has_data;
 
-} *memcache_cmd_replies[MEMCACHE_RPL_MAX] = {
-    MEMCACHE_RPL_INVALID,
-
-    MEMCACHE_RPL_ERROR,
-    MEMCACHE_RPL_CLIENT_ERROR,
-    MEMCACHE_RPL_SERVER_ERROR,
+} memcache_cmd_replies[MEMCACHE_RPL_MAX] = {
+    {   MEMCACHE_RPL_INVALID,       NULL            },
+    {   MEMCACHE_RPL_ERROR,         "ERROR"         },
+    {   MEMCACHE_RPL_CLIENT_ERROR,  "CLIENT_ERROR"  },
+    {   MEMCACHE_RPL_SERVER_ERROR,  "SERVER_ERROR"  },
     
     // MEMCACHE_CMD_FETCH_*
-    MEMCACHE_RPL_VALUE,
-    MEMCACHE_RPL_END,
+    {   MEMCACHE_RPL_VALUE,         "VALUE"         },
+    {   MEMCACHE_RPL_END,           "END"           },
     
     // MEMCACHE_CMD_STORE_*
-    MEMCACHE_RPL_STORED,
-    MEMCACHE_RPL_NOT_STORED,
-    MEMCACHE_RPL_EXISTS,
-    MEMCACHE_RPL_NOT_FOUND,
-
+    {   MEMCACHE_RPL_STORED,        "STORED"        },
+    {   MEMCACHE_RPL_NOT_STORED,    "NOT_STORED"    },
+    {   MEMCACHE_RPL_EXISTS,        "EXISTS"        },
+    {   MEMCACHE_RPL_NOT_FOUND,     "NOT_FOUND"     }
 };
 
-*/
+
 
 int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj) {
     // shouldn't already have a request header yet?
@@ -117,7 +114,142 @@
 }
 
 int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data) {
-    // XXX: implement
-    assert(0);
+    size_t line_length, buf_size;
+    char *line = NULL, *token_cursor, *token, *invalid;
+    int i;
+
+    // take note of how long the buffer is
+    buf_size = evbuffer_get_length(buf);
+
+    // first, try and read a full line
+    if ((line = evbuffer_readln(buf, &line_length, EVBUFFER_EOL_CRLF_STRICT)) == NULL) {
+        // check if any data was consumed
+        if (evbuffer_get_length(buf) != buf_size) {
+            // faaaail!
+            return -1;
+
+        } else {
+            // no complete line found
+            return 0;
+        }
+    }
+    
+    // just check to make sure that it really is null-delimited
+    assert(line[line_length - 1] == '\0');
+
+    // empty lines?
+    if (line_length == 0) {
+        PWARNING("empty reply line !?!");
+        return 0;
+    }
+
+    // use strsep
+    token_cursor = line;
+
+    // the first token should be the reply name
+    if ((token = strsep(&token_cursor, " ")) == NULL)
+        ERROR("no reply name in response line: %s", line);
+    
+    // figure out the reply type
+    for (i = MEMCACHE_RPL_INVALID + 1; i < MEMCACHE_RPL_MAX; i++) {
+        if (strcmp(token, memcache_cmd_replies[i].name) == 0)
+            break;
+    }
+    
+    // did we figure out what this reply means?
+    if (i == MEMCACHE_RPL_MAX)
+        ERROR("unrecognized reply: %s", line);
+    
+    // we found the type
+    *reply_type = memcache_cmd_replies[i].type;
+
+    // default to no data
+    *has_data = 0;
+
+    switch (*reply_type) {
+        case MEMCACHE_RPL_ERROR:
+            // no additional data
+
+            break;
+        
+        case MEMCACHE_RPL_CLIENT_ERROR:
+        case MEMCACHE_RPL_SERVER_ERROR:
+            // the rest of the line is a human-readable error message
+            WARNING("received a %s reply: %s", token, token_cursor);
+            
+            break;
+        
+        case MEMCACHE_RPL_VALUE:
+            // <key> <flags> <bytes> [<cas unique>]
+
+            // the key field
+            if ((key->buf = strsep(&token_cursor, " ")) == NULL)
+                ERROR("missing key in VALUE reply");
+            
+            if ((key->len = strlen(key->buf)) == 0)
+                ERROR("zero-length key in VALUE reply");
+            
+            // the flags field
+            if ((token = strsep(&token_cursor, " ")) == NULL)
+                ERROR("missing flags in VALUE reply");
+
+            obj->flags = (u_int32_t) strtol(token, &invalid, 10);
+
+            if (*invalid != '\0')
+                ERROR("invalid flags in VALUE reply: %s (%s)", token, invalid);
+            
+            // the bytes field
+            if ((token = strsep(&token_cursor, " ")) == NULL)
+                ERROR("missing bytes in VALUE reply");
+
+            obj->bytes = (u_int32_t) strtol(token, &invalid, 10);
+
+            if (*invalid != '\0')
+                ERROR("invalid bytes in VALUE reply: %s (%s)", token, invalid);
+
+            // the optional cas field
+            if ((token = strsep(&token_cursor, " ")) != NULL) {
+                // there is a cas value present
+                obj->cas = (u_int64_t) strtoll(token, &invalid, 10);
+                
+                if (*invalid != '\0')
+                    ERROR("invalid cas value in VALUE reply: %s (%s)", token, invalid);
+
+            } else {
+                obj->cas = 0;
+
+            }
+            
+            // and we do have data following this
+            *has_data = 1;
+
+            break;
+        
+        case MEMCACHE_RPL_END:
+            // no additional data
+
+            break;
+
+        case MEMCACHE_RPL_STORED:
+        case MEMCACHE_RPL_NOT_STORED:
+        case MEMCACHE_RPL_EXISTS:
+        case MEMCACHE_RPL_NOT_FOUND:
+            // no additional data
+
+            break;
+        
+        default:
+            assert(0);
+    };
+
+    // success
+    *header_data = line;
+
+    return 0;
+
+error:
+    free(line);
+
+    return -1;
 }
 
Binary file memcache/command.o has changed
--- a/memcache/connection.c	Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/connection.c	Wed Aug 27 22:42:27 2008 +0300
@@ -17,6 +17,8 @@
 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
 
+static void memcache_conn_req_done (struct memcache_conn *conn);
+
 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
     struct memcache_conn *conn = NULL;
 
@@ -89,6 +91,9 @@
     if (bufferevent_enable(conn->bev, EV_WRITE))
         PERROR("bufferevent_enable");
     
+    // tell the req that it is underway
+    memcache_req_send(req);
+
     // wait for that to complete
     return 0;
 
@@ -124,6 +129,14 @@
 }
 
 /*
+ * Start reading reply data from the connection
+ */
+void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) {
+    // XXX: implement
+    assert(0);
+}
+
+/*
  * The connect() has finished
  */
 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
@@ -185,6 +198,7 @@
 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
     struct memcache_conn *conn = arg;
     struct evbuffer *in_buf = bufferevent_get_input(bev);
+    struct memcache_key key;
     char *header_data;
     enum memcache_reply reply_type;
     int has_data;
@@ -193,10 +207,35 @@
     assert(evbuffer_get_length(in_buf) > 0);
 
     // attempt to parse the response header
-    if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data))
+    if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
         ERROR("memcache_cmd_parse_header");
+    
+    if (!header_data) {
+        // no complete header received yet
+        return;
+    }
 
-    // XXX: read reply data
+    // disable reads again
+    if (bufferevent_disable(bev, EV_READ))
+        PERROR("bufferevent_disable");
+    
+    // does the reply include data?
+    if (has_data) {
+        // check that they key is the same
+        if (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0)
+            ERROR("got reply with wrong key !?!");
+
+        // start reading the data (including whatever might be left over in the bufferevent buffer...)
+        // XXX: what if this triggers a req notify before we do?
+        memcache_conn_handle_reply_data(conn, in_buf);
+
+    } else {
+        // the request is done with
+        memcache_conn_req_done(conn);
+    }
+    
+    // notify the request
+    memcache_req_reply(conn->req, reply_type);
 
 error:
     // XXX: error handling
@@ -253,7 +292,24 @@
     assert(0);
 }
 
+/*
+ * Detach the request
+ */
+static void memcache_conn_req_done (struct memcache_conn *conn) {
+    // ensure that we do currently have a req
+    assert(conn->req);
+    
+    // have the req detach and check it did so
+    memcache_req_done(conn->req);
+    assert(conn->req->conn == NULL);
+    
+    // we are now available again
+    conn->req = NULL;
+}
+
 void memcache_conn_free (struct memcache_conn *conn) {
+    // XXX: conn->req?
+
     // ensure that the connection is not considered to be connected anymore
     assert(!conn->is_connected);
     
--- a/memcache/request.c	Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/request.c	Wed Aug 27 22:42:27 2008 +0300
@@ -41,20 +41,47 @@
     return NULL;
 }
 
-static int _memcache_req_notify (struct memcache_req *req) {
-    return req->mc->cb_fn(req, req->cb_arg);
+static void _memcache_req_notify (struct memcache_req *req) {
+    req->mc->cb_fn(req, req->cb_arg);
 }
 
 void memcache_req_error (struct memcache_req *req) {
     // forget our connection
     req->conn = NULL;
-    
-    // enter ERROR state
+
     req->state = MEMCACHE_STATE_ERROR;
 
-    // notify
-    if (_memcache_req_notify(req))
-        WARNING("req error callback failed, ignoring");
+    _memcache_req_notify(req);
+}
+
+void memcache_req_queued (struct memcache_req *req) {
+    req->state = MEMCACHE_STATE_QUEUED;
+
+//    _memcache_req_notify(req);
+}
+
+void memcache_req_send (struct memcache_req *req) {
+    req->state = MEMCACHE_STATE_SEND;
+    
+//    _memcache_req_notify(req);
+}
+
+void memcache_req_reply (struct memcache_req *req, enum memcache_reply reply_type) {
+    req->state = MEMCACHE_STATE_REPLY;
+    req->reply_type = reply_type;
+
+    _memcache_req_notify(req);
+}
+
+void memcache_req_done (struct memcache_req *req) {
+    // make sure we are in the STATE_SEND state
+    assert(req->state == MEMCACHE_STATE_SEND);
+
+    // forget the connection
+    req->conn = NULL;
+
+    // our state is currently indeterminate until req_reply is called
+    req->state = MEMCACHE_STATE_INVALID;
 }
 
 void memcache_req_free (struct memcache_req *req) {
--- a/memcache/request.h	Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/request.h	Wed Aug 27 22:42:27 2008 +0300
@@ -13,6 +13,9 @@
     // the command to execute
     enum memcache_command cmd_type;
 
+    // the reply we have received, or MEMCACHE_REPLY_INVALID
+    enum memcache_reply reply_type;
+
     // our key/obj
     struct memcache_key key;
     struct memcache_obj obj;
@@ -36,7 +39,6 @@
  */
 struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg);
 
-
 /*
  * An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued
  * anymore, and the req should not be accessed by memcache_* code after this.
@@ -44,6 +46,28 @@
 void memcache_req_error (struct memcache_req *req);
 
 /*
+ * The request has been queued.
+ */
+void memcache_req_queued (struct memcache_req *req);
+
+/*
+ * The request is being sent.
+ */
+void memcache_req_send (struct memcache_req *req);
+
+/*
+ * The response has been received, although if the respones also contains data, that will be notified separately
+ */
+void memcache_req_reply (struct memcache_req *req, enum memcache_reply reply_type);
+
+/*
+ * The request was sent and is now done, and is not associated with the connection anymore.
+ *
+ * This will be called before req_reply/req_data, but not before/after req_error.
+ */
+void memcache_req_done (struct memcache_req *req);
+
+/*
  * Free an unused req. Should always be called by the user, not via internal means.
  */
 void memcache_req_free (struct memcache_req *req);
--- a/memcache/server.c	Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/server.c	Wed Aug 27 22:42:27 2008 +0300
@@ -74,6 +74,9 @@
         // XXX: queue size limits
 
         TAILQ_INSERT_TAIL(&server->req_queue, req, reqqueue_node);
+        
+        // notify the req
+        memcache_req_queued(req);
 
         return 0;
     }