memcache/connection.c
author Tero Marttila <terom@fixme.fi>
Wed, 27 Aug 2008 22:42:27 +0300
changeset 42 0e503189af2f
parent 41 540737bf6bac
child 43 e5b714190dee
permissions -rw-r--r--
more reply-receiving code, but still incomplete

#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <assert.h>

#include "connection.h"
#include "command.h"
#include "request.h"
#include "../socket.h"
#include "../common.h"

/*
 * Forward declarations for the event callbacks and helpers defined further
 * down in this file.
 */

// registered on conn->ev (EV_WRITE) by memcache_conn_connect
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg);

// the write/read/error callbacks handed to bufferevent_new once connected
static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg);
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg);
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg);

// writes the request's data buffer directly to the socket; called from memcache_conn_send_req_data
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg);

// detaches the current request from the connection
static void memcache_conn_req_done (struct memcache_conn *conn);

/*
 * Allocate a zero-initialized connection for the given server and start
 * connecting to it via memcache_conn_connect().
 *
 * Returns the new connection, or NULL if the allocation failed or the connect
 * could not be started (the allocation is released again in that case).
 */
struct memcache_conn *memcache_conn_open (struct memcache_server *server) {
    struct memcache_conn *conn = calloc(1, sizeof(*conn));

    if (conn == NULL) {
        ERROR("calloc");
    }

    // the connect below looks up the endpoint through the server
    conn->server = server;

    if (memcache_conn_connect(conn) != 0) {
        ERROR("failed to connect to server");
    }

    return conn;

error:
    // free(NULL) is a no-op, so this also covers the failed-calloc case
    free(conn);

    return NULL;
}

/*
 * Start an asynchronous connect to the connection's server.
 *
 * The connection must not currently have a socket or be marked connected
 * (asserted). On success a one-shot EV_WRITE event is registered on the new
 * socket, with _memcache_conn_ev_connect as its callback.
 *
 * Returns zero if the connect was started, -1 otherwise. On failure any socket
 * that was opened is closed again and conn->fd is set to -1.
 */
int memcache_conn_connect (struct memcache_conn *conn) {
    assert(conn->fd <= 0 && !conn->is_connected);

    // kick off the connect
    conn->fd = socket_connect_async(conn->server->endpoint, SOCK_STREAM);

    if (conn->fd == -1)
        goto error;

    // fd 0 should be stdin...
    assert(conn->fd > 0);

    // register the connect callback on the socket
    event_set(&conn->ev, conn->fd, EV_WRITE, &_memcache_conn_ev_connect, conn);

    if (event_add(&conn->ev, NULL) != 0) {
        PERROR("event_add");
    }

    return 0;

error:
    // nothing to clean up if we never got a socket
    if (conn->fd <= 0)
        return -1;

    if (close(conn->fd) != 0) {
        PWARNING("close %d", conn->fd);
    }

    conn->fd = -1;

    return -1;
}

/*
 * Returns nonzero if the connection has a socket, is marked as connected and
 * has no request bound to it; zero otherwise.
 */
int memcache_conn_is_available (struct memcache_conn *conn) {
    if (conn->fd <= 0)
        return 0;

    if (!conn->is_connected)
        return 0;

    return conn->req == NULL;
}

/*
 * Start executing the given request on this connection.
 *
 * The connection must be connected and must not already have a request bound
 * to it (both are asserted). The command header is formatted into the
 * bufferevent's output buffer, writing is enabled on the bufferevent, and the
 * request is told that it is underway via memcache_req_send().
 *
 * Returns zero once the header has been queued. Returns -1 on failure, in which
 * case the request is unbound from the connection again so that the connection
 * does not keep claiming a request that never got underway.
 */
int memcache_conn_do_req (struct memcache_conn *conn, struct memcache_req *req) {
    assert(conn->fd > 0 && conn->is_connected);
    assert(conn->req == NULL);

    // store the req
    conn->req = req;

    // write the request header into our bufferevent's output buffer
    if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj))
        ERROR("failed to init the cmd");
    
    // tell our bufferevent to send it
    if (bufferevent_enable(conn->bev, EV_WRITE))
        PERROR("bufferevent_enable");
    
    // tell the req that it is underway
    memcache_req_send(req);

    // wait for that to complete
    return 0;

error:
    // the req was never sent; leaving it bound would keep this connection
    // marked as busy and trip the conn->req == NULL assert on the next call
    // NOTE(review): anything already appended to the output buffer is not rolled back here
    conn->req = NULL;

    return -1;
}

/*
 * Start writing out the request data
 */
void memcache_conn_send_req_data (struct memcache_conn *conn) {
    // just fake a call to the event handler
    _memcache_conn_ev_write(conn->fd, EV_WRITE, conn);
}

/*
 * Begin waiting for the reply by enabling reads on the connection's
 * bufferevent. Any request data must already have been written out in full.
 */
void memcache_conn_handle_reply (struct memcache_conn *conn) {
    // either the request carried no data buffer, or every byte of it has been sent
    assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len);

    // from here on the reply is delivered through the bufferevent
    if (bufferevent_enable(conn->bev, EV_READ) != 0) {
        PERROR("bufferevent_enable");
    }

    return;

error:
    // XXX: error handling
    assert(0);
}

/*
 * Start reading reply data from the connection.
 *
 * Not implemented yet: reaching this aborts via assert.
 */
void memcache_conn_handle_reply_data (struct memcache_conn *conn, struct evbuffer *buf) {
    // neither argument is used until this is implemented
    (void) conn;
    (void) buf;

    // XXX: implement
    assert(0);
}

/*
 * The connect() has finished, one way or another.
 *
 * On success a bufferevent is created for the socket, the connection is marked
 * as connected and the server is told that the connection is ready. On any
 * failure the server is told that the connection is dead instead.
 */
static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg) {
    struct memcache_conn *conn = arg;
    int connect_err;

    // fetch the outcome of the connect attempt
    if (socket_check_error(fd, &connect_err))
        goto error;

    if (connect_err != 0) {
        ERROR("connect failed: %s", strerror(connect_err));
    }

    // set up the bufferevent with our read/write/error callbacks
    conn->bev = bufferevent_new(fd, &_memcache_conn_bev_read, &_memcache_conn_bev_write, &_memcache_conn_bev_error, conn);

    if (conn->bev == NULL) {
        ERROR("bufferevent_new");
    }

    // mark us as successfully connected, and let the server know
    conn->is_connected = 1;
    memcache_server_conn_ready(conn->server, conn);

    return;

error:
    // notify the server
    memcache_server_conn_dead(conn->server, conn);
}

/*
 * The write buffer is empty, which means that we have written out a command header.
 *
 * If the request has a data buffer, continue by sending that; otherwise go
 * straight to waiting for the reply.
 */
static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) {
    struct memcache_conn *conn = arg;

    // the command header has been sent
    assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0);
    
    // does this request have some data to be included in the request?
    // (data is a pointer, so test it against NULL rather than ordering it against zero)
    if (conn->req->buf.data != NULL) {
        // we need to send the request data next
        memcache_conn_send_req_data(conn);

    } else {
        // wait for a reply
        memcache_conn_handle_reply(conn);
    }
}

/*
 * We have received some reply data, which should include the complete reply line at some point
 */
static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) {
    struct memcache_conn *conn = arg;
    struct evbuffer *in_buf = bufferevent_get_input(bev);
    struct memcache_key key;
    char *header_data;
    enum memcache_reply reply_type;
    int has_data;
    
    // ensure that we do indeed have some data
    assert(evbuffer_get_length(in_buf) > 0);

    // attempt to parse the response header
    if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &key, &conn->req->obj, &has_data))
        ERROR("memcache_cmd_parse_header");
    
    if (!header_data) {
        // no complete header received yet
        return;
    }

    // disable reads again
    if (bufferevent_disable(bev, EV_READ))
        PERROR("bufferevent_disable");
    
    // does the reply include data?
    if (has_data) {
        // check that they key is the same
        if (key.len != conn->req->key.len || memcmp(key.buf, conn->req->key.buf, key.len) != 0)
            ERROR("got reply with wrong key !?!");

        // start reading the data (including whatever might be left over in the bufferevent buffer...)
        // XXX: what if this triggers a req notify before we do?
        memcache_conn_handle_reply_data(conn, in_buf);

    } else {
        // the request is done with
        memcache_conn_req_done(conn);
    }
    
    // notify the request
    memcache_req_reply(conn->req, reply_type);

error:
    // XXX: error handling
    return;
}


/*
 * Error callback for the connection's bufferevent.
 *
 * Not implemented yet: reaching this aborts via assert.
 */
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
    // none of the arguments are used until this is implemented
    (void) bev;
    (void) what;
    (void) arg;

    // XXX: error handling
    assert(0);
}

/*
 * Write out as much of the request's data buffer as the socket will take,
 * using write() directly on the fd.
 *
 * If data remains afterwards, conn->ev is re-added so that the write can be
 * continued later; this relies on conn->ev being set up with this function as
 * its callback. Once the whole buffer has been written, we move on to waiting
 * for the reply.
 *
 * A write that fails with EAGAIN, EWOULDBLOCK or EINTR is not an error: nothing
 * was written, and the write is simply rescheduled.
 */
static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) {
    struct memcache_conn *conn = arg;
    struct memcache_buf *buf = &conn->req->buf;
    ssize_t ret;

    // correct event
    assert(event == EV_WRITE);

    // we do indeed have data to send
    assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len);
    
    // do the actual write(); it returns ssize_t, so don't squeeze it into an int
    ret = write(fd, buf->data + buf->offset, buf->len - buf->offset);

    // only failures that a later retry cannot fix are treated as fatal
    if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
        PERROR("write");

    // should never be the case... ?
    if (ret == 0)
        ERROR("write returned 0 !?!");
    
    // did we manage to write some data?
    if (ret > 0) {
        // update offset
        buf->offset += ret;
    }

    // data left to write?
    if (buf->offset < buf->len) {
        // reschedule
        if (event_add(&conn->ev, NULL))
            PERROR("event_add");

    } else {
        // done! We can handle the reply now
        memcache_conn_handle_reply(conn);
    }

    // success
    return;

error:
    // XXX: error handling
    assert(0);
}

/*
 * Detach the request
 */
static void memcache_conn_req_done (struct memcache_conn *conn) {
    // ensure that we do currently have a req
    assert(conn->req);
    
    // have the req detach and check it did so
    memcache_req_done(conn->req);
    assert(conn->req->conn == NULL);
    
    // we are now available again
    conn->req = NULL;
}

/*
 * Release a connection: close its socket if it still has one, and free the
 * connection struct itself.
 *
 * The connection must no longer be marked as connected, and its event must not
 * be pending (both are asserted).
 */
void memcache_conn_free (struct memcache_conn *conn) {
    // XXX: conn->req?
    // NOTE(review): conn->bev is not released here - confirm who is meant to free it

    // a connection that is still considered connected must not be freed
    assert(!conn->is_connected);

    // close the fd if needed
    if (conn->fd > 0) {
        if (close(conn->fd) != 0) {
            PWARNING("close");
        }

        conn->fd = 0;
    }

    // the event must not be able to fire on the freed struct
    assert(event_pending(&conn->ev, EV_READ|EV_WRITE|EV_TIMEOUT, NULL) == 0);

    free(conn);
}