#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include "connection.h"
#include "memcache.h"
#include "command.h"
#include "request.h"
#include "../socket.h"
#include "../common.h"
/*
 * Forward declarations for the steps of the send side of the connection, all defined further down in this file.
 */
void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint);
void memcache_conn_send_data (struct memcache_conn *conn);
void memcache_conn_send_end (struct memcache_conn *conn);
void memcache_conn_send_done (struct memcache_conn *conn);
/*
 * Forward declarations for the steps of the receive side of the connection.
 */
void memcache_conn_recv_next (struct memcache_conn *conn);
void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf);
void memcache_conn_recv_done (struct memcache_conn *conn);
void memcache_conn_recv_end (struct memcache_conn *conn);
/*
 * Event callbacks: ev_connect fires once the async connect() resolves, the bev_* functions are the bufferevent's
 * read/write/error callbacks, and ev_write/ev_read move request/reply payload directly on the fd.
 */
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg);
/*
 * Teardown helpers private to this file: fail the whole connection, and release its fd/bufferevent.
 */
static void memcache_conn_error (struct memcache_conn *conn);
static void memcache_conn_close (struct memcache_conn *conn);
/*
 * Allocate a connection object for the given server and start connecting to it.
 *
 * Returns the new connection, or NULL if either the allocation or memcache_conn_connect() fails; the connection
 * object is freed again before NULL is returned.
 *
 * ERROR() comes from common.h; it is expected to log and jump to the error label (TODO confirm).
 */
struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
    // calloc gives us an all-zero struct to start from
    struct memcache_conn *new_conn = calloc(1, sizeof(*new_conn));

    if (new_conn == NULL) {
        ERROR("calloc");
    }

    // bind to the server and start with an empty request queue
    new_conn->server = server;
    TAILQ_INIT(&new_conn->req_queue);

    // kick off the non-blocking connect
    if (memcache_conn_connect(new_conn)) {
        ERROR("failed to connect to server");
    }

    return new_conn;

error:
    // free(NULL) is a no-op, so this also covers the failed calloc
    free(new_conn);

    return NULL;
}
/*
 * Begin an asynchronous connect to conn->server->endpoint.
 *
 * The connection must not currently hold an fd or be marked as connected. On success the ev_connect event has been
 * scheduled, so that _memcache_conn_ev_connect runs once the socket becomes writable.
 *
 * Returns zero on success, -1 on failure (any fd that was opened is closed again and conn->fd is set to -1).
 */
int memcache_conn_connect (struct memcache_conn *conn) {
    assert(conn->fd <= 0 && !conn->is_connected);

    // start connecting
    conn->fd = socket_connect_async(conn->server->endpoint, SOCK_STREAM);

    if (conn->fd == -1) {
        goto error;
    }

    // fd 0 is never treated as a valid socket anywhere in this file
    assert(conn->fd > 0);

    // wait for the socket to turn writable, which signals the connect outcome
    event_set(&conn->ev_connect, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn);

    if (event_add(&conn->ev_connect, NULL) != 0) {
        PERROR("event_add");
    }

    return 0;

error:
    // undo the socket, if we got as far as having one
    if (conn->fd > 0) {
        if (close(conn->fd) != 0) {
            PWARNING("close %d", conn->fd);
        }

        conn->fd = -1;
    }

    return -1;
}
/*
 * Can this connection accept a new request right now?
 *
 * Returns 1 if the connection has an open fd, is connected, and either has no queued requests or the memcache
 * context has pipeline_requests enabled; 0 otherwise.
 */
int memcache_conn_is_available (struct memcache_conn *conn) {
    // needs a live, connected socket
    if (!(conn->fd > 0) || !conn->is_connected) {
        return 0;
    }

    // an idle connection can always take a request
    if (TAILQ_EMPTY(&conn->req_queue)) {
        return 1;
    }

    // a busy one only when requests may be pipelined
    return conn->server->mc->pipeline_requests ? 1 : 0;
}
/*
 * Queue the given request on this connection, and start sending it right away if nothing else is being sent.
 *
 * The connection must be available (see memcache_conn_is_available).
 */
void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
    assert(memcache_conn_is_available(conn));

    // XXX: validate req

    // append to the tail of the queue; replies are matched against the head
    TAILQ_INSERT_TAIL(&conn->req_queue, req, reqqueue_node);

    // a request is already being written out; this one is picked up once that finishes
    if (conn->send_req != NULL) {
        return;
    }

    // the sender is idle, so start on this request now
    memcache_conn_send_next(conn, req);
}
/*
 * Send out the next request.
 *
 * If there is currently a send_req, it is considered as done, and the request queued after it is tried next (hint
 * must be NULL). Otherwise, hint must be the request that was just appended to the tail of the req_queue.
 *
 * A request whose header cannot be formatted is removed from the req_queue and failed with memcache_req_error(),
 * and the request following it is tried instead. Such a request was never written to the server, so leaving it in
 * the queue would cause later replies to be matched against it.
 *
 * At most one request is started per call. If there is nothing left to send, send_req is left as NULL.
 *
 * If the bufferevent cannot be enabled for writing, the entire connection is failed.
 */
void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint) {
    struct memcache_req *req, *next;

    // req will be either
    //  * the next enqueued req after the one that was last written
    //  * hint, if no req was being written (the req that was just enqueued)
    if (conn->send_req) {
        assert(!hint);

        req = TAILQ_NEXT(conn->send_req, reqqueue_node);

        // the previous send_req is now done
        conn->send_req = NULL;

    } else {
        assert(hint && TAILQ_LAST(&conn->req_queue, memcache_reqqueue_head) == hint);

        req = hint;
    }

    for (; req != NULL; req = next) {
        // look up the successor first, as req may be handed to its error handler below
        next = TAILQ_NEXT(req, reqqueue_node);

        // try and write the request header into our bufferevent's output buffer
        if (memcache_cmd_format_header(bufferevent_get_output(conn->bev),
                memcache_req_cmd(req),
                memcache_req_key(req),
                memcache_req_obj(req)
        )) {
            WARNING("invalid request");

            // it was never sent, so no reply will ever arrive for it; drop it from the queue before failing it
            TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node);
            memcache_req_error(req);

            // continue on to the next request
            continue;
        }

        // this is the request being sent now
        conn->send_req = req;

        // enable our bufferevent to send it
        if (bufferevent_enable(conn->bev, EV_WRITE))
            PERROR("bufferevent_enable");

        // tell the req that it is underway
        memcache_req_send(req);

        // we only want to start one request per call
        break;
    }

    // done, we either replaced send_req, or consumed them all
    return;

error:
    memcache_conn_error(conn);
}
/*
 * Begin writing the payload of the current send_req.
 *
 * With a non-empty payload (obj.bytes > 0), the ev_write event is set up on the fd and its handler is invoked
 * directly for a first write attempt. With an empty payload only the terminating \r\n is sent.
 */
void memcache_conn_send_data (struct memcache_conn *conn) {
    if (conn->send_req->obj.bytes > 0) {
        // prepare the raw write event; the handler re-adds it itself while data remains
        event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn);

        // pretend the event fired, so that we attempt the first write right now
        _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);

        return;
    }

    // nothing to write except for the terminator
    memcache_conn_send_end(conn);
}
/*
 * Queue the trailing \r\n that terminates the request data onto the bufferevent.
 *
 * If the write cannot be queued, the entire connection is failed.
 */
void memcache_conn_send_end (struct memcache_conn *conn) {
    // XXX: this will enable the bev by itself?
    if (bufferevent_write(conn->bev, "\r\n", 2) != 0) {
        PERROR("bufferevent_write");
    }

    return;

error:
    memcache_conn_error(conn);
}
/*
 * The current send_req has been written out completely.
 *
 * Moves on to the next queued request, if any, and - when pipelining is enabled - reports the connection to the
 * server via memcache_server_conn_ready().
 */
void memcache_conn_send_done (struct memcache_conn *conn) {
    assert(conn->send_req != NULL);

    // start on whatever is queued after the request that was just sent
    memcache_conn_send_next(conn, NULL);

    // without pipelining, readiness is reported from the receive side instead
    if (!conn->server->mc->pipeline_requests) {
        return;
    }

    memcache_server_conn_ready(conn->server, conn);
}
/*
 * Arm the bufferevent for reading, so that the next reply header is handed to the read callback.
 *
 * If the bufferevent cannot be enabled, the entire connection is failed.
 */
void memcache_conn_recv_next (struct memcache_conn *conn) {
    // start/continue reading on the bufferevent
    if (bufferevent_enable(conn->bev, EV_READ) != 0) {
        PERROR("bufferevent_enable");
    }

    // There is no need to invoke the read callback by hand for data that is already buffered, as the read callback
    // loops until it has consumed everything that is available.
    return;

error:
    memcache_conn_error(conn);
}
/*
 * Start receiving the data block of the reply for the request at the head of the req_queue.
 *
 * A buffer of obj.bytes is allocated for the request and marked as owned by us. Whatever part of the data already
 * sits in the given evbuffer is copied over first. If more is still missing, reading on the bufferevent is
 * disabled and the rest is read directly from the fd via the ev_read event. Once the data is complete (or if the
 * item is empty), memcache_conn_recv_end() takes over.
 *
 * Allocation or bufferevent failures fail the entire connection.
 */
void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf) {
    struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
    int copied;

    // the request must not have a reply buffer yet
    assert(req->buf.data == NULL);

    // bytes *may* be zero if we have an empty cache entry, in which case there is nothing to receive
    if (req->obj.bytes > 0) {
        // XXX: memcache_req_make_buffer?
        req->buf.data = malloc(req->obj.bytes);

        if (req->buf.data == NULL) {
            ERROR("malloc");
        }

        // a fresh, empty buffer of the advertised size
        req->buf.len = req->obj.bytes;
        req->buf.offset = 0;

        // the req now has a buffer, and we are the ones who allocated it
        req->have_buf = 1;
        req->is_buf_ours = 1;

        // pull over whatever the bufferevent has already read past the header
        if (evbuffer_get_length(buf) > 0) {
            copied = evbuffer_remove(buf, req->buf.data, req->buf.len);

            // sanity check...
            assert(copied > 0 && copied <= req->buf.len);

            req->buf.offset += copied;
        }

        if (req->buf.offset < req->buf.len) {
            // the remainder is read straight from the fd, so keep the bufferevent out of the way meanwhile
            if (bufferevent_disable(conn->bev, EV_READ)) {
                PERROR("bufferevent_disable");
            }

            event_set(&conn->ev_read, conn->fd, EV_READ, &_memcache_conn_ev_read, conn);

            // attempt a first read right away; the handler reschedules itself while data is missing
            _memcache_conn_ev_read(conn->fd, EV_READ, conn);

            return;
        }
    }

    // all of the data is in place, finish it off
    memcache_conn_recv_end(conn);

    return;

error:
    memcache_conn_error(conn);
}
/*
 * Receive what follows the reply data block.
 *
 * The closing MEMCACHE_RPL_END line is still outstanding at this point. Rather than handling it separately, we
 * simply re-arm the bufferevent for reading: per the original author's notes, memcache_cmd_parse_header skips the
 * empty line that trails the data and then yields the MEMCACHE_RPL_END line with has_data=0, at which point the
 * read callback calls memcache_conn_recv_done.
 */
void memcache_conn_recv_end (struct memcache_conn *conn) {
    memcache_conn_recv_next(conn);
}
/*
 * The reply for the request at the head of the req_queue has been received completely.
 *
 * The request is removed from the queue and notified via memcache_req_done(), reading is re-armed for the next
 * reply, and - when pipelining is disabled - the connection is reported to the server via
 * memcache_server_conn_ready().
 */
void memcache_conn_recv_done (struct memcache_conn *conn) {
    struct memcache_req *finished = TAILQ_FIRST(&conn->req_queue);

    // unlink it first, then let the req know that it is done
    TAILQ_REMOVE(&conn->req_queue, finished, reqqueue_node);
    memcache_req_done(finished);

    // and prepare to recv the next one
    memcache_conn_recv_next(conn);

    // with pipelining, readiness is reported from the send side instead
    if (conn->server->mc->pipeline_requests) {
        return;
    }

    memcache_server_conn_ready(conn->server, conn);
}
/*
 * The connect() has finished, one way or the other.
 *
 * If the socket reports an error, the entire connection is failed. Otherwise the bufferevent is created on the fd,
 * the connection is marked as connected, reading is armed, and the server is told that the connection is ready.
 */
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
    struct memcache_conn *conn = arg;
    int sock_err;

    // find out how the connect went
    if (socket_check_error(fd, &sock_err)) {
        goto error;
    }

    if (sock_err) {
        ERROR("connect failed: %s", strerror(sock_err));
    }

    // from here on the bufferevent handles headers; conn is handed to each of its callbacks
    conn->bev = bufferevent_new(fd,
        &_memcache_conn_bev_read,
        &_memcache_conn_bev_write,
        &_memcache_conn_bev_error,
        conn
    );

    if (conn->bev == NULL) {
        ERROR("bufferevent_new");
    }

    // we are now officially connected
    conn->is_connected = 1;

    // be prepared for response headers
    memcache_conn_recv_next(conn);

    // let the server know that it may hand us requests
    memcache_server_conn_ready(conn->server, conn);

    return;

error:
    memcache_conn_error(conn);
}
/*
 * The bufferevent's output buffer has drained: either a command header or the \r\n following the request data has
 * been written out.
 *
 * If the current send_req carries a buffer of which nothing has been sent yet, the data is sent next. Otherwise the
 * request is complete.
 *
 * NOTE(review): a request with have_buf set but obj.bytes == 0 never moves buf.offset off zero, so it looks like it
 * would take the send-data branch again after its terminating \r\n drains - confirm that such requests cannot occur.
 */
static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
    struct memcache_conn *conn = arg;

    // there must be a request that this write belonged to
    assert(conn->send_req != NULL);

    // and everything queued for it must be gone
    assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);

    // no payload at all, or the payload (plus its final \r\n) is already out
    if (!conn->send_req->have_buf || conn->send_req->buf.offset != 0) {
        // the request has now been sent, and we can send the next one
        memcache_conn_send_done(conn);

        return;
    }

    // only the header is out so far, the request data comes next
    memcache_conn_send_data(conn);
}
/*
* We have received some reply data, which should include the complete reply line at some point
*/
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
struct memcache_conn *conn = arg;
struct evbuffer *in_buf = bufferevent_get_input(bev);
struct memcache_req *req;
struct memcache_key key;
char *header_data;
enum memcache_reply reply_type;
int has_data;
// ensure that we do indeed have some data
assert(evbuffer_get_length(in_buf) > 0);
// consume as much data as possible
do {
// the req we are processing
req = TAILQ_FIRST(&conn->req_queue);
// attempt to parse the response header
if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &req->obj, &has_data))
ERROR("memcache_cmd_parse_header");
if (!header_data) {
// no complete header received yet
return;
}
// no request waiting?
if (req == NULL)
ERROR("got a response without any request pending: %s", header_data);
// if the reply contains data, check that they key is the same
if (has_data && (key.len != req->key.len || memcmp(key.buf, req->key.buf, key.len) != 0))
ERROR("got reply with wrong key !?! '%.*s' vs. '%.*s'", (int) key.len, key.buf, (int) req->key.len, req->key.buf);
// check it's a FETCH request
if (has_data && req->cmd_type != MEMCACHE_CMD_FETCH_GET)
ERROR("a data reply for a non-CMD_FETCH_* command !?!");
// notify the request (no reply data is ready for reading yet, though)
memcache_req_recv(req, reply_type);
// does the reply include data?
if (has_data) {
// start reading the data (including whatever might be left over in the bufferevent buffer...)
memcache_conn_recv_data(conn, in_buf);
} else {
// the request is done with
memcache_conn_recv_done(conn);
}
// free the header data, but not a second time on error exit
free(header_data); header_data = NULL;
} while (evbuffer_get_length(in_buf) > 0);
// done
return;
error:
// free the header data read from the buf
free(header_data);
memcache_conn_error(conn);
}
/*
 * The bufferevent reported an error. Whatever the cause, the entire connection is failed.
 */
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
    struct memcache_conn *conn = arg;

    // neither the bufferevent itself nor the kind of error is looked at
    (void) bev;
    (void) what;

    memcache_conn_error(conn);
}
/*
 * Write out (more of) the request data of the current send_req directly onto the fd.
 *
 * Invoked directly by memcache_conn_send_data for the first attempt, and by the ev_write event after that. While
 * data remains, the event is rescheduled; once everything has been written, the terminating \r\n is sent via
 * memcache_conn_send_end.
 *
 * EAGAIN/EWOULDBLOCK/EINTR are not failures, the write is simply retried once the fd is writable again. Any other
 * write error fails the entire connection.
 */
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
    struct memcache_conn *conn = arg;
    struct memcache_buf *buf = &conn->send_req->buf;
    // write() returns ssize_t; an int could truncate the count
    ssize_t ret;

    // correct event
    assert(event == EV_WRITE);

    // we do indeed have data to send
    assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);

    // do the actual write()
    ret = write(fd, buf->data + buf->offset, buf->len - buf->offset);

    if (ret < 0) {
        // only give up on real errors; for transient ones we fall through and reschedule
        if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
            PERROR("write");

    } else if (ret == 0) {
        // should never be the case... ?
        ERROR("write returned EOF !?!");

    } else {
        // we managed to write some data
        buf->offset += ret;
    }

    // data left to write?
    if (buf->offset < buf->len) {
        // reschedule
        if (event_add(&conn->ev_write, NULL))
            PERROR("event_add");

    } else {
        // done! Send the terminating \r\n next
        memcache_conn_send_end(conn);
    }

    // success
    return;

error:
    // fail the entire connection
    memcache_conn_error(conn);
}
/*
 * Read (more of) the reply data for the request at the head of the req_queue directly from the fd.
 *
 * Invoked directly by memcache_conn_recv_data for the first attempt, and by the ev_read event after that. While
 * data is missing, the event is rescheduled and the request is notified of newly arrived data via
 * memcache_req_data; once the buffer is full, memcache_conn_recv_end takes over.
 *
 * EAGAIN/EWOULDBLOCK/EINTR are not failures, the read is simply retried once the fd is readable again. Any other
 * read error, or EOF, fails the entire connection.
 */
static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg) {
    struct memcache_conn *conn = arg;
    struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
    struct memcache_buf *buf = &req->buf;
    // read() returns ssize_t; an int could truncate the count
    ssize_t ret;

    // correct event
    assert(event == EV_READ);

    // we do indeed expect to receive data
    assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);

    // do the actual read()
    ret = read(fd, buf->data + buf->offset, buf->len - buf->offset);

    if (ret < 0) {
        // only give up on real errors; for transient ones we fall through and reschedule
        if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
            PERROR("read");

    } else if (ret == 0) {
        // should never be the case... ?
        ERROR("read returned EOF !?!");

    } else {
        // we managed to read some data
        buf->offset += ret;
    }

    // only notify the req if new data was received, and we won't be calling req_done next.
    if (ret > 0 && buf->offset < buf->len) {
        // notify the req
        memcache_req_data(req);
    }

    // data left to read?
    if (buf->offset < buf->len) {
        // reschedule
        if (event_add(&conn->ev_read, NULL))
            PERROR("event_add");

    } else {
        // done! We can let the bufferevent handle the rest of the reply now
        memcache_conn_recv_end(conn);
    }

    // success
    return;

error:
    // fail the entire connection
    memcache_conn_error(conn);
}
/*
 * The entire connection failed.
 *
 * Every request still queued on the connection is removed from the req_queue and failed with memcache_req_error(),
 * the connection is closed, and the server is told about it via memcache_server_conn_dead().
 *
 * The queue is drained from its head, and each request is unlinked before it is notified (the same order that
 * memcache_conn_recv_done uses), so that a request is never touched again once its error handler has run.
 */
static void memcache_conn_error (struct memcache_conn *conn) {
    struct memcache_req *req;

    // fail all requests, if we have any
    while ((req = TAILQ_FIRST(&conn->req_queue)) != NULL) {
        TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node);

        // error out the req
        memcache_req_error(req);
    }

    // nothing is being sent anymore
    conn->send_req = NULL;

    // close the connection
    memcache_conn_close(conn);

    // tell the server we failed
    memcache_server_conn_dead(conn->server, conn);
}
/*
 * Release the resources of the connection: any raw events still scheduled on the fd, the bufferevent, and the fd
 * itself. Afterwards the connection is marked as not connected, with fd zero and no bufferevent.
 *
 * The connection may fail while one of the raw events is still pending (e.g. the bufferevent reports an error while
 * request data is being written through ev_write), so pending events are cancelled here rather than merely
 * asserted to be absent. The fd is closed last, only once nothing is watching it anymore.
 */
static void memcache_conn_close (struct memcache_conn *conn) {
    // cancel whatever raw events are still scheduled on the fd
    if (event_pending(&conn->ev_connect, EV_WRITE|EV_TIMEOUT, NULL) && event_del(&conn->ev_connect))
        WARNING("event_del");

    if (event_pending(&conn->ev_read, EV_READ|EV_TIMEOUT, NULL) && event_del(&conn->ev_read))
        WARNING("event_del");

    if (event_pending(&conn->ev_write, EV_WRITE|EV_TIMEOUT, NULL) && event_del(&conn->ev_write))
        WARNING("event_del");

    // free the bufferevent
    if (conn->bev) {
        bufferevent_free(conn->bev);

        conn->bev = NULL;
    }

    // close the fd if needed
    if (conn->fd > 0) {
        if (close(conn->fd))
            PWARNING("close");

        conn->fd = 0;
    }

    // not connected anymore
    conn->is_connected = 0;
}
/*
 * Free the connection object itself.
 *
 * The connection must have no requests queued and must not be marked as connected anymore.
 */
void memcache_conn_free (struct memcache_conn *conn) {
    // no reqs may still be bound to us, and we must have been closed already
    assert(TAILQ_EMPTY(&conn->req_queue));
    assert(!conn->is_connected);

    free(conn);
}