terom@38: terom@38: #include terom@38: #include terom@39: #include terom@41: #include terom@38: #include terom@38: terom@38: #include "connection.h" terom@41: #include "command.h" terom@41: #include "request.h" terom@38: #include "../socket.h" terom@38: #include "../common.h" terom@38: terom@38: static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg); terom@41: static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg); terom@41: static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg); terom@41: static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg); terom@41: static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg); terom@38: terom@42: static void memcache_conn_req_done (struct memcache_conn *conn); terom@42: terom@38: struct memcache_conn *memcache_conn_open (struct memcache_server *server) { terom@38: struct memcache_conn *conn = NULL; terom@38: terom@38: if ((conn = calloc(1, sizeof(*conn))) == NULL) terom@38: ERROR("calloc"); terom@38: terom@38: // remember the server terom@38: conn->server = server; terom@41: terom@38: // attempt connect terom@38: if (memcache_conn_connect(conn)) terom@38: ERROR("failed to connect to server"); terom@38: terom@38: // success terom@38: return conn; terom@38: terom@38: error: terom@38: free(conn); terom@38: terom@38: return NULL; terom@38: } terom@38: terom@38: int memcache_conn_connect (struct memcache_conn *conn) { terom@39: assert(conn->fd <= 0 && !conn->is_connected); terom@38: terom@38: // begin connect terom@38: if ((conn->fd = socket_connect_async(conn->server->endpoint, SOCK_STREAM)) == -1) terom@38: goto error; terom@38: terom@39: // fd 0 should be stdin... terom@39: assert(conn->fd > 0); terom@39: terom@38: // set up the connect event terom@38: event_set(&conn->ev, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn); terom@38: terom@38: // add it terom@38: if (event_add(&conn->ev, NULL)) terom@38: PERROR("event_add"); terom@38: terom@38: // success terom@38: return 0; terom@38: terom@38: error: terom@38: if (conn->fd > 0) { terom@38: if (close(conn->fd)) terom@38: PWARNING("close %d", conn->fd); terom@38: terom@38: conn->fd = -1; terom@38: } terom@38: terom@38: return -1; terom@38: } terom@38: terom@39: int memcache_conn_is_available (struct memcache_conn *conn) { terom@39: return (conn->fd > 0 && conn->is_connected && conn->req == NULL); terom@39: } terom@39: terom@38: int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) { terom@39: assert(conn->fd > 0 && conn->is_connected); terom@38: assert(conn->req == NULL); terom@38: terom@38: // store the req terom@38: conn->req = req; terom@38: terom@41: // write the request header into our bufferevent's output buffer terom@41: if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) terom@41: ERROR("failed to init the cmd"); terom@41: terom@41: // tell our bufferevent to send it terom@41: if (bufferevent_enable(conn->bev, EV_WRITE)) terom@41: PERROR("bufferevent_enable"); terom@41: terom@42: // tell the req that it is underway terom@42: memcache_req_send(req); terom@42: terom@41: // wait for that to complete terom@41: return 0; terom@38: terom@38: error: terom@38: return -1; terom@38: } terom@38: terom@38: /* terom@41: * Start writing out the request data terom@41: */ terom@41: void memcache_conn_send_req_data (struct memcache_conn *conn) { terom@41: // just fake a call to the event handler terom@41: _memcache_conn_ev_write(conn->fd, EV_WRITE, conn); terom@41: } terom@41: terom@41: /* terom@41: * Start reading a reply from the connection terom@41: */ terom@41: void memcache_conn_handle_reply (struct memcache_conn *conn) { terom@41: // ensure that we either didn't have a command, or it has been sent terom@41: assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len); terom@41: terom@41: // start reading on the bufferevent terom@41: if (bufferevent_enable(conn->bev, EV_READ)) terom@41: PERROR("bufferevent_enable"); terom@41: terom@41: // ok, wait for the reply terom@41: return; terom@41: terom@41: error: terom@41: // XXX: error handling terom@41: assert(0); terom@41: } terom@41: terom@41: /* terom@42: * Start reading reply data from the connection terom@42: */ terom@42: void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) { terom@42: // XXX: implement terom@42: assert(0); terom@42: } terom@42: terom@42: /* terom@38: * The connect() has finished terom@38: */ terom@38: static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) { terom@38: struct memcache_conn *conn = arg; terom@39: int error; terom@38: terom@39: if (socket_check_error(fd, &error)) terom@39: goto error; terom@39: terom@39: if (error) terom@39: ERROR("connect failed: %s", strerror(error)); terom@39: terom@41: // set up the bufferevent terom@41: if ((conn->bev = bufferevent_new(fd, terom@41: &_memcache_conn_bev_read, terom@41: &_memcache_conn_bev_write, terom@41: &_memcache_conn_bev_error, terom@41: conn terom@41: )) == NULL) terom@41: ERROR("bufferevent_new"); terom@41: terom@39: // mark us as succesfully connected terom@39: conn->is_connected = 1; terom@38: terom@38: // notify the server terom@38: memcache_server_conn_ready(conn->server, conn); terom@39: terom@39: // good terom@39: return; terom@39: terom@39: error: terom@39: // notify the server terom@39: memcache_server_conn_dead(conn->server, conn); terom@38: } terom@38: terom@41: /* terom@41: * The write buffer is empty, which means that we have written out a command header terom@41: */ terom@41: static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) { terom@41: struct memcache_conn *conn = arg; terom@41: terom@41: // the command header has been sent terom@41: assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0); terom@41: terom@41: // does this request have some data to be included in the request? terom@41: if (conn->req->buf.data > 0) { terom@41: // we need to send the request data next terom@41: memcache_conn_send_req_data(conn); terom@41: terom@41: } else { terom@41: // wait for a reply terom@41: memcache_conn_handle_reply(conn); terom@41: } terom@41: } terom@41: terom@41: /* terom@41: * We have received some reply data, which should include the complete reply line at some point terom@41: */ terom@41: static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) { terom@41: struct memcache_conn *conn = arg; terom@41: struct evbuffer *in_buf = bufferevent_get_input(bev); terom@42: struct memcache_key key; terom@41: char *header_data; terom@41: enum memcache_reply reply_type; terom@41: int has_data; terom@41: terom@41: // ensure that we do indeed have some data terom@41: assert(evbuffer_get_length(in_buf) > 0); terom@41: terom@41: // attempt to parse the response header terom@42: if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data)) terom@41: ERROR("memcache_cmd_parse_header"); terom@42: terom@42: if (!header_data) { terom@42: // no complete header received yet terom@42: return; terom@42: } terom@41: terom@42: // disable reads again terom@42: if (bufferevent_disable(bev, EV_READ)) terom@42: PERROR("bufferevent_disable"); terom@42: terom@42: // does the reply include data? terom@42: if (has_data) { terom@42: // check that they key is the same terom@42: if (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0) terom@42: ERROR("got reply with wrong key !?!"); terom@42: terom@42: // start reading the data (including whatever might be left over in the bufferevent buffer...) terom@42: // XXX: what if this triggers a req notify before we do? terom@42: memcache_conn_handle_reply_data(conn, in_buf); terom@42: terom@42: } else { terom@42: // the request is done with terom@42: memcache_conn_req_done(conn); terom@42: } terom@42: terom@42: // notify the request terom@42: memcache_req_reply(conn->req, reply_type); terom@41: terom@41: error: terom@41: // XXX: error handling terom@41: return; terom@41: } terom@41: terom@41: terom@41: static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) { terom@41: // XXX: error handling terom@41: assert(0); terom@41: } terom@41: terom@41: static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) { terom@41: struct memcache_conn *conn = arg; terom@41: struct memcache_buf *buf = &conn->req->buf; terom@41: int ret; terom@41: terom@41: // correct event terom@41: assert(event == EV_WRITE); terom@41: terom@41: // we do indeed have data to send terom@41: assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len); terom@41: terom@41: // do the actual write() terom@41: if ((ret = write(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN) terom@41: PERROR("write"); terom@41: terom@41: // should never be the case... ? terom@41: if (ret == 0) terom@41: ERROR("write returned 0 !?!"); terom@41: terom@41: // did we manage to write some data? terom@41: if (ret > 0) { terom@41: // update offset terom@41: buf->offset += ret; terom@41: } terom@41: terom@41: // data left to write? terom@41: if (buf->offset < buf->len) { terom@41: // reschedule terom@41: if (event_add(&conn->ev, NULL)) terom@41: PERROR("event_add"); terom@41: terom@41: } else { terom@41: // done! We can handle the reply now terom@41: memcache_conn_handle_reply(conn); terom@41: } terom@41: terom@41: // success terom@41: return; terom@41: terom@41: error: terom@41: // XXX: error handling terom@41: assert(0); terom@41: } terom@41: terom@42: /* terom@42: * Detach the request terom@42: */ terom@42: static void memcache_conn_req_done (struct memcache_conn *conn) { terom@42: // ensure that we do currently have a req terom@42: assert(conn->req); terom@42: terom@42: // have the req detach and check it did so terom@42: memcache_req_done(conn->req); terom@42: assert(conn->req->conn == NULL); terom@42: terom@42: // we are now available again terom@42: conn->req = NULL; terom@42: } terom@42: terom@39: void memcache_conn_free (struct memcache_conn *conn) { terom@42: // XXX: conn->req? terom@42: terom@39: // ensure that the connection is not considered to be connected anymore terom@39: assert(!conn->is_connected); terom@39: terom@39: // close the fd if needed terom@39: if (conn->fd > 0) { terom@39: if (close(conn->fd)) terom@39: PWARNING("close"); terom@39: terom@39: conn->fd = 0; terom@39: } terom@39: terom@39: // ensure that the event is not pending anymore terom@39: assert(event_pending(&conn->ev, EV_READ|EV_WRITE|EV_TIMEOUT, NULL) == 0); terom@39: terom@39: // free it terom@39: free(conn); terom@39: } terom@39: