memcache/connection.c
changeset 41 540737bf6bac
parent 39 0e21a65074a6
child 42 0e503189af2f
equal deleted inserted replaced
40:9cecd22e643a 41:540737bf6bac
     1 
     1 
     2 #include <stdlib.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     3 #include <unistd.h>
     4 #include <string.h>
     4 #include <string.h>
       
     5 #include <errno.h>
     5 #include <assert.h>
     6 #include <assert.h>
     6 
     7 
     7 #include "connection.h"
     8 #include "connection.h"
       
     9 #include "command.h"
       
    10 #include "request.h"
     8 #include "../socket.h"
    11 #include "../socket.h"
     9 #include "../common.h"
    12 #include "../common.h"
    10 
    13 
    11 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
    14 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
       
    15 static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
       
    16 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
       
    17 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
       
    18 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
    12 
    19 
    13 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
    20 struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
    14     struct memcache_conn *conn = NULL;
    21     struct memcache_conn *conn = NULL;
    15 
    22 
    16     if ((conn = calloc(1, sizeof(*conn))) == NULL)
    23     if ((conn = calloc(1, sizeof(*conn))) == NULL)
    17         ERROR("calloc");
    24         ERROR("calloc");
    18     
    25     
    19     // remember the server
    26     // remember the server
    20     conn->server = server;
    27     conn->server = server;
    21     
    28 
    22     // attempt connect
    29     // attempt connect
    23     if (memcache_conn_connect(conn))
    30     if (memcache_conn_connect(conn))
    24         ERROR("failed to connect to server");
    31         ERROR("failed to connect to server");
    25     
    32     
    26     // success
    33     // success
    72     assert(conn->req == NULL);
    79     assert(conn->req == NULL);
    73 
    80 
    74     // store the req
    81     // store the req
    75     conn->req = req;
    82     conn->req = req;
    76 
    83 
    77     // XXX: transmit it
    84     // write the request header into our bufferevent's output buffer
       
    85     if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj))
       
    86         ERROR("failed to init the cmd");
       
    87     
       
    88     // tell our bufferevent to send it
       
    89     if (bufferevent_enable(conn->bev, EV_WRITE))
       
    90         PERROR("bufferevent_enable");
       
    91     
       
    92     // wait for that to complete
       
    93     return 0;
    78 
    94 
    79 error:
    95 error:
    80     return -1;
    96     return -1;
       
    97 }
       
    98 
       
    99 /*
       
   100  * Start writing out the request data
       
   101  */
       
   102 void memcache_conn_send_req_data (struct memcache_conn *conn) {
       
   103     // just fake a call to the event handler
       
   104     _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
       
   105 }
       
   106 
       
   107 /*
       
   108  * Start reading a reply from the connection
       
   109  */
       
   110 void memcache_conn_handle_reply (struct memcache_conn *conn) {
       
   111     // ensure that we either didn't have a command, or it has been sent
       
   112     assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
       
   113 
       
   114     // start reading on the bufferevent
       
   115     if (bufferevent_enable(conn->bev, EV_READ))
       
   116         PERROR("bufferevent_enable");
       
   117 
       
   118     // ok, wait for the reply
       
   119     return;
       
   120 
       
   121 error:
       
   122     // XXX: error handling
       
   123     assert(0);
    81 }
   124 }
    82 
   125 
    83 /*
   126 /*
    84  * The connect() has finished
   127  * The connect() has finished
    85  */
   128  */
    91         goto error;
   134         goto error;
    92 
   135 
    93     if (error)
   136     if (error)
    94         ERROR("connect failed: %s", strerror(error));
   137         ERROR("connect failed: %s", strerror(error));
    95 
   138 
       
   139     // set up the bufferevent
       
   140     if ((conn->bev = bufferevent_new(fd, 
       
   141         &_memcache_conn_bev_read,
       
   142         &_memcache_conn_bev_write,
       
   143         &_memcache_conn_bev_error,
       
   144         conn
       
   145     )) == NULL)
       
   146         ERROR("bufferevent_new");
       
   147 
    96     // mark us as succesfully connected
   148     // mark us as succesfully connected
    97     conn->is_connected = 1;
   149     conn->is_connected = 1;
    98 
   150 
    99     // notify the server
   151     // notify the server
   100     memcache_server_conn_ready(conn->server, conn);
   152     memcache_server_conn_ready(conn->server, conn);
   103     return;
   155     return;
   104 
   156 
   105 error:
   157 error:
   106     // notify the server
   158     // notify the server
   107     memcache_server_conn_dead(conn->server, conn);
   159     memcache_server_conn_dead(conn->server, conn);
       
   160 }
       
   161 
       
   162 /*
       
   163  * The write buffer is empty, which means that we have written out a command header
       
   164  */
       
   165 static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
       
   166     struct memcache_conn *conn = arg;
       
   167 
       
   168     // the command header has been sent
       
   169     assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
       
   170     
       
   171     // does this request have some data to be included in the request?
       
   172     if (conn->req->buf.data > 0) {
       
   173         // we need to send the request data next
       
   174         memcache_conn_send_req_data(conn);
       
   175 
       
   176     } else {
       
   177         // wait for a reply
       
   178         memcache_conn_handle_reply(conn);
       
   179     }
       
   180 }
       
   181 
       
   182 /*
       
   183  * We have received some reply data, which should include the complete reply line at some point
       
   184  */
       
   185 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
       
   186     struct memcache_conn *conn = arg;
       
   187     struct evbuffer *in_buf = bufferevent_get_input(bev);
       
   188     char *header_data;
       
   189     enum memcache_reply reply_type;
       
   190     int has_data;
       
   191     
       
   192     // ensure that we do indeed have some data
       
   193     assert(evbuffer_get_length(in_buf) > 0);
       
   194 
       
   195     // attempt to parse the response header
       
   196     if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data))
       
   197         ERROR("memcache_cmd_parse_header");
       
   198 
       
   199     // XXX: read reply data
       
   200 
       
   201 error:
       
   202     // XXX: error handling
       
   203     return;
       
   204 }
       
   205 
       
   206 
       
   207 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
       
   208     // XXX: error handling
       
   209     assert(0);
       
   210 }
       
   211 
       
   212 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
       
   213     struct memcache_conn *conn = arg;
       
   214     struct memcache_buf *buf = &conn->req->buf;
       
   215     int ret;
       
   216 
       
   217     // correct event
       
   218     assert(event == EV_WRITE);
       
   219 
       
   220     // we do indeed have data to send
       
   221     assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);
       
   222     
       
   223     // do the actual write()
       
   224     if ((ret = write(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN)
       
   225         PERROR("write");
       
   226 
       
   227     // should never be the case... ?
       
   228     if (ret == 0)
       
   229         ERROR("write returned 0 !?!");
       
   230     
       
   231     // did we manage to write some data?
       
   232     if (ret > 0) {
       
   233         // update offset
       
   234         buf->offset += ret;
       
   235     }
       
   236 
       
   237     // data left to write?
       
   238     if (buf->offset < buf->len) {
       
   239         // reschedule
       
   240         if (event_add(&conn->ev, NULL))
       
   241             PERROR("event_add");
       
   242 
       
   243     } else {
       
   244         // done! We can handle the reply now
       
   245         memcache_conn_handle_reply(conn);
       
   246     }
       
   247 
       
   248     // success
       
   249     return;
       
   250 
       
   251 error:
       
   252     // XXX: error handling
       
   253     assert(0);
   108 }
   254 }
   109 
   255 
   110 void memcache_conn_free (struct memcache_conn *conn) {
   256 void memcache_conn_free (struct memcache_conn *conn) {
   111     // ensure that the connection is not considered to be connected anymore
   257     // ensure that the connection is not considered to be connected anymore
   112     assert(!conn->is_connected);
   258     assert(!conn->is_connected);