#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include "connection.h"
#include "command.h"
#include "request.h"
#include "../socket.h"
#include "../common.h"
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
static void memcache_conn_req_done (struct memcache_conn *conn);
struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
struct memcache_conn *conn = NULL;
if ((conn = calloc(1, sizeof(*conn))) == NULL)
ERROR("calloc");
// remember the server
conn->server = server;
// attempt connect
if (memcache_conn_connect(conn))
ERROR("failed to connect to server");
// success
return conn;
error:
free(conn);
return NULL;
}
int memcache_conn_connect (struct memcache_conn *conn) {
assert(conn->fd <= 0 && !conn->is_connected);
// begin connect
if ((conn->fd = socket_connect_async(conn->server->endpoint, SOCK_STREAM)) == -1)
goto error;
// fd 0 should be stdin...
assert(conn->fd > 0);
// set up the connect event
event_set(&conn->ev, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn);
// add it
if (event_add(&conn->ev, NULL))
PERROR("event_add");
// success
return 0;
error:
if (conn->fd > 0) {
if (close(conn->fd))
PWARNING("close %d", conn->fd);
conn->fd = -1;
}
return -1;
}
int memcache_conn_is_available (struct memcache_conn *conn) {
return (conn->fd > 0 && conn->is_connected && conn->req == NULL);
}
int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
assert(conn->fd > 0 && conn->is_connected);
assert(conn->req == NULL);
// store the req
conn->req = req;
// write the request header into our bufferevent's output buffer
if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj))
ERROR("failed to init the cmd");
// tell our bufferevent to send it
if (bufferevent_enable(conn->bev, EV_WRITE))
PERROR("bufferevent_enable");
// tell the req that it is underway
memcache_req_send(req);
// wait for that to complete
return 0;
error:
return -1;
}
/*
* Start writing out the request data
*/
void memcache_conn_send_req_data (struct memcache_conn *conn) {
// just fake a call to the event handler
_memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
}
/*
* Start reading a reply from the connection
*/
void memcache_conn_handle_reply (struct memcache_conn *conn) {
// ensure that we either didn't have a command, or it has been sent
assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);
// start reading on the bufferevent
if (bufferevent_enable(conn->bev, EV_READ))
PERROR("bufferevent_enable");
// ok, wait for the reply
return;
error:
// XXX: error handling
assert(0);
}
/*
* Start reading reply data from the connection
*/
void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) {
// XXX: implement
assert(0);
}
/*
* The connect() has finished
*/
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
struct memcache_conn *conn = arg;
int error;
if (socket_check_error(fd, &error))
goto error;
if (error)
ERROR("connect failed: %s", strerror(error));
// set up the bufferevent
if ((conn->bev = bufferevent_new(fd,
&_memcache_conn_bev_read,
&_memcache_conn_bev_write,
&_memcache_conn_bev_error,
conn
)) == NULL)
ERROR("bufferevent_new");
// mark us as succesfully connected
conn->is_connected = 1;
// notify the server
memcache_server_conn_ready(conn->server, conn);
// good
return;
error:
// notify the server
memcache_server_conn_dead(conn->server, conn);
}
/*
* The write buffer is empty, which means that we have written out a command header
*/
static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
struct memcache_conn *conn = arg;
// the command header has been sent
assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
// does this request have some data to be included in the request?
if (conn->req->buf.data > 0) {
// we need to send the request data next
memcache_conn_send_req_data(conn);
} else {
// wait for a reply
memcache_conn_handle_reply(conn);
}
}
/*
* We have received some reply data, which should include the complete reply line at some point
*/
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
struct memcache_conn *conn = arg;
struct evbuffer *in_buf = bufferevent_get_input(bev);
struct memcache_key key;
char *header_data;
enum memcache_reply reply_type;
int has_data;
// ensure that we do indeed have some data
assert(evbuffer_get_length(in_buf) > 0);
// attempt to parse the response header
if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
ERROR("memcache_cmd_parse_header");
if (!header_data) {
// no complete header received yet
return;
}
// disable reads again
if (bufferevent_disable(bev, EV_READ))
PERROR("bufferevent_disable");
// does the reply include data?
if (has_data) {
// check that they key is the same
if (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0)
ERROR("got reply with wrong key !?!");
// start reading the data (including whatever might be left over in the bufferevent buffer...)
// XXX: what if this triggers a req notify before we do?
memcache_conn_handle_reply_data(conn, in_buf);
} else {
// the request is done with
memcache_conn_req_done(conn);
}
// notify the request
memcache_req_reply(conn->req, reply_type);
error:
// XXX: error handling
return;
}
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
// XXX: error handling
assert(0);
}
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
struct memcache_conn *conn = arg;
struct memcache_buf *buf = &conn->req->buf;
int ret;
// correct event
assert(event == EV_WRITE);
// we do indeed have data to send
assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);
// do the actual write()
if ((ret = write(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN)
PERROR("write");
// should never be the case... ?
if (ret == 0)
ERROR("write returned 0 !?!");
// did we manage to write some data?
if (ret > 0) {
// update offset
buf->offset += ret;
}
// data left to write?
if (buf->offset < buf->len) {
// reschedule
if (event_add(&conn->ev, NULL))
PERROR("event_add");
} else {
// done! We can handle the reply now
memcache_conn_handle_reply(conn);
}
// success
return;
error:
// XXX: error handling
assert(0);
}
/*
* Detach the request
*/
static void memcache_conn_req_done (struct memcache_conn *conn) {
// ensure that we do currently have a req
assert(conn->req);
// have the req detach and check it did so
memcache_req_done(conn->req);
assert(conn->req->conn == NULL);
// we are now available again
conn->req = NULL;
}
void memcache_conn_free (struct memcache_conn *conn) {
// XXX: conn->req?
// ensure that the connection is not considered to be connected anymore
assert(!conn->is_connected);
// close the fd if needed
if (conn->fd > 0) {
if (close(conn->fd))
PWARNING("close");
conn->fd = 0;
}
// ensure that the event is not pending anymore
assert(event_pending(&conn->ev, EV_READ|EV_WRITE|EV_TIMEOUT, NULL) == 0);
// free it
free(conn);
}