memcache/connection.c
changeset 46 8a832c0e01ee
parent 44 03a7e064f833
child 48 1c67f512779b
equal deleted inserted replaced
45:10d514029c64 46:8a832c0e01ee
    17 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
    17 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
    18 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
    18 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
    19 static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg);
    19 static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg);
    20 
    20 
    21 static void memcache_conn_error (struct memcache_conn *conn);
    21 static void memcache_conn_error (struct memcache_conn *conn);
    22 static void memcache_conn_req_error (struct memcache_conn *conn);
       
    23 static void memcache_conn_req_done (struct memcache_conn *conn);
    22 static void memcache_conn_req_done (struct memcache_conn *conn);
       
    23 
       
    24 void memcache_conn_close (struct memcache_conn *conn);
    24 
    25 
    25 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
    26 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
    26     struct memcache_conn *conn = NULL;
    27     struct memcache_conn *conn = NULL;
    27 
    28 
    28     if ((conn = calloc(1, sizeof(*conn))) == NULL)
    29     if ((conn = calloc(1, sizeof(*conn))) == NULL)
    81 
    82 
    82 void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
    83 void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
    83     assert(conn->fd > 0 && conn->is_connected);
    84     assert(conn->fd > 0 && conn->is_connected);
    84     assert(conn->req == NULL);
    85     assert(conn->req == NULL);
    85 
    86 
       
    87     // write the request header into our bufferevent's output buffer
       
    88     if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) {
       
    89         ERROR("failed to init the cmd");
       
    90     }
       
    91     
    86     // store the req
    92     // store the req
    87     conn->req = req;
    93     conn->req = req;
    88 
       
    89     // write the request header into our bufferevent's output buffer
       
    90     if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) {
       
    91         // just fail the request
       
    92         memcache_conn_req_error(conn);
       
    93 
       
    94         ERROR("failed to init the cmd");
       
    95     }
       
    96     
    94     
    97     // tell our bufferevent to send it
    95     // tell our bufferevent to send it
    98     if (bufferevent_enable(conn->bev, EV_WRITE)) {
    96     if (bufferevent_enable(conn->bev, EV_WRITE))
    99         // fail the entire connection
       
   100         memcache_conn_error(conn);
       
   101 
       
   102         PERROR("bufferevent_enable");
    97         PERROR("bufferevent_enable");
   103     }
       
   104     
    98     
   105     // tell the req that it is underway
    99     // tell the req that it is underway
   106     memcache_req_send(req);
   100     memcache_req_send(req);
   107 
   101     
   108 error:
   102     // success
   109 
   103     return;
   110     return;
   104 
       
   105 error:
       
   106     if (conn->req)
       
   107         memcache_conn_error(conn);
       
   108 
       
   109     else
       
   110         memcache_req_error(req);
   111 }
   111 }
   112 
   112 
   113 /*
   113 /*
   114  * Start writing out the request data
   114  * Start writing out the request data
   115  */
   115  */
   120     // just fake a call to the event handler
   120     // just fake a call to the event handler
   121     _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
   121     _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
   122 }
   122 }
   123 
   123 
   124 /*
   124 /*
       
   125  * Write out the final \r\n to terminate the request data
       
   126  */
       
   127 void memcache_conn_finish_req_data (struct memcache_conn *conn) {
       
   128     if (bufferevent_write(conn->bev, "\r\n", 2))
       
   129         PERROR("bufferevent_write");
       
   130     
       
   131     // ok
       
   132     return;
       
   133 
       
   134 error:
       
   135     memcache_conn_error(conn);
       
   136 }
       
   137 
       
   138 /*
   125  * Start reading a reply from the connection
   139  * Start reading a reply from the connection
   126  */
   140  */
   127 void memcache_conn_handle_reply (struct memcache_conn *conn) {
   141 void memcache_conn_handle_reply (struct memcache_conn *conn) {
   128     // ensure that we either didn't have a command, or it has been sent
   142     // ensure that we either didn't have a command, or it has been sent
   129     assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
   143     assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
   137 
   151 
   138     // ok, wait for the reply
   152     // ok, wait for the reply
   139     return;
   153     return;
   140 
   154 
   141 error:
   155 error:
   142     memcache_conn_req_error(conn);
   156     memcache_conn_error(conn);
   143 }
   157 }
   144 
   158 
   145 /*
   159 /*
   146  * Start reading reply data from the connection
   160  * Start reading reply data from the connection
   147  */
   161  */
   208 
   222 
   209     // ok
   223     // ok
   210     return;
   224     return;
   211 
   225 
   212 error:
   226 error:
   213     memcache_conn_req_error(conn);
   227     memcache_conn_error(conn);
   214 }
   228 }
   215 
   229 
   216 /*
   230 /*
   217  * The connect() has finished
   231  * The connect() has finished
   218  */
   232  */
   256 
   270 
   257     // the command header has been sent
   271     // the command header has been sent
   258     assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
   272     assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
   259     
   273     
   260     // does this request have some data to be included in the request?
   274     // does this request have some data to be included in the request?
   261     if (conn->req->buf.data > 0) {
   275     // if the data has already been sent (we handle the final \r\n as well), then skip this.
       
   276     if (conn->req->have_buf && conn->req->buf.offset == 0) {
   262         // we need to send the request data next
   277         // we need to send the request data next
   263         memcache_conn_send_req_data(conn);
   278         memcache_conn_send_req_data(conn);
   264 
   279 
   265     } else {
   280     } else {
   266         // wait for a reply
   281         // wait for a reply
   320    
   335    
   321 error:
   336 error:
   322     // free the header data read from the buf
   337     // free the header data read from the buf
   323     free(header_data);
   338     free(header_data);
   324 
   339 
   325     memcache_conn_req_error(conn);
   340     memcache_conn_error(conn);
   326 }
   341 }
   327 
   342 
   328 
   343 
   329 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
   344 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
   330     struct memcache_conn *conn = arg;
   345     struct memcache_conn *conn = arg;
   363         // reschedule
   378         // reschedule
   364         if (event_add(&conn->ev_write, NULL))
   379         if (event_add(&conn->ev_write, NULL))
   365             PERROR("event_add");
   380             PERROR("event_add");
   366 
   381 
   367     } else {
   382     } else {
   368         // done! We can handle the reply now
   383         // done! Send the terminating \r\n next
   369         memcache_conn_handle_reply(conn);
   384         memcache_conn_finish_req_data(conn);
   370     }
   385     }
   371 
   386 
   372     // success
   387     // success
   373     return;
   388     return;
   374 
   389 
   426 error:
   441 error:
   427     // fail the entire connection
   442     // fail the entire connection
   428     memcache_conn_error(conn);
   443     memcache_conn_error(conn);
   429 }
   444 }
   430 
   445 
       
   446 // XXX: need to flush/disable buffers/events on errors
       
   447 
   431 /*
   448 /*
   432  * The entire connection failed
   449  * The entire connection failed
   433  */
   450  */
   434 static void memcache_conn_error (struct memcache_conn *conn) {
   451 static void memcache_conn_error (struct memcache_conn *conn) {
   435     // fail the request, if we have one
   452     // fail the request, if we have one
   436     if (conn->req)
   453     if (conn->req) {
   437         memcache_conn_req_error(conn);
   454         // error out the req
       
   455         memcache_req_error(conn->req);
       
   456         assert(conn->req->conn == NULL);
       
   457         
       
   458         // we are now available again
       
   459         conn->req = NULL;
       
   460     }
       
   461     
       
   462     // close the connection
       
   463     memcache_conn_close(conn);
   438 
   464 
   439     // tell the server we failed
   465     // tell the server we failed
   440     memcache_server_conn_dead(conn->server, conn);
   466     memcache_server_conn_dead(conn->server, conn);
   441 }
       
   442 
       
   443 /*
       
   444  * Request failed somehow
       
   445  */
       
   446 static void memcache_conn_req_error (struct memcache_conn *conn) {
       
   447     // ensure that we do currently have a req
       
   448     assert(conn->req);
       
   449     
       
   450     // error out the req
       
   451     memcache_req_error(conn->req);
       
   452     assert(conn->req->conn == NULL);
       
   453     
       
   454     // we are now available again
       
   455     conn->req = NULL;
       
   456    
       
   457 }
   467 }
   458 
   468 
   459 /*
   469 /*
   460  * Detach the request
   470  * Detach the request
   461  */
   471  */
   467     memcache_req_done(conn->req);
   477     memcache_req_done(conn->req);
   468     assert(conn->req->conn == NULL);
   478     assert(conn->req->conn == NULL);
   469     
   479     
   470     // we are now available again
   480     // we are now available again
   471     conn->req = NULL;
   481     conn->req = NULL;
   472 }
   482     
   473 
   483     memcache_server_conn_ready(conn->server, conn);
   474 void memcache_conn_free (struct memcache_conn *conn) {
   484 }
   475     // ensure we don't have a req bound to us
   485 
   476     assert(conn->req == NULL);
   486 void memcache_conn_close (struct memcache_conn *conn) {
   477 
       
   478     // ensure that the connection is not considered to be connected anymore
       
   479     assert(!conn->is_connected);
       
   480     
       
   481     // close the fd if needed
   487     // close the fd if needed
   482     if (conn->fd > 0) {
   488     if (conn->fd > 0) {
   483         if (close(conn->fd))
   489         if (close(conn->fd))
   484             PWARNING("close");
   490             PWARNING("close");
   485 
   491 
   490     assert(event_pending(&conn->ev_connect, EV_WRITE|EV_TIMEOUT, NULL) == 0);
   496     assert(event_pending(&conn->ev_connect, EV_WRITE|EV_TIMEOUT, NULL) == 0);
   491     assert(event_pending(&conn->ev_read, EV_READ|EV_TIMEOUT, NULL) == 0);
   497     assert(event_pending(&conn->ev_read, EV_READ|EV_TIMEOUT, NULL) == 0);
   492     assert(event_pending(&conn->ev_write, EV_WRITE|EV_TIMEOUT, NULL) == 0);
   498     assert(event_pending(&conn->ev_write, EV_WRITE|EV_TIMEOUT, NULL) == 0);
   493     
   499     
   494     // free the bufferevent
   500     // free the bufferevent
   495     if (conn->bev)
   501     if (conn->bev) {
   496         bufferevent_free(conn->bev);
   502         bufferevent_free(conn->bev);
   497 
   503 
       
   504         conn->bev = NULL;
       
   505     }
       
   506 
       
   507     // not connected anymore
       
   508     conn->is_connected = 0;
       
   509 }
       
   510 
       
   511 void memcache_conn_free (struct memcache_conn *conn) {
       
   512     // ensure we don't have a req bound to us
       
   513     assert(conn->req == NULL);
       
   514     
       
   515     // ensure that the connection is not considered to be connected anymore
       
   516     assert(!conn->is_connected);
       
   517     
   498     // free it
   518     // free it
   499     free(conn);
   519     free(conn);
   500 }
   520 }
   501 
   521