diff -r 9cecd22e643a -r 540737bf6bac memcache/connection.c --- a/memcache/connection.c Wed Aug 27 10:13:38 2008 +0300 +++ b/memcache/connection.c Wed Aug 27 21:30:32 2008 +0300 @@ -2,13 +2,20 @@ #include #include #include +#include #include #include "connection.h" +#include "command.h" +#include "request.h" #include "../socket.h" #include "../common.h" static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg); +static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg); +static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg); +static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg); +static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg); struct memcache_conn *memcache_conn_open (struct memcache_server *server) { struct memcache_conn *conn = NULL; @@ -18,7 +25,7 @@ // remember the server conn->server = server; - + // attempt connect if (memcache_conn_connect(conn)) ERROR("failed to connect to server"); @@ -74,13 +81,49 @@ // store the req conn->req = req; - // XXX: transmit it + // write the request header into our bufferevent's output buffer + if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) + ERROR("failed to init the cmd"); + + // tell our bufferevent to send it + if (bufferevent_enable(conn->bev, EV_WRITE)) + PERROR("bufferevent_enable"); + + // wait for that to complete + return 0; error: return -1; } /* + * Start writing out the request data + */ +void memcache_conn_send_req_data (struct memcache_conn *conn) { + // just fake a call to the event handler + _memcache_conn_ev_write(conn->fd, EV_WRITE, conn); +} + +/* + * Start reading a reply from the connection + */ +void memcache_conn_handle_reply (struct memcache_conn *conn) { + // ensure that we either didn't have a command, or it has been sent + assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len); + + // start reading on the bufferevent + if (bufferevent_enable(conn->bev, EV_READ)) + PERROR("bufferevent_enable"); + + // ok, wait for the reply + return; + +error: + // XXX: error handling + assert(0); +} + +/* * The connect() has finished */ static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) { @@ -93,6 +136,15 @@ if (error) ERROR("connect failed: %s", strerror(error)); + // set up the bufferevent + if ((conn->bev = bufferevent_new(fd, + &_memcache_conn_bev_read, + &_memcache_conn_bev_write, + &_memcache_conn_bev_error, + conn + )) == NULL) + ERROR("bufferevent_new"); + // mark us as succesfully connected conn->is_connected = 1; @@ -107,6 +159,100 @@ memcache_server_conn_dead(conn->server, conn); } +/* + * The write buffer is empty, which means that we have written out a command header + */ +static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) { + struct memcache_conn *conn = arg; + + // the command header has been sent + assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0); + + // does this request have some data to be included in the request? + if (conn->req->buf.data > 0) { + // we need to send the request data next + memcache_conn_send_req_data(conn); + + } else { + // wait for a reply + memcache_conn_handle_reply(conn); + } +} + +/* + * We have received some reply data, which should include the complete reply line at some point + */ +static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) { + struct memcache_conn *conn = arg; + struct evbuffer *in_buf = bufferevent_get_input(bev); + char *header_data; + enum memcache_reply reply_type; + int has_data; + + // ensure that we do indeed have some data + assert(evbuffer_get_length(in_buf) > 0); + + // attempt to parse the response header + if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data)) + ERROR("memcache_cmd_parse_header"); + + // XXX: read reply data + +error: + // XXX: error handling + return; +} + + +static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) { + // XXX: error handling + assert(0); +} + +static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) { + struct memcache_conn *conn = arg; + struct memcache_buf *buf = &conn->req->buf; + int ret; + + // correct event + assert(event == EV_WRITE); + + // we do indeed have data to send + assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len); + + // do the actual write() + if ((ret = write(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN) + PERROR("write"); + + // should never be the case... ? + if (ret == 0) + ERROR("write returned 0 !?!"); + + // did we manage to write some data? + if (ret > 0) { + // update offset + buf->offset += ret; + } + + // data left to write? + if (buf->offset < buf->len) { + // reschedule + if (event_add(&conn->ev, NULL)) + PERROR("event_add"); + + } else { + // done! We can handle the reply now + memcache_conn_handle_reply(conn); + } + + // success + return; + +error: + // XXX: error handling + assert(0); +} + void memcache_conn_free (struct memcache_conn *conn) { // ensure that the connection is not considered to be connected anymore assert(!conn->is_connected);