the request/reply code should be complete now, but still needs testing
authorTero Marttila <terom@fixme.fi>
Thu, 28 Aug 2008 00:29:39 +0300
changeset 43 e5b714190dee
parent 42 0e503189af2f
child 44 03a7e064f833
the request/reply code should be complete now, but still needs testing
memcache.h
memcache/command.c
memcache/command.h
memcache/command.o
memcache/connection.c
memcache/connection.h
memcache/request.c
memcache/request.h
memcache/server.c
memcache/server.h
--- a/memcache.h	Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache.h	Thu Aug 28 00:29:39 2008 +0300
@@ -87,6 +87,10 @@
     MEMCACHE_STATE_QUEUED,
     MEMCACHE_STATE_SEND,
     MEMCACHE_STATE_REPLY,
+    MEMCACHE_STATE_REPLY_DATA,
+
+    MEMCACHE_STATE_DONE,
+    MEMCACHE_STATE_DATA_DONE,
 
     MEMCACHE_STATE_ERROR,
 };
--- a/memcache/command.c	Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/command.c	Thu Aug 28 00:29:39 2008 +0300
@@ -92,7 +92,7 @@
         case MEMCACHE_CMD_STORE_PREPEND:
             assert(key != NULL && obj != NULL);
             assert(key->len > 0 && key->buf != NULL);
-            assert(obj->bytes > 0);
+            // XXX: ensure that we have a valid buf
 
             if (evbuffer_add_printf(buf, "%s %*s %u %lu %zu\r\n", cmd_name, (int) key->len, key->buf, obj->flags, obj->exptime, obj->bytes))
                 ERROR("evbuffer_add_printf");
@@ -117,32 +117,33 @@
     size_t line_length, buf_size;
     char *line = NULL, *token_cursor, *token, *invalid;
     int i;
-
-    // take note of how long the buffer is
-    buf_size = evbuffer_get_length(buf);
+   
+    // read lines until we find one that isn't empty
+    do {
+        // take note of how long the buffer is
+        buf_size = evbuffer_get_length(buf);
 
-    // first, try and read a full line
-    if ((line = evbuffer_readln(buf, &line_length, EVBUFFER_EOL_CRLF_STRICT)) == NULL) {
-        // check if any data was consumed
-        if (evbuffer_get_length(buf) != buf_size) {
-            // faaaail!
-            return -1;
+        // free the prvious line in case it was empty
+        free(line);
 
-        } else {
-            // no complete line found
-            return 0;
+        // try and read a line
+        if ((line = evbuffer_readln(buf, &line_length, EVBUFFER_EOL_CRLF_STRICT)) == NULL) {
+            // check if any data was consumed
+            if (evbuffer_get_length(buf) != buf_size) {
+                // faaaail!
+                return -1;
+
+            } else {
+                // no complete line found
+                return 0;
+            }
         }
-    }
+
+    } while (line_length == 0);
     
     // just check to make sure that it really is null-delimited
     assert(line[line_length - 1] == '\0');
 
-    // empty lines?
-    if (line_length == 0) {
-        PWARNING("empty reply line !?!");
-        return 0;
-    }
-
     // use strsep
     token_cursor = line;
 
--- a/memcache/command.h	Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/command.h	Thu Aug 28 00:29:39 2008 +0300
@@ -13,6 +13,17 @@
 int memcache_cmd_init (struct memcache_cmd *cmd, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj);
 
 int memcache_cmd_format_header (struct evbuffer *buf, enum memcache_command cmd_type, struct memcache_key *key, struct memcache_obj *obj);
+
+/*
+ * Attempt to parse a response line from the given buf. *header_data will be set to NULL if no complete response line
+ * was found, or a pointer to the line received (which may contain NULs between tokens), which must be freed by the
+ * caller. *reply_type will be set to the parsed `enum memcache_reply` type. If the reply is of type
+ * MEMCACHE_RPL_VALUE, then key and obj will be updated to correspond to what was returned (key.buf will point into
+ * *header_data). has_data will be set to true if the response also contains cache data.
+ *
+ * Empty lines will be skipped based on the assumption that they are, in fact, the newline between the data and the
+ * MEMCACHE_RPL_END line of a MEMCACHE_RPL_VALUE reply.
+ */
 int memcache_cmd_parse_header (struct evbuffer *buf, char **header_data, enum memcache_reply *reply_type, struct memcache_key *key, struct memcache_obj *obj, int *has_data);
 
 #endif /* MEMCACHE_COMMAND_H */
Binary file memcache/command.o has changed
--- a/memcache/connection.c	Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/connection.c	Thu Aug 28 00:29:39 2008 +0300
@@ -16,7 +16,10 @@
 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
+static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg);
 
+static void memcache_conn_error (struct memcache_conn *conn);
+static void memcache_conn_req_error (struct memcache_conn *conn);
 static void memcache_conn_req_done (struct memcache_conn *conn);
 
 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
@@ -52,10 +55,10 @@
     assert(conn->fd > 0);
 
     // set up the connect event
-    event_set(&conn->ev, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn);
+    event_set(&conn->ev_connect, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn);
 
     // add it
-    if (event_add(&conn->ev, NULL))
+    if (event_add(&conn->ev_connect, NULL))
         PERROR("event_add");
     
     // success
@@ -76,7 +79,7 @@
     return (conn->fd > 0 && conn->is_connected && conn->req == NULL);
 }
 
-int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
+void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
     assert(conn->fd > 0 && conn->is_connected);
     assert(conn->req == NULL);
 
@@ -84,27 +87,36 @@
     conn->req = req;
 
     // write the request header into our bufferevent's output buffer
-    if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj))
+    if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) {
+        // just fail the request
+        memcache_conn_req_error(conn);
+
         ERROR("failed to init the cmd");
+    }
     
     // tell our bufferevent to send it
-    if (bufferevent_enable(conn->bev, EV_WRITE))
+    if (bufferevent_enable(conn->bev, EV_WRITE)) {
+        // fail the entire connection
+        memcache_conn_error(conn);
+
         PERROR("bufferevent_enable");
+    }
     
     // tell the req that it is underway
     memcache_req_send(req);
 
-    // wait for that to complete
-    return 0;
+error:
 
-error:
-    return -1;
+    return;
 }
 
 /*
  * Start writing out the request data
  */
 void memcache_conn_send_req_data (struct memcache_conn *conn) {
+    // set up the ev_write
+    event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn);
+
     // just fake a call to the event handler
     _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
 }
@@ -116,24 +128,89 @@
     // ensure that we either didn't have a command, or it has been sent
     assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
 
-    // start reading on the bufferevent
+    // start/continue reading on the bufferevent
     if (bufferevent_enable(conn->bev, EV_READ))
         PERROR("bufferevent_enable");
 
+    // Note: we don't need to recurse into the callback ourselves in case there is data in it, since the read callback
+    // will consume all available data iteratively.
+
     // ok, wait for the reply
     return;
 
 error:
-    // XXX: error handling
-    assert(0);
+    memcache_conn_req_error(conn);
 }
 
 /*
  * Start reading reply data from the connection
  */
 void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) {
-    // XXX: implement
-    assert(0);
+    int ret;
+
+    // check that the buf doesn't contain any data
+    assert(conn->req->buf.data == NULL);
+
+    // bytes *may* be zero if we have an empty cache entry
+    if (conn->req->obj.bytes > 0) {
+        // allocate a buffer for the reply data
+        if ((conn->req->buf.data = malloc(conn->req->obj.bytes)) == NULL)
+            ERROR("malloc");
+        
+        // update the length
+        conn->req->buf.len = conn->req->obj.bytes;
+
+        // set offset to zero
+        conn->req->buf.offset = 0;
+
+        // do we have any data in the buf that we need to copy?
+        if (evbuffer_get_length(buf) > 0) {
+            // read the data into the memcache_buf
+            ret = evbuffer_remove(buf, conn->req->buf.data, conn->req->buf.len);
+            
+            // sanity check...
+            assert(ret > 0 && ret <= conn->req->buf.len);
+
+            // update offset
+            conn->req->buf.offset += ret;
+        }
+
+        // still need to receive more data?
+        if (conn->req->buf.offset < conn->req->buf.len) {
+        
+            // disable the bufferevent while we read the data
+            if (bufferevent_disable(conn->bev, EV_READ))
+                PERROR("bufferevent_disable");
+
+            // set up the ev_read
+            event_set(&conn->ev_read, conn->fd, EV_READ, &_memcache_conn_ev_read, conn);
+
+            // then receive what data is left to receive
+            _memcache_conn_ev_read(conn->fd, EV_READ, conn);
+            
+            // wait for the data to arrive
+            return;
+
+        } else {
+            // the buffer already contained the cache data, no need to read any more
+            
+        }
+    } else {
+        // there is no data to receive for this item, so we can ignore this
+        
+    }
+    
+    // we kind of "recurse" to handle the MEMCACHE_RPL_END reply, that is, we activate the bufferevent for EV_READ
+    // again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line" after the data and then
+    // return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause req_done to be called.
+    // Elegant!
+    memcache_conn_handle_reply(conn);
+
+    // ok
+    return;
+
+error:
+    memcache_conn_req_error(conn);
 }
 
 /*
@@ -168,8 +245,7 @@
     return;
 
 error:
-    // notify the server
-    memcache_server_conn_dead(conn->server, conn);
+    memcache_conn_error(conn);
 }
 
 /*
@@ -205,47 +281,56 @@
     
     // ensure that we do indeed have some data
     assert(evbuffer_get_length(in_buf) > 0);
-
-    // attempt to parse the response header
-    if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
-        ERROR("memcache_cmd_parse_header");
     
-    if (!header_data) {
-        // no complete header received yet
-        return;
-    }
+    // consume as much data as possible
+    do {
+        // attempt to parse the response header
+        if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
+            ERROR("memcache_cmd_parse_header");
+        
+        if (!header_data) {
+            // no complete header received yet
+            return;
+        }
 
-    // disable reads again
-    if (bufferevent_disable(bev, EV_READ))
-        PERROR("bufferevent_disable");
-    
-    // does the reply include data?
-    if (has_data) {
-        // check that they key is the same
-        if (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0)
+        // if the reply contains data, check that they key is the same
+        if (has_data && (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0))
             ERROR("got reply with wrong key !?!");
 
-        // start reading the data (including whatever might be left over in the bufferevent buffer...)
-        // XXX: what if this triggers a req notify before we do?
-        memcache_conn_handle_reply_data(conn, in_buf);
+        // notify the request (no reply data is ready for reading yet, though)
+        memcache_req_reply(conn->req, reply_type);
+        
+        // does the reply include data?
+        if (has_data) {
+            // start reading the data (including whatever might be left over in the bufferevent buffer...)
+            memcache_conn_handle_reply_data(conn, in_buf);
 
-    } else {
-        // the request is done with
-        memcache_conn_req_done(conn);
-    }
+        } else {
+            // the request is done with
+            memcache_conn_req_done(conn);
+        }
+
+        // free the header data
+        free(header_data);
     
-    // notify the request
-    memcache_req_reply(conn->req, reply_type);
+    } while (evbuffer_get_length(in_buf) > 0);
+    
+    // done
+    return;
+   
+error:
+    // free the header data read from the buf
+    free(header_data);
 
-error:
-    // XXX: error handling
-    return;
+    memcache_conn_req_error(conn);
 }
 
 
 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
-    // XXX: error handling
-    assert(0);
+    struct memcache_conn *conn = arg;
+
+    // fail the entire connection
+    memcache_conn_error(conn);
 }
 
 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
@@ -265,7 +350,7 @@
 
     // should never be the case... ?
     if (ret == 0)
-        ERROR("write returned 0 !?!");
+        ERROR("write returned EOF !?!");
     
     // did we manage to write some data?
     if (ret > 0) {
@@ -276,7 +361,7 @@
     // data left to write?
     if (buf->offset < buf->len) {
         // reschedule
-        if (event_add(&conn->ev, NULL))
+        if (event_add(&conn->ev_write, NULL))
             PERROR("event_add");
 
     } else {
@@ -288,8 +373,87 @@
     return;
 
 error:
-    // XXX: error handling
-    assert(0);
+    // fail the entire connection
+    memcache_conn_error(conn);
+}
+
+static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg) {
+    struct memcache_conn *conn = arg;
+    struct memcache_buf *buf = &conn->req->buf;
+    int ret;
+
+    // correct event
+    assert(event == EV_READ);
+
+    // we do indeed expect to receive data
+    assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);
+    
+    // do the actual read()
+    if ((ret = read(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN)
+        PERROR("read");
+
+    // should never be the case... ?
+    if (ret == 0)
+        ERROR("read returned EOF !?!");
+    
+    // did we manage to read some data?
+    if (ret > 0) {
+        // update offset
+        buf->offset += ret;
+    }
+    
+    // only notify the req if new data was received, and we won't be calling req_done next.
+    if (ret > 0 && buf->offset < buf->len) {
+        // notify the req
+        memcache_req_data(conn->req);
+    }
+
+    // data left to read?
+    if (buf->offset < buf->len) {
+        // reschedule
+        if (event_add(&conn->ev_read, NULL))
+            PERROR("event_add");
+
+
+    } else {
+        // done! We can let the bufferenvet handle the rest of the reply now
+        memcache_conn_handle_reply(conn);
+    }
+
+    // success
+    return;
+
+error:
+    // fail the entire connection
+    memcache_conn_error(conn);
+}
+
+/*
+ * The entire connection failed
+ */
+static void memcache_conn_error (struct memcache_conn *conn) {
+    // fail the request, if we have one
+    if (conn->req)
+        memcache_conn_req_error(conn);
+
+    // tell the server we failed
+    memcache_server_conn_dead(conn->server, conn);
+}
+
+/*
+ * Request failed somehow
+ */
+static void memcache_conn_req_error (struct memcache_conn *conn) {
+    // ensure that we do currently have a req
+    assert(conn->req);
+    
+    // error out the req
+    memcache_req_error(conn->req);
+    assert(conn->req->conn == NULL);
+    
+    // we are now available again
+    conn->req = NULL;
+   
 }
 
 /*
@@ -308,7 +472,8 @@
 }
 
 void memcache_conn_free (struct memcache_conn *conn) {
-    // XXX: conn->req?
+    // ensure we don't have a req bound to us
+    assert(conn->req == NULL);
 
     // ensure that the connection is not considered to be connected anymore
     assert(!conn->is_connected);
@@ -321,9 +486,15 @@
         conn->fd = 0;
     }
     
-    // ensure that the event is not pending anymore
-    assert(event_pending(&conn->ev, EV_READ|EV_WRITE|EV_TIMEOUT, NULL) == 0);
+    // ensure that the events are not pending anymore
+    assert(event_pending(&conn->ev_connect, EV_WRITE|EV_TIMEOUT, NULL) == 0);
+    assert(event_pending(&conn->ev_read, EV_READ|EV_TIMEOUT, NULL) == 0);
+    assert(event_pending(&conn->ev_write, EV_WRITE|EV_TIMEOUT, NULL) == 0);
     
+    // free the bufferevent
+    if (conn->bev)
+        bufferevent_free(conn->bev);
+
     // free it
     free(conn);
 }
--- a/memcache/connection.h	Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/connection.h	Thu Aug 28 00:29:39 2008 +0300
@@ -20,8 +20,10 @@
     // our socket fd
     int fd;
 
-    // socket event
-    struct event ev;
+    // socket events
+    struct event ev_connect;
+    struct event ev_read;
+    struct event ev_write;
 
     // socket bufferevent
     struct bufferevent *bev;
@@ -56,7 +58,7 @@
 /*
  * The given connection must be idle, whereupon the given request is processed.
  */
-int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req);
+void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req);
 
 /*
  * Free all resources associated with a closed/failed connection
--- a/memcache/request.c	Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/request.c	Thu Aug 28 00:29:39 2008 +0300
@@ -45,15 +45,6 @@
     req->mc->cb_fn(req, req->cb_arg);
 }
 
-void memcache_req_error (struct memcache_req *req) {
-    // forget our connection
-    req->conn = NULL;
-
-    req->state = MEMCACHE_STATE_ERROR;
-
-    _memcache_req_notify(req);
-}
-
 void memcache_req_queued (struct memcache_req *req) {
     req->state = MEMCACHE_STATE_QUEUED;
 
@@ -73,21 +64,41 @@
     _memcache_req_notify(req);
 }
 
+void memcache_req_data (struct memcache_req *req) {
+    assert(req->state == MEMCACHE_STATE_REPLY || req->state == MEMCACHE_STATE_REPLY_DATA);
+
+    req->state = MEMCACHE_STATE_REPLY_DATA;
+    
+    _memcache_req_notify(req);
+}
+
 void memcache_req_done (struct memcache_req *req) {
-    // make sure we are in the STATE_SEND state
-    assert(req->state == MEMCACHE_STATE_SEND);
+    // make sure we are in the REPLY/REPLY_DATA state
+    assert(req->state == MEMCACHE_STATE_REPLY || req->state == MEMCACHE_STATE_REPLY_DATA);
+
+    // make sure we really have the full data, if applicable
+    assert(req->buf.data == NULL || req->buf.offset == req->buf.len);
 
     // forget the connection
     req->conn = NULL;
+    
+    // state depends on if we have data or not...
+    req->state = req->buf.data ? MEMCACHE_STATE_DATA_DONE : MEMCACHE_STATE_DONE;
+}
 
-    // our state is currently indeterminate until req_reply is called
-    req->state = MEMCACHE_STATE_INVALID;
+void memcache_req_error (struct memcache_req *req) {
+    // forget our connection
+    req->conn = NULL;
+
+    req->state = MEMCACHE_STATE_ERROR;
+
+    _memcache_req_notify(req);
 }
 
 void memcache_req_free (struct memcache_req *req) {
     // must be unused
     assert(req->conn == NULL);
-    assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR);
+    assert(req->state == MEMCACHE_STATE_INVALID || req->state == MEMCACHE_STATE_ERROR || req->state == MEMCACHE_STATE_DONE || req->state == MEMCACHE_STATE_DATA_DONE);
 
     free(req->key.buf);
     free(req);
--- a/memcache/request.h	Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/request.h	Thu Aug 28 00:29:39 2008 +0300
@@ -40,12 +40,6 @@
 struct memcache_req *memcache_req_alloc (struct memcache *mc, enum memcache_command cmd_type, const struct memcache_key *key, void *cb_arg);
 
 /*
- * An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued
- * anymore, and the req should not be accessed by memcache_* code after this.
- */
-void memcache_req_error (struct memcache_req *req);
-
-/*
  * The request has been queued.
  */
 void memcache_req_queued (struct memcache_req *req);
@@ -56,11 +50,31 @@
 void memcache_req_send (struct memcache_req *req);
 
 /*
- * The response has been received, although if the respones also contains data, that will be notified separately
+ * The reply has been received, although if the respones also contains data, that will be notified separately.
+ *
+ * If the response doesn't contain any data, req_done will be called after this is. Otherwise, this will be called
+ * first, followed by zero or more calls to req_data, followed by req_reply/RPL_END + req_done.
+ *
+ * Note that no req data will not be available when this is first called, only once req_data/req_done is called.
  */
 void memcache_req_reply (struct memcache_req *req, enum memcache_reply reply_type);
 
 /*
+ * Some amount of reply data has been received. This may be called between the req_reply/MEMCACHE_RPL_VALUE and
+ * req_reply/MEMCACHE_RPL_END + req_done notifications.
+ *
+ * Note that this is not always called, it is possible that one might have:
+ *      memcache_req_reply(req, MEMCACHE_RPL_VALUE)
+ *      memcache_req_reply(req, MEMCACHE_RPL_END)
+ *      memcache_req_done(req)
+ *
+ * This will happen if both the value and the end replies are received in the same chunk.
+ *
+ * Note also that replies may have zero-length entries, in which case req_data will never be called either.
+ */
+void memcache_req_data (struct memcache_req *req);
+
+/*
  * The request was sent and is now done, and is not associated with the connection anymore.
  *
  * This will be called before req_reply/req_data, but not before/after req_error.
@@ -68,6 +82,12 @@
 void memcache_req_done (struct memcache_req *req);
 
 /*
+ * An error occurred, and the request must be abandoned. This will assume that the req is not active or enqueued
+ * anymore, and the req should not be accessed by memcache_* code after this.
+ */
+void memcache_req_error (struct memcache_req *req);
+
+/*
  * Free an unused req. Should always be called by the user, not via internal means.
  */
 void memcache_req_free (struct memcache_req *req);
--- a/memcache/server.c	Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/server.c	Thu Aug 28 00:29:39 2008 +0300
@@ -67,7 +67,10 @@
 
     if (conn != NULL) {
         // we found an available connection
-        return memcache_conn_do_req(conn, req);
+        // if the request fails, then we will know via conn_dead
+        memcache_conn_do_req(conn, req);
+
+        return 0;
 
     } else {
         // enqueue the request until a connection is available
@@ -92,12 +95,8 @@
         // remove it from the queue and execute it
         TAILQ_REMOVE(&server->req_queue, req, reqqueue_node);
         
-        if (memcache_conn_do_req(conn, req)) {
-            WARNING("processing enqueued request failed");
-            
-            // notify the request/user, the req is now out of our hands
-            memcache_req_error(req);
-        }
+        // this will take care of any error handling by itself
+        memcache_conn_do_req(conn, req);
     }
 }
 
--- a/memcache/server.h	Wed Aug 27 22:42:27 2008 +0300
+++ b/memcache/server.h	Thu Aug 28 00:29:39 2008 +0300
@@ -44,7 +44,7 @@
 void memcache_server_conn_ready (struct memcache_server *server, struct memcache_conn *conn);
 
 /*
- * The given connection failed/died
+ * The given connection failed/died. This will take care of freeing it/reconnecting it
  */
 void memcache_server_conn_dead (struct memcache_server *server, struct memcache_conn *conn);