memcache/connection.c
changeset 49 10c7dce1a043
parent 48 1c67f512779b
equal deleted inserted replaced
48:1c67f512779b 49:10c7dce1a043
     4 #include <string.h>
     4 #include <string.h>
     5 #include <errno.h>
     5 #include <errno.h>
     6 #include <assert.h>
     6 #include <assert.h>
     7 
     7 
     8 #include "connection.h"
     8 #include "connection.h"
       
     9 #include "memcache.h"
     9 #include "command.h"
    10 #include "command.h"
    10 #include "request.h"
    11 #include "request.h"
    11 #include "../socket.h"
    12 #include "../socket.h"
    12 #include "../common.h"
    13 #include "../common.h"
    13 
    14 
    14 
    15 void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint);
    15 void memcache_conn_send_req_data (struct memcache_conn *conn);
    16 void memcache_conn_send_data (struct memcache_conn *conn);
    16 void memcache_conn_finish_req_data (struct memcache_conn *conn);
    17 void memcache_conn_send_end (struct memcache_conn *conn);
    17 void memcache_conn_handle_reply (struct memcache_conn *conn);
    18 void memcache_conn_send_done (struct memcache_conn *conn);
    18 void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf);
    19 void memcache_conn_recv_next (struct memcache_conn *conn);
       
    20 void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf);
       
    21 void memcache_conn_recv_done (struct memcache_conn *conn);
       
    22 void memcache_conn_recv_end (struct memcache_conn *conn);
    19 
    23 
    20 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
    24 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
    21 static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
    25 static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
    22 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
    26 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
    23 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
    27 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
    24 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
    28 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
    25 static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg);
    29 static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg);
    26 
    30 
    27 static void memcache_conn_error (struct memcache_conn *conn);
    31 static void memcache_conn_error (struct memcache_conn *conn);
    28 static void memcache_conn_req_done (struct memcache_conn *conn);
    32 static void memcache_conn_close (struct memcache_conn *conn);
    29 
       
    30 void memcache_conn_close (struct memcache_conn *conn);
       
    31 
    33 
    32 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
    34 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
    33     struct memcache_conn *conn = NULL;
    35     struct memcache_conn *conn = NULL;
    34 
    36 
    35     if ((conn = calloc(1, sizeof(*conn))) == NULL)
    37     if ((conn = calloc(1, sizeof(*conn))) == NULL)
    36         ERROR("calloc");
    38         ERROR("calloc");
    37     
    39     
    38     // remember the server
    40     // remember the server
    39     conn->server = server;
    41     conn->server = server;
       
    42 
       
    43     // init the req queue
       
    44     TAILQ_INIT(&conn->req_queue);
    40 
    45 
    41     // attempt connect
    46     // attempt connect
    42     if (memcache_conn_connect(conn))
    47     if (memcache_conn_connect(conn))
    43         ERROR("failed to connect to server");
    48         ERROR("failed to connect to server");
    44     
    49     
    81    
    86    
    82     return -1;
    87     return -1;
    83 }
    88 }
    84 
    89 
    85 int memcache_conn_is_available (struct memcache_conn *conn) {
    90 int memcache_conn_is_available (struct memcache_conn *conn) {
    86     return (conn->fd > 0 && conn->is_connected && conn->req == NULL);
    91     return (conn->fd > 0 && conn->is_connected && (TAILQ_EMPTY(&conn->req_queue) || conn->server->mc->pipeline_requests));
    87 }
    92 }
    88 
    93 
    89 void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
    94 void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
    90     assert(conn->fd > 0 && conn->is_connected);
    95     assert(memcache_conn_is_available(conn));
    91     assert(conn->req == NULL);
    96     
    92 
    97     // XXX: validate req
    93     // write the request header into our bufferevent's output buffer
    98 
    94     if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), 
    99     // stick the req into the req queue
    95         memcache_req_cmd(req),
   100     TAILQ_INSERT_TAIL(&conn->req_queue, req, reqqueue_node);
    96         memcache_req_key(req),
   101     
    97         memcache_req_obj(req)
   102     // if send is idle...
    98     )) {
   103     if (conn->send_req == NULL) 
    99         ERROR("failed to init the cmd");
   104         memcache_conn_send_next(conn, req);
   100     }
   105 }
   101     
   106 
   102     // store the req
   107 /*
   103     conn->req = req;
   108  * Send out the next request.
   104     
   109  *
   105     // tell our bufferevent to send it
   110  * If there is currently a send_req, it is considered as done.
   106     if (bufferevent_enable(conn->bev, EV_WRITE))
   111  */
   107         PERROR("bufferevent_enable");
   112 void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint) {
   108     
   113     struct memcache_req *req;
   109     // tell the req that it is underway
   114     
   110     memcache_req_send(req);
   115     // req will be either
   111     
   116     //  * the next enqueued req after the one that was last written
   112     // success
   117     //  * hint, if no req was being written (the req that was just enqueued)
   113     return;
   118     if (conn->send_req) {
   114 
   119         assert(!hint);
   115 error:
   120     
   116     if (conn->req)
   121         // the nex req
   117         memcache_conn_error(conn);
   122         req = TAILQ_NEXT(conn->send_req, reqqueue_node);
   118 
   123 
   119     else
   124         // and reset the send_req to NULL...
   120         memcache_req_error(req);
   125         conn->send_req = NULL;
       
   126 
       
   127     } else {
       
   128         assert(hint && TAILQ_LAST(&conn->req_queue, memcache_reqqueue_head) == hint);
       
   129         
       
   130         // the given req
       
   131         req = hint;
       
   132     }
       
   133 
       
   134     // while we still have a request to process...
       
   135     while (req) {
       
   136         // try and write the request header into our bufferevent's output buffer
       
   137         if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), 
       
   138             memcache_req_cmd(req),
       
   139             memcache_req_key(req),
       
   140             memcache_req_obj(req)
       
   141         )) {
       
   142             WARNING("invalid request");
       
   143             memcache_req_error(req);
       
   144 
       
   145             // continue on to the next request
       
   146             req = TAILQ_NEXT(req, reqqueue_node);
       
   147 
       
   148             continue;
       
   149         }
       
   150 
       
   151         // send this one
       
   152         conn->send_req = req;
       
   153 
       
   154         // enable our bufferevent to send it
       
   155         if (bufferevent_enable(conn->bev, EV_WRITE))
       
   156             PERROR("bufferevent_enable");
       
   157 
       
   158         // tell the req that it is underway
       
   159         memcache_req_send(req);
       
   160         
       
   161         // done, we only want to process one
       
   162         break;
       
   163     }
       
   164     
       
   165     // done, we either replaced send_req, or consumed them all
       
   166     return;
       
   167 
       
   168 error:
       
   169     memcache_conn_error(conn);
   121 }
   170 }
   122 
   171 
   123 /*
   172 /*
   124  * Start writing out the request data
   173  * Start writing out the request data
   125  */
   174  */
   126 void memcache_conn_send_req_data (struct memcache_conn *conn) {
   175 void memcache_conn_send_data (struct memcache_conn *conn) {
   127     if (conn->req->obj.bytes > 0) {
   176     if (conn->send_req->obj.bytes > 0) {
   128         // set up the ev_write
   177         // set up the ev_write
   129         event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn);
   178         event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn);
   130 
   179 
   131         // just fake a call to the event handler
   180         // just fake a call to the event handler
   132         _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
   181         _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
   133 
   182 
   134     } else {
   183     } else {
   135         // just send the \r\n
   184         // just send the \r\n
   136         memcache_conn_finish_req_data(conn);
   185         memcache_conn_send_end(conn);
   137     }
   186     }
   138 }
   187 }
   139 
   188 
   140 /*
   189 /*
   141  * Write out the final \r\n to terminate the request data
   190  * Write out the final \r\n to terminate the request data
   142  */
   191  */
   143 void memcache_conn_finish_req_data (struct memcache_conn *conn) {
   192 void memcache_conn_send_end (struct memcache_conn *conn) {
       
   193     // XXX: this will enable the bev by itself?
   144     if (bufferevent_write(conn->bev, "\r\n", 2))
   194     if (bufferevent_write(conn->bev, "\r\n", 2))
   145         PERROR("bufferevent_write");
   195         PERROR("bufferevent_write");
   146     
   196     
   147     // ok
   197     // ok
   148     return;
   198     return;
   150 error:
   200 error:
   151     memcache_conn_error(conn);
   201     memcache_conn_error(conn);
   152 }
   202 }
   153 
   203 
   154 /*
   204 /*
       
   205  * Finished sending the current send_req
       
   206  */
       
   207 void memcache_conn_send_done (struct memcache_conn *conn) {
       
   208     assert(conn->send_req != NULL);
       
   209 
       
   210     // send the next req, if there is one
       
   211     memcache_conn_send_next(conn, NULL);
       
   212     
       
   213     // if pipelining is on, it's a question of how many we've sent...
       
   214     if (conn->server->mc->pipeline_requests)
       
   215         memcache_server_conn_ready(conn->server, conn);
       
   216 }
       
   217 
       
   218 /*
   155  * Start reading a reply from the connection
   219  * Start reading a reply from the connection
   156  */
   220  */
   157 void memcache_conn_handle_reply (struct memcache_conn *conn) {
   221 void memcache_conn_recv_next (struct memcache_conn *conn) {
   158     // ensure that we either didn't have a command, or it has been sent
       
   159     assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
       
   160 
       
   161     // start/continue reading on the bufferevent
   222     // start/continue reading on the bufferevent
   162     if (bufferevent_enable(conn->bev, EV_READ))
   223     if (bufferevent_enable(conn->bev, EV_READ))
   163         PERROR("bufferevent_enable");
   224         PERROR("bufferevent_enable");
   164 
   225 
   165     // Note: we don't need to recurse into the callback ourselves in case there is data in it, since the read callback
   226     // Note: we don't need to recurse into the callback ourselves in case there is data in it, since the read callback
   173 }
   234 }
   174 
   235 
   175 /*
   236 /*
   176  * Start reading reply data from the connection
   237  * Start reading reply data from the connection
   177  */
   238  */
   178 void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) {
   239 void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf) {
       
   240     struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
   179     int ret;
   241     int ret;
   180 
   242 
   181     // check that the buf doesn't contain any data
   243     // check that the buf doesn't contain any data
   182     assert(conn->req->buf.data == NULL);
   244     assert(req->buf.data == NULL);
   183 
   245 
   184     // bytes *may* be zero if we have an empty cache entry
   246     // bytes *may* be zero if we have an empty cache entry
   185     if (conn->req->obj.bytes > 0) {
   247     if (req->obj.bytes > 0) {
   186         // XXX: memcache_req_make_buffer?
   248         // XXX: memcache_req_make_buffer?
   187 
   249 
   188         // allocate a buffer for the reply data
   250         // allocate a buffer for the reply data
   189         if ((conn->req->buf.data = malloc(conn->req->obj.bytes)) == NULL)
   251         if ((req->buf.data = malloc(req->obj.bytes)) == NULL)
   190             ERROR("malloc");
   252             ERROR("malloc");
   191         
   253         
   192         // update the length
   254         // update the length
   193         conn->req->buf.len = conn->req->obj.bytes;
   255         req->buf.len = req->obj.bytes;
   194 
   256 
   195         // set offset to zero
   257         // set offset to zero
   196         conn->req->buf.offset = 0;
   258         req->buf.offset = 0;
   197         
   259         
   198         // and note that it is present, and is ours
   260         // and note that it is present, and is ours
   199         conn->req->have_buf = 1;
   261         req->have_buf = 1;
   200         conn->req->is_buf_ours = 1;
   262         req->is_buf_ours = 1;
   201 
   263 
   202         // do we have any data in the buf that we need to copy?
   264         // do we have any data in the buf that we need to copy?
   203         if (evbuffer_get_length(buf) > 0) {
   265         if (evbuffer_get_length(buf) > 0) {
   204             // read the data into the memcache_buf
   266             // read the data into the memcache_buf
   205             ret = evbuffer_remove(buf, conn->req->buf.data, conn->req->buf.len);
   267             ret = evbuffer_remove(buf, req->buf.data, req->buf.len);
   206             
   268             
   207             // sanity check...
   269             // sanity check...
   208             assert(ret > 0 && ret <= conn->req->buf.len);
   270             assert(ret > 0 && ret <= req->buf.len);
   209 
   271 
   210             // update offset
   272             // update offset
   211             conn->req->buf.offset += ret;
   273             req->buf.offset += ret;
   212         }
   274         }
   213 
   275 
   214         // still need to receive more data?
   276         // still need to receive more data?
   215         if (conn->req->buf.offset < conn->req->buf.len) {
   277         if (req->buf.offset < req->buf.len) {
   216         
   278         
   217             // disable the bufferevent while we read the data
   279             // disable the bufferevent while we read the data
   218             if (bufferevent_disable(conn->bev, EV_READ))
   280             if (bufferevent_disable(conn->bev, EV_READ))
   219                 PERROR("bufferevent_disable");
   281                 PERROR("bufferevent_disable");
   220 
   282 
   234     } else {
   296     } else {
   235         // there is no data to receive for this item, so we can ignore this
   297         // there is no data to receive for this item, so we can ignore this
   236         
   298         
   237     }
   299     }
   238     
   300     
   239     // we kind of "recurse" to handle the MEMCACHE_RPL_END reply, that is, we activate the bufferevent for EV_READ
   301     // finish it off
   240     // again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line" after the data and then
   302     memcache_conn_recv_end(conn);
   241     // return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause req_done to be called.
   303 
       
   304     // ok
       
   305     return;
       
   306 
       
   307 error:
       
   308     memcache_conn_error(conn);
       
   309 }
       
   310 
       
   311 /*
       
   312  * Receive the final bits of data following the reply data block
       
   313  */ 
       
   314 void memcache_conn_recv_end (struct memcache_conn *conn) {
       
   315     // we still need to receive the MEMCACHE_RPL_END. We kind of "recurse" to handle this, that is, we activate the
       
   316     // bufferevent for EV_READ again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line"
       
   317     // after the data and then return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause
       
   318     // recv_done to be called.
   242     // Elegant!
   319     // Elegant!
   243     memcache_conn_handle_reply(conn);
   320     memcache_conn_recv_next(conn);
   244 
   321 }
   245     // ok
   322 
   246     return;
   323 /*
   247 
   324  * We have finished receiving our current request
   248 error:
   325  */
   249     memcache_conn_error(conn);
   326 void memcache_conn_recv_done (struct memcache_conn *conn) {
       
   327     struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
       
   328     
       
   329     // we can remove it from the list now
       
   330     TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node);
       
   331 
       
   332     // have the req detach
       
   333     memcache_req_done(req);
       
   334 
       
   335     // and prepare to recv the next one
       
   336     memcache_conn_recv_next(conn);
       
   337     
       
   338     // if pipelining is off, it's a question of when we've recevied the reply...
       
   339     if (!conn->server->mc->pipeline_requests)
       
   340         memcache_server_conn_ready(conn->server, conn);
   250 }
   341 }
   251 
   342 
   252 /*
   343 /*
   253  * The connect() has finished
   344  * The connect() has finished
   254  */
   345  */
   271     )) == NULL)
   362     )) == NULL)
   272         ERROR("bufferevent_new");
   363         ERROR("bufferevent_new");
   273 
   364 
   274     // mark us as succesfully connected
   365     // mark us as succesfully connected
   275     conn->is_connected = 1;
   366     conn->is_connected = 1;
       
   367     
       
   368     // and prepare to recv any response headers
       
   369     memcache_conn_recv_next(conn);
   276 
   370 
   277     // notify the server
   371     // notify the server
   278     memcache_server_conn_ready(conn->server, conn);
   372     memcache_server_conn_ready(conn->server, conn);
   279     
   373     
   280     // good
   374     // good
   288  * The write buffer is empty, which means that we have written out a command header
   382  * The write buffer is empty, which means that we have written out a command header
   289  */
   383  */
   290 static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
   384 static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
   291     struct memcache_conn *conn = arg;
   385     struct memcache_conn *conn = arg;
   292 
   386 
       
   387     assert(conn->send_req != NULL);
       
   388 
   293     // the command header has been sent
   389     // the command header has been sent
   294     assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
   390     assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
   295     
   391     
   296     // does this request have some data to be included in the request?
   392     // does this request have some data to be included in the request?
   297     // if the data has already been sent (we handle the final \r\n as well), then skip this.
   393     // if the data has already been sent (we handle the final \r\n as well), then skip this.
   298     if (conn->req->have_buf && conn->req->buf.offset == 0) {
   394     if (conn->send_req->have_buf && conn->send_req->buf.offset == 0) {
   299         // we need to send the request data next
   395         // we need to send the request data next
   300         memcache_conn_send_req_data(conn);
   396         memcache_conn_send_data(conn);
   301 
   397 
   302     } else {
   398     } else {
   303         // wait for a reply
   399         // the request has now been sent, and se can send the next one
   304         memcache_conn_handle_reply(conn);
   400         memcache_conn_send_done(conn);
   305     }
   401     }
   306 }
   402 }
   307 
   403 
   308 /*
   404 /*
   309  * We have received some reply data, which should include the complete reply line at some point
   405  * We have received some reply data, which should include the complete reply line at some point
   310  */
   406  */
   311 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
   407 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
   312     struct memcache_conn *conn = arg;
   408     struct memcache_conn *conn = arg;
   313     struct evbuffer *in_buf = bufferevent_get_input(bev);
   409     struct evbuffer *in_buf = bufferevent_get_input(bev);
       
   410     struct memcache_req *req;
   314     struct memcache_key key;
   411     struct memcache_key key;
   315     char *header_data;
   412     char *header_data;
   316     enum memcache_reply reply_type;
   413     enum memcache_reply reply_type;
   317     int has_data;
   414     int has_data;
   318     
   415     
   319     // ensure that we do indeed have some data
   416     // ensure that we do indeed have some data
   320     assert(evbuffer_get_length(in_buf) > 0);
   417     assert(evbuffer_get_length(in_buf) > 0);
   321     
   418     
   322     // consume as much data as possible
   419     // consume as much data as possible
   323     do {
   420     do {
       
   421         // the req we are processing
       
   422         req = TAILQ_FIRST(&conn->req_queue);
       
   423 
   324         // attempt to parse the response header
   424         // attempt to parse the response header
   325         if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
   425         if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &req->obj, &has_data))
   326             ERROR("memcache_cmd_parse_header");
   426             ERROR("memcache_cmd_parse_header");
   327         
   427         
   328         if (!header_data) {
   428         if (!header_data) {
   329             // no complete header received yet
   429             // no complete header received yet
   330             return;
   430             return;
   331         }
   431         }
   332 
   432 
       
   433         // no request waiting?
       
   434         if (req == NULL)
       
   435             ERROR("got a response without any request pending: %s", header_data);
       
   436 
   333         // if the reply contains data, check that they key is the same
   437         // if the reply contains data, check that they key is the same
   334         if (has_data && (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0))
   438         if (has_data && (key.len != req->key.len || memcmp(key.buf, req->key.buf, key.len) != 0))
   335             ERROR("got reply with wrong key !?!");
   439             ERROR("got reply with wrong key !?! '%.*s' vs. '%.*s'", (int) key.len, key.buf, (int) req->key.len, req->key.buf);
       
   440 
       
   441         // check it's a FETCH request
       
   442         if (has_data && req->cmd_type != MEMCACHE_CMD_FETCH_GET)
       
   443             ERROR("a data reply for a non-CMD_FETCH_* command !?!");
   336 
   444 
   337         // notify the request (no reply data is ready for reading yet, though)
   445         // notify the request (no reply data is ready for reading yet, though)
   338         memcache_req_recv(conn->req, reply_type);
   446         memcache_req_recv(req, reply_type);
   339         
   447         
   340         // does the reply include data?
   448         // does the reply include data?
   341         if (has_data) {
   449         if (has_data) {
   342             // start reading the data (including whatever might be left over in the bufferevent buffer...)
   450             // start reading the data (including whatever might be left over in the bufferevent buffer...)
   343             memcache_conn_handle_reply_data(conn, in_buf);
   451             memcache_conn_recv_data(conn, in_buf);
   344 
   452 
   345         } else {
   453         } else {
   346             // the request is done with
   454             // the request is done with
   347             memcache_conn_req_done(conn);
   455             memcache_conn_recv_done(conn);
   348         }
   456         }
   349 
   457 
   350         // free the header data
   458         // free the header data, but not a second time on error exit
   351         free(header_data);
   459         free(header_data); header_data = NULL;
   352     
   460     
   353     } while (evbuffer_get_length(in_buf) > 0);
   461     } while (evbuffer_get_length(in_buf) > 0);
   354     
   462     
   355     // done
   463     // done
   356     return;
   464     return;
   370     memcache_conn_error(conn);
   478     memcache_conn_error(conn);
   371 }
   479 }
   372 
   480 
   373 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
   481 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
   374     struct memcache_conn *conn = arg;
   482     struct memcache_conn *conn = arg;
   375     struct memcache_buf *buf = &conn->req->buf;
   483     struct memcache_buf *buf = &conn->send_req->buf;
   376     int ret;
   484     int ret;
   377 
   485 
   378     // correct event
   486     // correct event
   379     assert(event == EV_WRITE);
   487     assert(event == EV_WRITE);
   380 
   488 
   401         if (event_add(&conn->ev_write, NULL))
   509         if (event_add(&conn->ev_write, NULL))
   402             PERROR("event_add");
   510             PERROR("event_add");
   403 
   511 
   404     } else {
   512     } else {
   405         // done! Send the terminating \r\n next
   513         // done! Send the terminating \r\n next
   406         memcache_conn_finish_req_data(conn);
   514         memcache_conn_send_end(conn);
   407     }
   515     }
   408 
   516 
   409     // success
   517     // success
   410     return;
   518     return;
   411 
   519 
   414     memcache_conn_error(conn);
   522     memcache_conn_error(conn);
   415 }
   523 }
   416 
   524 
   417 static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg) {
   525 static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg) {
   418     struct memcache_conn *conn = arg;
   526     struct memcache_conn *conn = arg;
   419     struct memcache_buf *buf = &conn->req->buf;
   527     struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
       
   528     struct memcache_buf *buf = &req->buf;
   420     int ret;
   529     int ret;
   421 
   530 
   422     // correct event
   531     // correct event
   423     assert(event == EV_READ);
   532     assert(event == EV_READ);
   424 
   533 
   440     }
   549     }
   441     
   550     
   442     // only notify the req if new data was received, and we won't be calling req_done next.
   551     // only notify the req if new data was received, and we won't be calling req_done next.
   443     if (ret > 0 && buf->offset < buf->len) {
   552     if (ret > 0 && buf->offset < buf->len) {
   444         // notify the req
   553         // notify the req
   445         memcache_req_data(conn->req);
   554         memcache_req_data(req);
   446     }
   555     }
   447 
   556 
   448     // data left to read?
   557     // data left to read?
   449     if (buf->offset < buf->len) {
   558     if (buf->offset < buf->len) {
   450         // reschedule
   559         // reschedule
   452             PERROR("event_add");
   561             PERROR("event_add");
   453 
   562 
   454 
   563 
   455     } else {
   564     } else {
   456         // done! We can let the bufferenvet handle the rest of the reply now
   565         // done! We can let the bufferenvet handle the rest of the reply now
   457         memcache_conn_handle_reply(conn);
   566         memcache_conn_recv_end(conn);
   458     }
   567     }
   459 
   568 
   460     // success
   569     // success
   461     return;
   570     return;
   462 
   571 
   463 error:
   572 error:
   464     // fail the entire connection
   573     // fail the entire connection
   465     memcache_conn_error(conn);
   574     memcache_conn_error(conn);
   466 }
   575 }
   467 
   576 
   468 // XXX: need to flush/disable buffers/events on errors
       
   469 
       
   470 /*
   577 /*
   471  * The entire connection failed
   578  * The entire connection failed
   472  */
   579  */
   473 static void memcache_conn_error (struct memcache_conn *conn) {
   580 static void memcache_conn_error (struct memcache_conn *conn) {
   474     // fail the request, if we have one
   581     struct memcache_req *req;
   475     if (conn->req) {
   582 
       
   583     // fail all requests, if we have any
       
   584     TAILQ_FOREACH (req, &conn->req_queue, reqqueue_node) {
   476         // error out the req
   585         // error out the req
   477         memcache_req_error(conn->req);
   586         memcache_req_error(req);
   478         
   587 
   479         // we are now available again
   588         TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node);
   480         conn->req = NULL;
   589     }
   481     }
   590 
       
   591     conn->send_req = NULL;
   482     
   592     
   483     // close the connection
   593     // close the connection
   484     memcache_conn_close(conn);
   594     memcache_conn_close(conn);
   485 
   595 
   486     // tell the server we failed
   596     // tell the server we failed
   487     memcache_server_conn_dead(conn->server, conn);
   597     memcache_server_conn_dead(conn->server, conn);
   488 }
       
   489 
       
   490 /*
       
   491  * Detach the request
       
   492  */
       
   493 static void memcache_conn_req_done (struct memcache_conn *conn) {
       
   494     // ensure that we do currently have a req
       
   495     assert(conn->req);
       
   496     
       
   497     // have the req detach
       
   498     memcache_req_done(conn->req);
       
   499 
       
   500     // we are now available again
       
   501     conn->req = NULL;
       
   502     
       
   503     memcache_server_conn_ready(conn->server, conn);
       
   504 }
   598 }
   505 
   599 
   506 void memcache_conn_close (struct memcache_conn *conn) {
   600 void memcache_conn_close (struct memcache_conn *conn) {
   507     // close the fd if needed
   601     // close the fd if needed
   508     if (conn->fd > 0) {
   602     if (conn->fd > 0) {
   527     // not connected anymore
   621     // not connected anymore
   528     conn->is_connected = 0;
   622     conn->is_connected = 0;
   529 }
   623 }
   530 
   624 
   531 void memcache_conn_free (struct memcache_conn *conn) {
   625 void memcache_conn_free (struct memcache_conn *conn) {
   532     // ensure we don't have a req bound to us
   626     // ensure we don't have any reqs bound to us
   533     assert(conn->req == NULL);
   627     assert(TAILQ_EMPTY(&conn->req_queue));
   534     
   628     
   535     // ensure that the connection is not considered to be connected anymore
   629     // ensure that the connection is not considered to be connected anymore
   536     assert(!conn->is_connected);
   630     assert(!conn->is_connected);
   537     
   631     
   538     // free it
   632     // free it