memcache/connection.c
changeset 42 0e503189af2f
parent 41 540737bf6bac
child 43 e5b714190dee
--- a/memcache/connection.c	Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/connection.c	Wed Aug 27 22:42:27 2008 +0300
@@ -17,6 +17,8 @@
 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
 
+static void memcache_conn_req_done (struct memcache_conn *conn);
+
 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
     struct memcache_conn *conn = NULL;
 
@@ -89,6 +91,9 @@
     if (bufferevent_enable(conn->bev, EV_WRITE))
         PERROR("bufferevent_enable");
     
+    // tell the req that it is underway
+    memcache_req_send(req);
+
     // wait for that to complete
     return 0;
 
@@ -124,6 +129,14 @@
 }
 
 /*
+ * Start reading reply data from the connection
+ */
+void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) {
+    // XXX: implement
+    assert(0);
+}
+
+/*
  * The connect() has finished
  */
 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
@@ -185,6 +198,7 @@
 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
     struct memcache_conn *conn = arg;
     struct evbuffer *in_buf = bufferevent_get_input(bev);
+    struct memcache_key key;
     char *header_data;
     enum memcache_reply reply_type;
     int has_data;
@@ -193,10 +207,35 @@
     assert(evbuffer_get_length(in_buf) > 0);
 
     // attempt to parse the response header
-    if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data))
+    if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
         ERROR("memcache_cmd_parse_header");
+    
+    if (!header_data) {
+        // no complete header received yet
+        return;
+    }
 
-    // XXX: read reply data
+    // disable reads again
+    if (bufferevent_disable(bev, EV_READ))
+        PERROR("bufferevent_disable");
+    
+    // does the reply include data?
+    if (has_data) {
+        // check that they key is the same
+        if (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0)
+            ERROR("got reply with wrong key !?!");
+
+        // start reading the data (including whatever might be left over in the bufferevent buffer...)
+        // XXX: what if this triggers a req notify before we do?
+        memcache_conn_handle_reply_data(conn, in_buf);
+
+    } else {
+        // the request is done with
+        memcache_conn_req_done(conn);
+    }
+    
+    // notify the request
+    memcache_req_reply(conn->req, reply_type);
 
 error:
     // XXX: error handling
@@ -253,7 +292,24 @@
     assert(0);
 }
 
+/*
+ * Detach the request
+ */
+static void memcache_conn_req_done (struct memcache_conn *conn) {
+    // ensure that we do currently have a req
+    assert(conn->req);
+    
+    // have the req detach and check it did so
+    memcache_req_done(conn->req);
+    assert(conn->req->conn == NULL);
+    
+    // we are now available again
+    conn->req = NULL;
+}
+
 void memcache_conn_free (struct memcache_conn *conn) {
+    // XXX: conn->req?
+
     // ensure that the connection is not considered to be connected anymore
     assert(!conn->is_connected);