--- a/memcache/connection.c Wed Aug 27 21:30:32 2008 +0300
+++ b/memcache/connection.c Wed Aug 27 22:42:27 2008 +0300
@@ -17,6 +17,8 @@
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
+static void memcache_conn_req_done (struct memcache_conn *conn);
+
struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
struct memcache_conn *conn = NULL;
@@ -89,6 +91,9 @@
if (bufferevent_enable(conn->bev, EV_WRITE))
PERROR("bufferevent_enable");
+ // tell the req that it is underway
+ memcache_req_send(req);
+
// wait for that to complete
return 0;
@@ -124,6 +129,14 @@
}
/*
+ * Start reading reply data from the connection
+ */
+void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) {
+ // XXX: implement
+ assert(0);
+}
+
+/*
* The connect() has finished
*/
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
@@ -185,6 +198,7 @@
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
struct memcache_conn *conn = arg;
struct evbuffer *in_buf = bufferevent_get_input(bev);
+ struct memcache_key key;
char *header_data;
enum memcache_reply reply_type;
int has_data;
@@ -193,10 +207,35 @@
assert(evbuffer_get_length(in_buf) > 0);
// attempt to parse the response header
- if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data))
+ if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
ERROR("memcache_cmd_parse_header");
+
+ if (!header_data) {
+ // no complete header received yet
+ return;
+ }
- // XXX: read reply data
+ // disable reads again
+ if (bufferevent_disable(bev, EV_READ))
+ PERROR("bufferevent_disable");
+
+ // does the reply include data?
+ if (has_data) {
+ // check that they key is the same
+ if (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0)
+ ERROR("got reply with wrong key !?!");
+
+ // start reading the data (including whatever might be left over in the bufferevent buffer...)
+ // XXX: what if this triggers a req notify before we do?
+ memcache_conn_handle_reply_data(conn, in_buf);
+
+ } else {
+ // the request is done with
+ memcache_conn_req_done(conn);
+ }
+
+ // notify the request
+ memcache_req_reply(conn->req, reply_type);
error:
// XXX: error handling
@@ -253,7 +292,24 @@
assert(0);
}
+/*
+ * Detach the request
+ */
+static void memcache_conn_req_done (struct memcache_conn *conn) {
+ // ensure that we do currently have a req
+ assert(conn->req);
+
+ // have the req detach and check it did so
+ memcache_req_done(conn->req);
+ assert(conn->req->conn == NULL);
+
+ // we are now available again
+ conn->req = NULL;
+}
+
void memcache_conn_free (struct memcache_conn *conn) {
+ // XXX: conn->req?
+
// ensure that the connection is not considered to be connected anymore
assert(!conn->is_connected);