terom@38: terom@38: #include terom@38: #include terom@39: #include terom@41: #include terom@38: #include terom@38: terom@38: #include "connection.h" terom@41: #include "command.h" terom@41: #include "request.h" terom@38: #include "../socket.h" terom@38: #include "../common.h" terom@38: terom@38: static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg); terom@41: static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg); terom@41: static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg); terom@41: static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg); terom@41: static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg); terom@43: static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg); terom@38: terom@43: static void memcache_conn_error (struct memcache_conn *conn); terom@43: static void memcache_conn_req_error (struct memcache_conn *conn); terom@42: static void memcache_conn_req_done (struct memcache_conn *conn); terom@42: terom@38: struct memcache_conn *memcache_conn_open (struct memcache_server *server) { terom@38: struct memcache_conn *conn = NULL; terom@38: terom@38: if ((conn = calloc(1, sizeof(*conn))) == NULL) terom@38: ERROR("calloc"); terom@38: terom@38: // remember the server terom@38: conn->server = server; terom@41: terom@38: // attempt connect terom@38: if (memcache_conn_connect(conn)) terom@38: ERROR("failed to connect to server"); terom@38: terom@38: // success terom@38: return conn; terom@38: terom@38: error: terom@38: free(conn); terom@38: terom@38: return NULL; terom@38: } terom@38: terom@38: int memcache_conn_connect (struct memcache_conn *conn) { terom@39: assert(conn->fd <= 0 && !conn->is_connected); terom@38: terom@38: // begin connect terom@38: if ((conn->fd = socket_connect_async(conn->server->endpoint, SOCK_STREAM)) == -1) terom@38: goto error; terom@38: terom@39: // fd 0 should be stdin... terom@39: assert(conn->fd > 0); terom@39: terom@38: // set up the connect event terom@43: event_set(&conn->ev_connect, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn); terom@38: terom@38: // add it terom@43: if (event_add(&conn->ev_connect, NULL)) terom@38: PERROR("event_add"); terom@38: terom@38: // success terom@38: return 0; terom@38: terom@38: error: terom@38: if (conn->fd > 0) { terom@38: if (close(conn->fd)) terom@38: PWARNING("close %d", conn->fd); terom@38: terom@38: conn->fd = -1; terom@38: } terom@38: terom@38: return -1; terom@38: } terom@38: terom@39: int memcache_conn_is_available (struct memcache_conn *conn) { terom@39: return (conn->fd > 0 && conn->is_connected && conn->req == NULL); terom@39: } terom@39: terom@43: void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) { terom@39: assert(conn->fd > 0 && conn->is_connected); terom@38: assert(conn->req == NULL); terom@38: terom@38: // store the req terom@38: conn->req = req; terom@38: terom@41: // write the request header into our bufferevent's output buffer terom@43: if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) { terom@43: // just fail the request terom@43: memcache_conn_req_error(conn); terom@43: terom@41: ERROR("failed to init the cmd"); terom@43: } terom@41: terom@41: // tell our bufferevent to send it terom@43: if (bufferevent_enable(conn->bev, EV_WRITE)) { terom@43: // fail the entire connection terom@43: memcache_conn_error(conn); terom@43: terom@41: PERROR("bufferevent_enable"); terom@43: } terom@41: terom@42: // tell the req that it is underway terom@42: memcache_req_send(req); terom@42: terom@43: error: terom@38: terom@43: return; terom@38: } terom@38: terom@38: /* terom@41: * Start writing out the request data terom@41: */ terom@41: void memcache_conn_send_req_data (struct memcache_conn *conn) { terom@43: // set up the ev_write terom@43: event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn); terom@43: terom@41: // just fake a call to the event handler terom@41: _memcache_conn_ev_write(conn->fd, EV_WRITE, conn); terom@41: } terom@41: terom@41: /* terom@41: * Start reading a reply from the connection terom@41: */ terom@41: void memcache_conn_handle_reply (struct memcache_conn *conn) { terom@41: // ensure that we either didn't have a command, or it has been sent terom@41: assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len); terom@41: terom@43: // start/continue reading on the bufferevent terom@41: if (bufferevent_enable(conn->bev, EV_READ)) terom@41: PERROR("bufferevent_enable"); terom@41: terom@43: // Note: we don't need to recurse into the callback ourselves in case there is data in it, since the read callback terom@43: // will consume all available data iteratively. terom@43: terom@41: // ok, wait for the reply terom@41: return; terom@41: terom@41: error: terom@43: memcache_conn_req_error(conn); terom@41: } terom@41: terom@41: /* terom@42: * Start reading reply data from the connection terom@42: */ terom@42: void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) { terom@43: int ret; terom@43: terom@43: // check that the buf doesn't contain any data terom@43: assert(conn->req->buf.data == NULL); terom@43: terom@43: // bytes *may* be zero if we have an empty cache entry terom@43: if (conn->req->obj.bytes > 0) { terom@43: // allocate a buffer for the reply data terom@43: if ((conn->req->buf.data = malloc(conn->req->obj.bytes)) == NULL) terom@43: ERROR("malloc"); terom@43: terom@43: // update the length terom@43: conn->req->buf.len = conn->req->obj.bytes; terom@43: terom@43: // set offset to zero terom@43: conn->req->buf.offset = 0; terom@43: terom@43: // do we have any data in the buf that we need to copy? terom@43: if (evbuffer_get_length(buf) > 0) { terom@43: // read the data into the memcache_buf terom@43: ret = evbuffer_remove(buf, conn->req->buf.data, conn->req->buf.len); terom@43: terom@43: // sanity check... terom@43: assert(ret > 0 && ret <= conn->req->buf.len); terom@43: terom@43: // update offset terom@43: conn->req->buf.offset += ret; terom@43: } terom@43: terom@43: // still need to receive more data? terom@43: if (conn->req->buf.offset < conn->req->buf.len) { terom@43: terom@43: // disable the bufferevent while we read the data terom@43: if (bufferevent_disable(conn->bev, EV_READ)) terom@43: PERROR("bufferevent_disable"); terom@43: terom@43: // set up the ev_read terom@43: event_set(&conn->ev_read, conn->fd, EV_READ, &_memcache_conn_ev_read, conn); terom@43: terom@43: // then receive what data is left to receive terom@43: _memcache_conn_ev_read(conn->fd, EV_READ, conn); terom@43: terom@43: // wait for the data to arrive terom@43: return; terom@43: terom@43: } else { terom@43: // the buffer already contained the cache data, no need to read any more terom@43: terom@43: } terom@43: } else { terom@43: // there is no data to receive for this item, so we can ignore this terom@43: terom@43: } terom@43: terom@43: // we kind of "recurse" to handle the MEMCACHE_RPL_END reply, that is, we activate the bufferevent for EV_READ terom@43: // again, use memcache_cmd_parse_header to parse the data (it will skip the "empty line" after the data and then terom@43: // return the MEMCACHE_RPL_END line). This will then have has_data=0, which will cause req_done to be called. terom@43: // Elegant! terom@43: memcache_conn_handle_reply(conn); terom@43: terom@43: // ok terom@43: return; terom@43: terom@43: error: terom@43: memcache_conn_req_error(conn); terom@42: } terom@42: terom@42: /* terom@38: * The connect() has finished terom@38: */ terom@38: static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) { terom@38: struct memcache_conn *conn = arg; terom@39: int error; terom@38: terom@39: if (socket_check_error(fd, &error)) terom@39: goto error; terom@39: terom@39: if (error) terom@39: ERROR("connect failed: %s", strerror(error)); terom@39: terom@41: // set up the bufferevent terom@41: if ((conn->bev = bufferevent_new(fd, terom@41: &_memcache_conn_bev_read, terom@41: &_memcache_conn_bev_write, terom@41: &_memcache_conn_bev_error, terom@41: conn terom@41: )) == NULL) terom@41: ERROR("bufferevent_new"); terom@41: terom@39: // mark us as succesfully connected terom@39: conn->is_connected = 1; terom@38: terom@38: // notify the server terom@38: memcache_server_conn_ready(conn->server, conn); terom@39: terom@39: // good terom@39: return; terom@39: terom@39: error: terom@43: memcache_conn_error(conn); terom@38: } terom@38: terom@41: /* terom@41: * The write buffer is empty, which means that we have written out a command header terom@41: */ terom@41: static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) { terom@41: struct memcache_conn *conn = arg; terom@41: terom@41: // the command header has been sent terom@41: assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0); terom@41: terom@41: // does this request have some data to be included in the request? terom@41: if (conn->req->buf.data > 0) { terom@41: // we need to send the request data next terom@41: memcache_conn_send_req_data(conn); terom@41: terom@41: } else { terom@41: // wait for a reply terom@41: memcache_conn_handle_reply(conn); terom@41: } terom@41: } terom@41: terom@41: /* terom@41: * We have received some reply data, which should include the complete reply line at some point terom@41: */ terom@41: static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) { terom@41: struct memcache_conn *conn = arg; terom@41: struct evbuffer *in_buf = bufferevent_get_input(bev); terom@42: struct memcache_key key; terom@41: char *header_data; terom@41: enum memcache_reply reply_type; terom@41: int has_data; terom@41: terom@41: // ensure that we do indeed have some data terom@41: assert(evbuffer_get_length(in_buf) > 0); terom@42: terom@43: // consume as much data as possible terom@43: do { terom@43: // attempt to parse the response header terom@43: if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data)) terom@43: ERROR("memcache_cmd_parse_header"); terom@43: terom@43: if (!header_data) { terom@43: // no complete header received yet terom@43: return; terom@43: } terom@41: terom@43: // if the reply contains data, check that they key is the same terom@43: if (has_data && (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0)) terom@42: ERROR("got reply with wrong key !?!"); terom@42: terom@43: // notify the request (no reply data is ready for reading yet, though) terom@44: memcache_req_recv(conn->req, reply_type); terom@43: terom@43: // does the reply include data? terom@43: if (has_data) { terom@43: // start reading the data (including whatever might be left over in the bufferevent buffer...) terom@43: memcache_conn_handle_reply_data(conn, in_buf); terom@42: terom@43: } else { terom@43: // the request is done with terom@43: memcache_conn_req_done(conn); terom@43: } terom@43: terom@43: // free the header data terom@43: free(header_data); terom@42: terom@43: } while (evbuffer_get_length(in_buf) > 0); terom@43: terom@43: // done terom@43: return; terom@43: terom@43: error: terom@43: // free the header data read from the buf terom@43: free(header_data); terom@41: terom@43: memcache_conn_req_error(conn); terom@41: } terom@41: terom@41: terom@41: static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) { terom@43: struct memcache_conn *conn = arg; terom@43: terom@43: // fail the entire connection terom@43: memcache_conn_error(conn); terom@41: } terom@41: terom@41: static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) { terom@41: struct memcache_conn *conn = arg; terom@41: struct memcache_buf *buf = &conn->req->buf; terom@41: int ret; terom@41: terom@41: // correct event terom@41: assert(event == EV_WRITE); terom@41: terom@41: // we do indeed have data to send terom@41: assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len); terom@41: terom@41: // do the actual write() terom@41: if ((ret = write(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN) terom@41: PERROR("write"); terom@41: terom@41: // should never be the case... ? terom@41: if (ret == 0) terom@43: ERROR("write returned EOF !?!"); terom@41: terom@41: // did we manage to write some data? terom@41: if (ret > 0) { terom@41: // update offset terom@41: buf->offset += ret; terom@41: } terom@41: terom@41: // data left to write? terom@41: if (buf->offset < buf->len) { terom@41: // reschedule terom@43: if (event_add(&conn->ev_write, NULL)) terom@41: PERROR("event_add"); terom@41: terom@41: } else { terom@41: // done! We can handle the reply now terom@41: memcache_conn_handle_reply(conn); terom@41: } terom@41: terom@41: // success terom@41: return; terom@41: terom@41: error: terom@43: // fail the entire connection terom@43: memcache_conn_error(conn); terom@43: } terom@43: terom@43: static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg) { terom@43: struct memcache_conn *conn = arg; terom@43: struct memcache_buf *buf = &conn->req->buf; terom@43: int ret; terom@43: terom@43: // correct event terom@43: assert(event == EV_READ); terom@43: terom@43: // we do indeed expect to receive data terom@43: assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len); terom@43: terom@43: // do the actual read() terom@43: if ((ret = read(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN) terom@43: PERROR("read"); terom@43: terom@43: // should never be the case... ? terom@43: if (ret == 0) terom@43: ERROR("read returned EOF !?!"); terom@43: terom@43: // did we manage to read some data? terom@43: if (ret > 0) { terom@43: // update offset terom@43: buf->offset += ret; terom@43: } terom@43: terom@43: // only notify the req if new data was received, and we won't be calling req_done next. terom@43: if (ret > 0 && buf->offset < buf->len) { terom@43: // notify the req terom@43: memcache_req_data(conn->req); terom@43: } terom@43: terom@43: // data left to read? terom@43: if (buf->offset < buf->len) { terom@43: // reschedule terom@43: if (event_add(&conn->ev_read, NULL)) terom@43: PERROR("event_add"); terom@43: terom@43: terom@43: } else { terom@43: // done! We can let the bufferenvet handle the rest of the reply now terom@43: memcache_conn_handle_reply(conn); terom@43: } terom@43: terom@43: // success terom@43: return; terom@43: terom@43: error: terom@43: // fail the entire connection terom@43: memcache_conn_error(conn); terom@43: } terom@43: terom@43: /* terom@43: * The entire connection failed terom@43: */ terom@43: static void memcache_conn_error (struct memcache_conn *conn) { terom@43: // fail the request, if we have one terom@43: if (conn->req) terom@43: memcache_conn_req_error(conn); terom@43: terom@43: // tell the server we failed terom@43: memcache_server_conn_dead(conn->server, conn); terom@43: } terom@43: terom@43: /* terom@43: * Request failed somehow terom@43: */ terom@43: static void memcache_conn_req_error (struct memcache_conn *conn) { terom@43: // ensure that we do currently have a req terom@43: assert(conn->req); terom@43: terom@43: // error out the req terom@43: memcache_req_error(conn->req); terom@43: assert(conn->req->conn == NULL); terom@43: terom@43: // we are now available again terom@43: conn->req = NULL; terom@43: terom@41: } terom@41: terom@42: /* terom@42: * Detach the request terom@42: */ terom@42: static void memcache_conn_req_done (struct memcache_conn *conn) { terom@42: // ensure that we do currently have a req terom@42: assert(conn->req); terom@42: terom@42: // have the req detach and check it did so terom@42: memcache_req_done(conn->req); terom@42: assert(conn->req->conn == NULL); terom@42: terom@42: // we are now available again terom@42: conn->req = NULL; terom@42: } terom@42: terom@39: void memcache_conn_free (struct memcache_conn *conn) { terom@43: // ensure we don't have a req bound to us terom@43: assert(conn->req == NULL); terom@42: terom@39: // ensure that the connection is not considered to be connected anymore terom@39: assert(!conn->is_connected); terom@39: terom@39: // close the fd if needed terom@39: if (conn->fd > 0) { terom@39: if (close(conn->fd)) terom@39: PWARNING("close"); terom@39: terom@39: conn->fd = 0; terom@39: } terom@39: terom@43: // ensure that the events are not pending anymore terom@43: assert(event_pending(&conn->ev_connect, EV_WRITE|EV_TIMEOUT, NULL) == 0); terom@43: assert(event_pending(&conn->ev_read, EV_READ|EV_TIMEOUT, NULL) == 0); terom@43: assert(event_pending(&conn->ev_write, EV_WRITE|EV_TIMEOUT, NULL) == 0); terom@39: terom@43: // free the bufferevent terom@43: if (conn->bev) terom@43: bufferevent_free(conn->bev); terom@43: terom@39: // free it terom@39: free(conn); terom@39: } terom@39: