memcache/connection.c
changeset 41 540737bf6bac
parent 39 0e21a65074a6
child 42 0e503189af2f
--- a/memcache/connection.c	Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/connection.c	Wed Aug 27 21:30:32 2008 +0300
@@ -2,13 +2,20 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
+#include <errno.h>
 #include <assert.h>
 
 #include "connection.h"
+#include "command.h"
+#include "request.h"
 #include "../socket.h"
 #include "../common.h"
 
 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
+static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
+static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
+static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
+static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
 
 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
     struct memcache_conn *conn = NULL;
@@ -18,7 +25,7 @@
     
     // remember the server
     conn->server = server;
-    
+
     // attempt connect
     if (memcache_conn_connect(conn))
         ERROR("failed to connect to server");
@@ -74,13 +81,49 @@
     // store the req
     conn->req = req;
 
-    // XXX: transmit it
+    // write the request header into our bufferevent's output buffer
+    if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj))
+        ERROR("failed to init the cmd");
+    
+    // tell our bufferevent to send it
+    if (bufferevent_enable(conn->bev, EV_WRITE))
+        PERROR("bufferevent_enable");
+    
+    // wait for that to complete
+    return 0;
 
 error:
     return -1;
 }
 
 /*
+ * Start writing out the request data
+ */
+void memcache_conn_send_req_data (struct memcache_conn *conn) {
+    // just fake a call to the event handler
+    _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
+}
+
+/*
+ * Start reading a reply from the connection
+ */
+void memcache_conn_handle_reply (struct memcache_conn *conn) {
+    // ensure that we either didn't have a command, or it has been sent
+    assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
+
+    // start reading on the bufferevent
+    if (bufferevent_enable(conn->bev, EV_READ))
+        PERROR("bufferevent_enable");
+
+    // ok, wait for the reply
+    return;
+
+error:
+    // XXX: error handling
+    assert(0);
+}
+
+/*
  * The connect() has finished
  */
 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
@@ -93,6 +136,15 @@
     if (error)
         ERROR("connect failed: %s", strerror(error));
 
+    // set up the bufferevent
+    if ((conn->bev = bufferevent_new(fd, 
+        &_memcache_conn_bev_read,
+        &_memcache_conn_bev_write,
+        &_memcache_conn_bev_error,
+        conn
+    )) == NULL)
+        ERROR("bufferevent_new");
+
     // mark us as succesfully connected
     conn->is_connected = 1;
 
@@ -107,6 +159,100 @@
     memcache_server_conn_dead(conn->server, conn);
 }
 
+/*
+ * The write buffer is empty, which means that we have written out a command header
+ */
+static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
+    struct memcache_conn *conn = arg;
+
+    // the command header has been sent
+    assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
+    
+    // does this request have some data to be included in the request?
+    if (conn->req->buf.data > 0) {
+        // we need to send the request data next
+        memcache_conn_send_req_data(conn);
+
+    } else {
+        // wait for a reply
+        memcache_conn_handle_reply(conn);
+    }
+}
+
+/*
+ * We have received some reply data, which should include the complete reply line at some point
+ */
+static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
+    struct memcache_conn *conn = arg;
+    struct evbuffer *in_buf = bufferevent_get_input(bev);
+    char *header_data;
+    enum memcache_reply reply_type;
+    int has_data;
+    
+    // ensure that we do indeed have some data
+    assert(evbuffer_get_length(in_buf) > 0);
+
+    // attempt to parse the response header
+    if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data))
+        ERROR("memcache_cmd_parse_header");
+
+    // XXX: read reply data
+
+error:
+    // XXX: error handling
+    return;
+}
+
+
+static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
+    // XXX: error handling
+    assert(0);
+}
+
+static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
+    struct memcache_conn *conn = arg;
+    struct memcache_buf *buf = &conn->req->buf;
+    int ret;
+
+    // correct event
+    assert(event == EV_WRITE);
+
+    // we do indeed have data to send
+    assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);
+    
+    // do the actual write()
+    if ((ret = write(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN)
+        PERROR("write");
+
+    // should never be the case... ?
+    if (ret == 0)
+        ERROR("write returned 0 !?!");
+    
+    // did we manage to write some data?
+    if (ret > 0) {
+        // update offset
+        buf->offset += ret;
+    }
+
+    // data left to write?
+    if (buf->offset < buf->len) {
+        // reschedule
+        if (event_add(&conn->ev, NULL))
+            PERROR("event_add");
+
+    } else {
+        // done! We can handle the reply now
+        memcache_conn_handle_reply(conn);
+    }
+
+    // success
+    return;
+
+error:
+    // XXX: error handling
+    assert(0);
+}
+
 void memcache_conn_free (struct memcache_conn *conn) {
     // ensure that the connection is not considered to be connected anymore
     assert(!conn->is_connected);