--- a/memcache/connection.c Wed Aug 27 10:13:38 2008 +0300
+++ b/memcache/connection.c Wed Aug 27 21:30:32 2008 +0300
@@ -2,13 +2,20 @@
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
+#include <errno.h>
#include <assert.h>
#include "connection.h"
+#include "command.h"
+#include "request.h"
#include "../socket.h"
#include "../common.h"
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
+static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
+static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
+static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
+static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
struct memcache_conn *conn = NULL;
@@ -18,7 +25,7 @@
// remember the server
conn->server = server;
-
+
// attempt connect
if (memcache_conn_connect(conn))
ERROR("failed to connect to server");
@@ -74,13 +81,49 @@
// store the req
conn->req = req;
- // XXX: transmit it
+ // write the request header into our bufferevent's output buffer
+ if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj))
+ ERROR("failed to init the cmd");
+
+ // tell our bufferevent to send it
+ if (bufferevent_enable(conn->bev, EV_WRITE))
+ PERROR("bufferevent_enable");
+
+ // wait for that to complete
+ return 0;
error:
return -1;
}
/*
+ * Start writing out the request data
+ */
+void memcache_conn_send_req_data (struct memcache_conn *conn) {
+ // just fake a call to the event handler
+ _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
+}
+
+/*
+ * Start reading a reply from the connection
+ */
+void memcache_conn_handle_reply (struct memcache_conn *conn) {
+ // ensure that we either didn't have a command, or it has been sent
+ assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
+
+ // start reading on the bufferevent
+ if (bufferevent_enable(conn->bev, EV_READ))
+ PERROR("bufferevent_enable");
+
+ // ok, wait for the reply
+ return;
+
+error:
+ // XXX: error handling
+ assert(0);
+}
+
+/*
* The connect() has finished
*/
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
@@ -93,6 +136,15 @@
if (error)
ERROR("connect failed: %s", strerror(error));
+ // set up the bufferevent
+ if ((conn->bev = bufferevent_new(fd,
+ &_memcache_conn_bev_read,
+ &_memcache_conn_bev_write,
+ &_memcache_conn_bev_error,
+ conn
+ )) == NULL)
+ ERROR("bufferevent_new");
+
// mark us as succesfully connected
conn->is_connected = 1;
@@ -107,6 +159,100 @@
memcache_server_conn_dead(conn->server, conn);
}
+/*
+ * The write buffer is empty, which means that we have written out a command header
+ */
+static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
+ struct memcache_conn *conn = arg;
+
+ // the command header has been sent
+ assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
+
+ // does this request have some data to be included in the request?
+ if (conn->req->buf.data > 0) {
+ // we need to send the request data next
+ memcache_conn_send_req_data(conn);
+
+ } else {
+ // wait for a reply
+ memcache_conn_handle_reply(conn);
+ }
+}
+
+/*
+ * We have received some reply data, which should include the complete reply line at some point
+ */
+static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
+ struct memcache_conn *conn = arg;
+ struct evbuffer *in_buf = bufferevent_get_input(bev);
+ char *header_data;
+ enum memcache_reply reply_type;
+ int has_data;
+
+ // ensure that we do indeed have some data
+ assert(evbuffer_get_length(in_buf) > 0);
+
+ // attempt to parse the response header
+ if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data))
+ ERROR("memcache_cmd_parse_header");
+
+ // XXX: read reply data
+
+error:
+ // XXX: error handling
+ return;
+}
+
+
+static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
+ // XXX: error handling
+ assert(0);
+}
+
+static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
+ struct memcache_conn *conn = arg;
+ struct memcache_buf *buf = &conn->req->buf;
+ int ret;
+
+ // correct event
+ assert(event == EV_WRITE);
+
+ // we do indeed have data to send
+ assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);
+
+ // do the actual write()
+ if ((ret = write(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN)
+ PERROR("write");
+
+ // should never be the case... ?
+ if (ret == 0)
+ ERROR("write returned 0 !?!");
+
+ // did we manage to write some data?
+ if (ret > 0) {
+ // update offset
+ buf->offset += ret;
+ }
+
+ // data left to write?
+ if (buf->offset < buf->len) {
+ // reschedule
+ if (event_add(&conn->ev, NULL))
+ PERROR("event_add");
+
+ } else {
+ // done! We can handle the reply now
+ memcache_conn_handle_reply(conn);
+ }
+
+ // success
+ return;
+
+error:
+ // XXX: error handling
+ assert(0);
+}
+
void memcache_conn_free (struct memcache_conn *conn) {
// ensure that the connection is not considered to be connected anymore
assert(!conn->is_connected);