/*
 * memcache/connection.c
 * author Tero Marttila <terom@fixme.fi>
 * Sat, 30 Aug 2008 19:13:15 +0300
 * changeset 49 10c7dce1a043
 * parent 48 1c67f512779b
 * permissions -rw-r--r--
 * autogenerate the memcache_test help output, and pipeline memcache requests
 */

#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <assert.h>

#include "connection.h"
#include "memcache.h"
#include "command.h"
#include "request.h"
#include "../socket.h"
#include "../common.h"

// Send path: format the next request header, optionally write the request data, write the terminating \r\n,
// and then move on to the next queued request.
void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint);
void memcache_conn_send_data (struct memcache_conn *conn);
void memcache_conn_send_end (struct memcache_conn *conn);
void memcache_conn_send_done (struct memcache_conn *conn);
// Receive path: wait for a reply header, optionally read the reply data, read the trailing reply line,
// and then finish off the request at the head of the queue.
void memcache_conn_recv_next (struct memcache_conn *conn);
void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf);
void memcache_conn_recv_done (struct memcache_conn *conn);
void memcache_conn_recv_end (struct memcache_conn *conn);

// libevent callbacks: ev_connect/ev_write/ev_read are raw fd events, bev_* are the bufferevent callbacks
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);
static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);
static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg);

// Connection teardown helpers
static void memcache_conn_error (struct memcache_conn *conn);
static void memcache_conn_close (struct memcache_conn *conn);

/*
 * Allocate a zeroed connection for the given server and start an async connect on it.
 *
 * Returns the new connection, or NULL if the allocation or the connect attempt failed.
 *
 * NOTE(review): ERROR() comes from common.h and is assumed to log and jump to the `error` label.
 */
struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
    struct memcache_conn *conn = calloc(1, sizeof(*conn));

    if (conn == NULL)
        ERROR("calloc");

    // bind to the server and start out with an empty request queue
    conn->server = server;
    TAILQ_INIT(&conn->req_queue);

    // kick off the connect
    if (memcache_conn_connect(conn) != 0)
        ERROR("failed to connect to server");

    return conn;

error:
    // conn may still be NULL here; free(NULL) is a no-op
    free(conn);

    return NULL;
}

/*
 * Begin a non-blocking connect to the connection's server and register ev_connect to fire once the socket
 * becomes writable.
 *
 * The connection must not currently have an fd or be marked as connected.
 *
 * Returns zero if the connect was started, -1 on failure (conn->fd is then <= 0 again).
 */
int memcache_conn_connect (struct memcache_conn *conn) {
    assert(conn->fd <= 0 && !conn->is_connected);

    // start the connect
    conn->fd = socket_connect_async(conn->server->endpoint, SOCK_STREAM);

    if (conn->fd == -1)
        goto error;

    // the rest of this module treats fd <= 0 as "no socket", so fd 0 (normally stdin) must not show up here
    assert(conn->fd > 0);

    // wait for the socket to become writable
    event_set(&conn->ev_connect, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn);

    if (event_add(&conn->ev_connect, NULL) != 0)
        PERROR("event_add");

    return 0;

error:
    // drop the socket again if we got as far as creating it
    if (conn->fd > 0) {
        if (close(conn->fd) != 0)
            PWARNING("close %d", conn->fd);

        conn->fd = -1;
    }

    return -1;
}

/*
 * Can this connection accept a new request right now?
 *
 * It must have a connected socket, and either have nothing queued or belong to a memcache context that has
 * request pipelining enabled. Returns 1 if so, 0 otherwise.
 */
int memcache_conn_is_available (struct memcache_conn *conn) {
    // no usable socket yet
    if (conn->fd <= 0 || !conn->is_connected)
        return 0;

    // idle connections are always fine
    if (TAILQ_EMPTY(&conn->req_queue))
        return 1;

    // busy connections only take more requests when pipelining
    return conn->server->mc->pipeline_requests ? 1 : 0;
}

/*
 * Enqueue the given request on this connection, and start sending it right away if nothing else is currently
 * being sent.
 *
 * The connection must be available (see memcache_conn_is_available).
 */
void memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
    assert(memcache_conn_is_available(conn));

    // XXX: validate req

    // append to the tail of the request queue
    TAILQ_INSERT_TAIL(&conn->req_queue, req, reqqueue_node);

    // a send is already in progress; this req will be picked up once that one is done
    if (conn->send_req != NULL)
        return;

    // the sender is idle, so kick it off with the req we just enqueued
    memcache_conn_send_next(conn, req);
}

/*
 * Send out the next request.
 *
 * If there is currently a send_req, it is considered as done, and hint must be NULL; the request queued after it
 * is sent next. Otherwise hint must be the request that was just appended to the tail of the request queue.
 *
 * Requests whose header cannot be formatted are failed via memcache_req_error and dropped from the request queue;
 * we then move on to the request after it. If no sendable request is left, send_req stays NULL.
 *
 * If enabling the bufferevent for writing fails, the whole connection is failed.
 */
void memcache_conn_send_next (struct memcache_conn *conn, struct memcache_req *hint) {
    struct memcache_req *req, *next;
    
    // req will be either
    //  * the next enqueued req after the one that was last written
    //  * hint, if no req was being written (the req that was just enqueued)
    if (conn->send_req) {
        assert(!hint);
    
        // the next req
        req = TAILQ_NEXT(conn->send_req, reqqueue_node);

        // and reset the send_req to NULL...
        conn->send_req = NULL;

    } else {
        assert(hint && TAILQ_LAST(&conn->req_queue, memcache_reqqueue_head) == hint);
        
        // the given req
        req = hint;
    }

    // while we still have a request to process...
    while (req) {
        // try and write the request header into our bufferevent's output buffer
        if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), 
            memcache_req_cmd(req),
            memcache_req_key(req),
            memcache_req_obj(req)
        )) {
            WARNING("invalid request");

            // look up the successor first, we are about to unlink this req
            next = TAILQ_NEXT(req, reqqueue_node);

            // This req will never get a reply, so it must not stay in the queue: the receive side always matches
            // replies against the head of the queue. Unlink it before notifying, so that we do not touch it again
            // after its error handler has run.
            TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node);
            memcache_req_error(req);

            // continue on to the next request
            req = next;

            continue;
        }

        // send this one
        conn->send_req = req;

        // enable our bufferevent to send it
        if (bufferevent_enable(conn->bev, EV_WRITE))
            PERROR("bufferevent_enable");

        // tell the req that it is underway
        memcache_req_send(req);
        
        // done, we only want to process one
        break;
    }
    
    // done, we either replaced send_req, or consumed them all
    return;

error:
    memcache_conn_error(conn);
}

/*
 * Begin writing out the data payload of the current send_req.
 *
 * A request without any payload bytes goes straight on to the terminating \r\n.
 */
void memcache_conn_send_data (struct memcache_conn *conn) {
    if (!(conn->send_req->obj.bytes > 0)) {
        // nothing to write apart from the \r\n
        memcache_conn_send_end(conn);

        return;
    }

    // prepare the write event for our socket
    event_set(&conn->ev_write, conn->fd, EV_WRITE, &_memcache_conn_ev_write, conn);

    // call the handler directly to attempt the first write right away instead of waiting for the event
    _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
}

/*
 * Queue up the final \r\n that terminates the request data on the bufferevent.
 *
 * Fails the whole connection if the bufferevent write fails.
 */
void memcache_conn_send_end (struct memcache_conn *conn) {
    static const char terminator[] = "\r\n";

    // XXX: this will enable the bev by itself?
    if (bufferevent_write(conn->bev, terminator, sizeof(terminator) - 1) != 0)
        PERROR("bufferevent_write");

    return;

error:
    memcache_conn_error(conn);
}

/*
 * The current send_req has been written out completely.
 *
 * Moves on to the next queued request (if any) and, when pipelining, reports the connection as ready to the
 * server.
 */
void memcache_conn_send_done (struct memcache_conn *conn) {
    assert(conn->send_req != NULL);

    // advance to whatever is queued after the req we just finished sending
    memcache_conn_send_next(conn, NULL);

    // without pipelining, readiness is reported once the reply has been received instead
    if (!conn->server->mc->pipeline_requests)
        return;

    memcache_server_conn_ready(conn->server, conn);
}

/*
 * Arm the bufferevent for reading, so that the next reply line gets delivered to the read callback.
 *
 * Fails the whole connection if the bufferevent cannot be enabled.
 */
void memcache_conn_recv_next (struct memcache_conn *conn) {
    // There is no need to invoke the read callback by hand for data that is already buffered: the callback loops
    // until its input buffer is empty.
    if (bufferevent_enable(conn->bev, EV_READ) != 0)
        PERROR("bufferevent_enable");

    // now we just wait for the reply
    return;

error:
    memcache_conn_error(conn);
}

/*
 * Start receiving the data block of the reply for the request at the head of the queue.
 *
 * buf is the bufferevent's input buffer; whatever part of the data block it already holds is copied over first,
 * and the remainder (if any) is then read directly from the socket via ev_read while the bufferevent's reading
 * is disabled.
 *
 * Fails the whole connection on allocation or libevent errors.
 */
void memcache_conn_recv_data (struct memcache_conn *conn, struct evbuffer *buf) {
    struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
    int ret;

    // the req must not have a data buffer yet
    assert(req->buf.data == NULL);

    // bytes *may* be zero for an empty cache entry, in which case there is nothing to receive
    if (!(req->obj.bytes > 0)) {
        memcache_conn_recv_end(conn);

        return;
    }

    // XXX: memcache_req_make_buffer?

    // allocate room for the full data block
    if ((req->buf.data = malloc(req->obj.bytes)) == NULL)
        ERROR("malloc");

    // empty buffer of the right size, owned by us
    req->buf.len = req->obj.bytes;
    req->buf.offset = 0;
    req->have_buf = 1;
    req->is_buf_ours = 1;

    // move over whatever the bufferevent has already read
    if (evbuffer_get_length(buf) > 0) {
        ret = evbuffer_remove(buf, req->buf.data, req->buf.len);

        // sanity check...
        assert(ret > 0 && ret <= req->buf.len);

        req->buf.offset += ret;
    }

    // was the entire data block in the buffer already?
    if (!(req->buf.offset < req->buf.len)) {
        memcache_conn_recv_end(conn);

        return;
    }

    // more to come: keep the bufferevent from reading while we read the socket ourselves
    if (bufferevent_disable(conn->bev, EV_READ))
        PERROR("bufferevent_disable");

    // prepare the read event for our socket
    event_set(&conn->ev_read, conn->fd, EV_READ, &_memcache_conn_ev_read, conn);

    // and attempt the first read right away; the handler reschedules itself as needed
    _memcache_conn_ev_read(conn->fd, EV_READ, conn);

    return;

error:
    memcache_conn_error(conn);
}

/*
 * Receive the final bits of data following the reply data block.
 *
 * This simply re-enables reading on the bufferevent via memcache_conn_recv_next.
 */
void memcache_conn_recv_end (struct memcache_conn *conn) {
    // We still need to receive the MEMCACHE_RPL_END. We kind of "recurse" to handle this: we activate the
    // bufferevent for EV_READ again, and the read callback uses memcache_cmd_parse_header to parse the data (it is
    // expected to skip the "empty line" after the data and then return the MEMCACHE_RPL_END line). That reply has
    // has_data=0, which causes recv_done to be called.
    // Elegant!
    memcache_conn_recv_next(conn);
}

/*
 * The reply for the request at the head of the queue has been received in full.
 *
 * Unlinks the request, lets it finish, re-arms reading for the next reply and, when not pipelining, reports the
 * connection as ready to the server.
 */
void memcache_conn_recv_done (struct memcache_conn *conn) {
    struct memcache_req *done_req = TAILQ_FIRST(&conn->req_queue);

    // the req is no longer ours
    TAILQ_REMOVE(&conn->req_queue, done_req, reqqueue_node);

    // let it detach
    memcache_req_done(done_req);

    // get ready for the next reply
    memcache_conn_recv_next(conn);

    // when pipelining, readiness is reported once the request has been sent instead
    if (conn->server->mc->pipeline_requests)
        return;

    memcache_server_conn_ready(conn->server, conn);
}

/*
 * The socket became writable, i.e. the async connect() has finished one way or another.
 *
 * On success this sets up the bufferevent, marks the connection as connected, starts reading and tells the
 * server that we are ready. On failure the whole connection is failed.
 */
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
    struct memcache_conn *conn = arg;
    struct bufferevent *bev;
    int error;

    // find out how the connect went
    if (socket_check_error(fd, &error) != 0)
        goto error;

    if (error != 0)
        ERROR("connect failed: %s", strerror(error));

    // create the bufferevent with our read/write/error callbacks
    bev = bufferevent_new(fd, 
        &_memcache_conn_bev_read,
        &_memcache_conn_bev_write,
        &_memcache_conn_bev_error,
        conn
    );

    conn->bev = bev;

    if (bev == NULL)
        ERROR("bufferevent_new");

    // we are now connected
    conn->is_connected = 1;

    // start listening for reply headers
    memcache_conn_recv_next(conn);

    // and let the server know that it can use us
    memcache_server_conn_ready(conn->server, conn);

    return;

error:
    memcache_conn_error(conn);
}

/*
 * The bufferevent's output buffer has been drained: either the command header or the trailing \r\n of the
 * current send_req has been written out.
 */
static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
    struct memcache_conn *conn = arg;

    assert(conn->send_req != NULL);

    // everything we queued up has been sent
    assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);

    // Either the request carries no data at all, or its data has been written already (offset has moved on, and
    // the final \r\n was handled as well) - in both cases this request is finished.
    if (!conn->send_req->have_buf || conn->send_req->buf.offset != 0) {
        memcache_conn_send_done(conn);

        return;
    }

    // only the header is out so far, the request data goes next
    memcache_conn_send_data(conn);
}

/*
 * We have received some reply data, which should include the complete reply line at some point
 */
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
    struct memcache_conn *conn = arg;
    struct evbuffer *in_buf = bufferevent_get_input(bev);
    struct memcache_req *req;
    struct memcache_key key;
    char *header_data;
    enum memcache_reply reply_type;
    int has_data;
    
    // ensure that we do indeed have some data
    assert(evbuffer_get_length(in_buf) > 0);
    
    // consume as much data as possible
    do {
        // the req we are processing
        req = TAILQ_FIRST(&conn->req_queue);

        // attempt to parse the response header
        if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &req->obj, &has_data))
            ERROR("memcache_cmd_parse_header");
        
        if (!header_data) {
            // no complete header received yet
            return;
        }

        // no request waiting?
        if (req == NULL)
            ERROR("got a response without any request pending: %s", header_data);

        // if the reply contains data, check that they key is the same
        if (has_data && (key.len != req->key.len || memcmp(key.buf, req->key.buf, key.len) != 0))
            ERROR("got reply with wrong key !?! '%.*s' vs. '%.*s'", (int) key.len, key.buf, (int) req->key.len, req->key.buf);

        // check it's a FETCH request
        if (has_data && req->cmd_type != MEMCACHE_CMD_FETCH_GET)
            ERROR("a data reply for a non-CMD_FETCH_* command !?!");

        // notify the request (no reply data is ready for reading yet, though)
        memcache_req_recv(req, reply_type);
        
        // does the reply include data?
        if (has_data) {
            // start reading the data (including whatever might be left over in the bufferevent buffer...)
            memcache_conn_recv_data(conn, in_buf);

        } else {
            // the request is done with
            memcache_conn_recv_done(conn);
        }

        // free the header data, but not a second time on error exit
        free(header_data); header_data = NULL;
    
    } while (evbuffer_get_length(in_buf) > 0);
    
    // done
    return;
   
error:
    // free the header data read from the buf
    free(header_data);

    memcache_conn_error(conn);
}


/*
 * The bufferevent reported an error; neither the bufferevent nor the error flags are looked at, the whole
 * connection is simply failed.
 */
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
    struct memcache_conn *conn = arg;

    // unused
    (void) bev;
    (void) what;

    memcache_conn_error(conn);
}

/*
 * The socket is writable while we are sending the data payload of the current send_req.
 *
 * Writes as much of the remaining payload as the socket accepts. If there is still data left afterwards, ev_write
 * is (re)scheduled, otherwise the terminating \r\n is sent. Transient failures (EAGAIN/EWOULDBLOCK/EINTR) just
 * lead to another attempt once the socket is writable again; any other failure fails the entire connection.
 */
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
    struct memcache_conn *conn = arg;
    struct memcache_buf *buf = &conn->send_req->buf;
    ssize_t ret;

    // correct event
    assert(event == EV_WRITE);

    // we do indeed have data to send
    assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);
    
    // do the actual write()
    ret = write(fd, buf->data + buf->offset, buf->len - buf->offset);

    // a full socket buffer or an interrupted call are not errors, we just try again later
    if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
        PERROR("write");

    // should never be the case... ?
    if (ret == 0)
        ERROR("write returned EOF !?!");
    
    // did we manage to write some data?
    if (ret > 0) {
        // update offset
        buf->offset += ret;
    }

    // data left to write?
    if (buf->offset < buf->len) {
        // reschedule
        if (event_add(&conn->ev_write, NULL))
            PERROR("event_add");

    } else {
        // done! Send the terminating \r\n next
        memcache_conn_send_end(conn);
    }

    // success
    return;

error:
    // fail the entire connection
    memcache_conn_error(conn);
}

/*
 * The socket is readable while we are receiving the reply data block for the request at the head of the queue.
 *
 * Reads as much of the remaining data block as is available, straight into the request's buffer. If the block is
 * still incomplete afterwards, the request is notified of the new data (if any) and ev_read is (re)scheduled;
 * otherwise we hand back to the bufferevent for the rest of the reply. Transient failures
 * (EAGAIN/EWOULDBLOCK/EINTR) just lead to another attempt once the socket is readable again; EOF and any other
 * failure fail the entire connection.
 */
static void _memcache_conn_ev_read (evutil_socket_t fd, short event, void *arg) {
    struct memcache_conn *conn = arg;
    struct memcache_req *req = TAILQ_FIRST(&conn->req_queue);
    struct memcache_buf *buf = &req->buf;
    ssize_t ret;

    // correct event
    assert(event == EV_READ);

    // we do indeed expect to receive data
    assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);
    
    // do the actual read()
    ret = read(fd, buf->data + buf->offset, buf->len - buf->offset);

    // no data available yet or an interrupted call are not errors, we just try again later
    if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
        PERROR("read");

    // the peer closed the connection in the middle of the data block
    if (ret == 0)
        ERROR("read returned EOF !?!");
    
    // did we manage to read some data?
    if (ret > 0) {
        // update offset
        buf->offset += ret;
    }
    
    // only notify the req if new data was received, and we won't be calling req_done next.
    if (ret > 0 && buf->offset < buf->len) {
        // notify the req
        memcache_req_data(req);
    }

    // data left to read?
    if (buf->offset < buf->len) {
        // reschedule
        if (event_add(&conn->ev_read, NULL))
            PERROR("event_add");

    } else {
        // done! We can let the bufferevent handle the rest of the reply now
        memcache_conn_recv_end(conn);
    }

    // success
    return;

error:
    // fail the entire connection
    memcache_conn_error(conn);
}

/*
 * The entire connection failed.
 *
 * Every request still queued on the connection is failed via memcache_req_error, the connection is closed, and
 * the server is told that the connection is dead.
 */
static void memcache_conn_error (struct memcache_conn *conn) {
    struct memcache_req *req;

    // Fail all requests, if we have any. The queue is drained from the head rather than walked with
    // TAILQ_FOREACH, since removing the current element while iterating is not safe. Each req is unlinked before
    // it is notified, so that we do not touch it again after its error handler has run.
    while ((req = TAILQ_FIRST(&conn->req_queue)) != NULL) {
        TAILQ_REMOVE(&conn->req_queue, req, reqqueue_node);

        // error out the req
        memcache_req_error(req);
    }

    // nothing is being sent anymore
    conn->send_req = NULL;
    
    // close the connection
    memcache_conn_close(conn);

    // tell the server we failed
    memcache_server_conn_dead(conn->server, conn);
}

/*
 * Close the connection: cancel our pending events, free the bufferevent, close the socket and mark the
 * connection as not connected.
 *
 * The connection object itself stays allocated. Safe to call on a connection that never finished connecting.
 */
void memcache_conn_close (struct memcache_conn *conn) {
    // Cancel whichever of our own events is still pending. The connection can fail while the connect is still
    // underway or while ev_read/ev_write is scheduled, and those events must not fire on a closed connection.
    if (event_pending(&conn->ev_connect, EV_WRITE|EV_TIMEOUT, NULL) && event_del(&conn->ev_connect))
        PWARNING("event_del");

    if (event_pending(&conn->ev_read, EV_READ|EV_TIMEOUT, NULL) && event_del(&conn->ev_read))
        PWARNING("event_del");

    if (event_pending(&conn->ev_write, EV_WRITE|EV_TIMEOUT, NULL) && event_del(&conn->ev_write))
        PWARNING("event_del");

    // free the bufferevent while its fd is still open
    if (conn->bev) {
        bufferevent_free(conn->bev);

        conn->bev = NULL;
    }

    // close the fd if needed, now that nothing is registered on it anymore
    if (conn->fd > 0) {
        if (close(conn->fd))
            PWARNING("close");

        conn->fd = 0;
    }

    // not connected anymore
    conn->is_connected = 0;
}

/*
 * Release the memory of a connection.
 *
 * The connection must not have any requests queued and must no longer be marked as connected; nothing is closed
 * here.
 */
void memcache_conn_free (struct memcache_conn *conn) {
    // ensure we don't have any reqs bound to us
    assert(TAILQ_EMPTY(&conn->req_queue));
    
    // ensure that the connection is not considered to be connected anymore
    assert(!conn->is_connected);
    
    // free it
    free(conn);
}