/*
 * render_remote.c
 * author Tero Marttila <terom@fixme.fi>
 * Wed, 27 Aug 2008 21:30:32 +0300
 * changeset 41 540737bf6bac
 * parent 26 6d615203d963
 * permissions -rw-r--r--
 * sending requests, and partial support for receiving -- incomplete, not tested
 */
#include <stdlib.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>

#include <sys/un.h> /* XXX: for SUN_LEN */

#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/bufferevent.h>

#include "render_remote.h"
#include "common.h"
#include "render_struct.h"
#include "render_net.h"
#include "socket.h"

/*
 * Per-request state for a render operation that is sent to a remote node.
 */
struct render_remote {
    // fd of the connection, as returned by socket_connect_async()
    int sock;

    // raw libevent events; ev_data is only set up in raw I/O mode
    struct event ev_connect;
    struct event ev_data;

    // bufferevent wrapped around sock (write-only in raw I/O mode)
    struct bufferevent *bev_data;

    // the render command that gets written to the remote node
    struct render_cmd render_cmd;

    // nonzero once the command has been queued on the bufferevent
    int cmd_sent;

    // nonzero once the bufferevent has reported EOF
    int have_eof;

    // nonzero from successful setup until cb_done/cb_fail has been invoked
    // or the request has been cancelled/finished by the caller
    int alive;

    // user callbacks: command written out / data available / finished / failed
    void (*cb_sent)(void *arg);
    void (*cb_data)(struct evbuffer *buf, void *arg);
    void (*cb_done)(void *arg);
    void (*cb_fail)(void *arg);

    // opaque argument handed back to every user callback
    void *cb_arg;
};

// internal prototypes; declared up front so that the helpers below can call
// each other independently of their definition order (e.g. _do_data is
// defined before the _do_done that it calls)

// release all resources held by a context that is no longer alive
static void _render_remote_free (struct render_remote *ctx);
// deliver buffered input via cb_data, and cb_done once EOF + empty buffer
static void _render_remote_do_data (struct render_remote *ctx);
// mark the context dead and invoke cb_done
static void _render_remote_do_done (struct render_remote *ctx);
// mark the context dead and invoke cb_fail
static void _render_remote_do_fail (struct render_remote *ctx);

/*
 * Release everything owned by a context that is no longer alive: the
 * bufferevent, any still-pending raw events, the socket and finally the
 * context structure itself.
 *
 * ctx must be non-NULL and must not be alive (asserted).
 */
static void _render_remote_free (struct render_remote *ctx) {
    assert(ctx && !ctx->alive);

    // free the bev_data, if one was created
    if (ctx->bev_data)
        bufferevent_free(ctx->bev_data);

    // The raw events are only examined if they were actually set up; the
    // context comes from calloc(), so an event that was never event_set()
    // is all-zero and must not be handed to event_pending().
    if (event_initialized(&ctx->ev_connect) && event_pending(&ctx->ev_connect, EV_WRITE, NULL))
        event_del(&ctx->ev_connect);

    if (event_initialized(&ctx->ev_data) && event_pending(&ctx->ev_data, EV_READ, NULL))
        event_del(&ctx->ev_data);

    // Close the socket. A failed connect leaves sock at -1, which must not
    // be passed to close(), whereas 0 is a perfectly valid descriptor.
    if (ctx->sock >= 0)
        close(ctx->sock);

    // free the context structure
    free(ctx);
}

/*
 * Hand any buffered input to cb_data, then report completion through
 * _render_remote_do_done() if EOF has been seen and the input buffer is
 * empty after the callback has run.
 */
static void _render_remote_do_data (struct render_remote *ctx) {
    size_t pending = evbuffer_get_length(bufferevent_get_input(ctx->bev_data));

    // only bother the user when there is something to consume
    if (pending != 0)
        ctx->cb_data(EVBUFFER_INPUT(ctx->bev_data), ctx->cb_arg);

    // without EOF there may be more data on its way
    if (!ctx->have_eof)
        return;

    // the length is re-read here, after cb_data has had its chance to
    // consume from the buffer; leftovers mean we are not done yet
    if (evbuffer_get_length(bufferevent_get_input(ctx->bev_data)) != 0)
        return;

    _render_remote_do_done(ctx);
}

/*
 * Mark the context as dead and notify the user of successful completion.
 * The context must still be alive (asserted). Nothing is freed here.
 */
static void _render_remote_do_done (struct render_remote *ctx) {
    assert(ctx->alive);

    // the flag is cleared before the callback runs
    ctx->alive = 0;
    (*ctx->cb_done)(ctx->cb_arg);
}

/*
 * Mark the context as dead and notify the user of a failure.
 * The context must still be alive (asserted). Nothing is freed here.
 */
static void _render_remote_do_fail (struct render_remote *ctx) {
    assert(ctx->alive);

    // the flag is cleared before the callback runs
    ctx->alive = 0;
    (*ctx->cb_fail)(ctx->cb_arg);
}

/*
 * Write callback of the bufferevent.
 *
 * First invocation: queue the render command on the bufferevent.
 * Second invocation: the command has been flushed, so tell the user via
 * cb_sent, stop watching for EV_WRITE and start receiving - through the
 * bufferevent when cb_data is set, through the raw ev_data event otherwise.
 *
 * Any failure is reported through _render_remote_do_fail().
 */
static void _remote_write (struct bufferevent *bev, void *arg) {
    struct render_remote *ctx = arg;

    if (!ctx->cmd_sent) {
        // queue the render command for writing
        if (bufferevent_write(ctx->bev_data, &ctx->render_cmd, sizeof(ctx->render_cmd)))
            ERROR("bufferevent_write");

        // we get called a second time once it has been written out
        ctx->cmd_sent = 1;

        return;
    }

    // the write buffer was drained, i.e. the render command is on its way
    assert(ctx->cb_sent);
    ctx->cb_sent(ctx->cb_arg);
    ctx->cb_sent = NULL;

    // nothing more to write, so EV_WRITE is of no further interest
    if (bufferevent_disable(ctx->bev_data, EV_WRITE))
        ERROR("bufferevent_disable");

    if (!ctx->cb_data) {
        // raw mode: the caller-supplied read event takes over
        assert(event_initialized(&ctx->ev_data));

        if (event_add(&ctx->ev_data, NULL))
            ERROR("event_add");

    } else {
        // buffered mode: start receiving data into the bufferevent
        if (bufferevent_enable(ctx->bev_data, EV_READ))
            ERROR("bufferevent_enable");
    }

    return;

error:
    _render_remote_do_fail(ctx);
}

/*
 * Read callback of the bufferevent (buffered mode): new input has arrived,
 * so pass it on to the user.
 */
static void _remote_read (struct bufferevent *bev, void *arg) {
    // arg is the render_remote context that was given to bufferevent_new()
    _render_remote_do_data((struct render_remote *) arg);
}

/*
 * Error/EOF callback of the bufferevent.
 *
 * EOF is the normal way for a response to end: it is recorded and any
 * remaining data is flushed, which in turn may trigger cb_done. Everything
 * else is logged and reported to the user through cb_fail.
 */
static void _remote_error (struct bufferevent *bev, short what, void *arg) {
    struct render_remote *ctx = arg;

    if (what & EVBUFFER_EOF) {
        // remember the EOF, then flush remaining data/call cb_done as needed
        ctx->have_eof = 1;
        _render_remote_do_data(ctx);

        return;
    }

    if (what & EVBUFFER_ERROR) {
        // I/O error on the connection
        PWARNING("EVBUFFER_ERROR");

    } else if (what & EVBUFFER_TIMEOUT) {
        // the remote end took too long
        WARNING("render_remote: timeout");

    } else {
        // none of the flags that we know how to handle
        FATAL("weird bufferevent error code: 0x%02X", what);
    }

    // report the failure; the context itself is not freed here
    _render_remote_do_fail(ctx);
}

/*
 * Do the initial IO-agnostic work to initialize the rendering process:
 * pick a node from the pool, allocate a zeroed context, fill in the render
 * command and start an asynchronous connect to the node.
 *
 * Returns the new context (not yet alive, no callbacks or events set up),
 * or NULL on failure, in which case nothing is left allocated by this
 * function.
 */
static struct render_remote *_render_remote_init (struct render *render, struct remote_pool *pool_info) {
    // must start out as NULL: the error path below is also reached before
    // (and when) the allocation fails
    struct render_remote *ctx = NULL;
    struct remote_node *node_info;

    // get a node from the pool
    if (!(node_info = remote_pool_get(pool_info)))
        ERROR("remote_pool_get");

    // alloc the remote render ctx
    if (!(ctx = calloc(1, sizeof(*ctx))))
        ERROR("calloc");

    // copy the relevant stuff from the render_ctx; only the image
    // dimensions are converted with htonl() here
    ctx->render_cmd.mode = render->mode;
    ctx->render_cmd.img_w = htonl(render->img_w);
    ctx->render_cmd.img_h = htonl(render->img_h);
    ctx->render_cmd.x1 = render->x1;
    ctx->render_cmd.y1 = render->y1;
    ctx->render_cmd.x2 = render->x2;
    ctx->render_cmd.y2 = render->y2;

    // create the socket
    if ((ctx->sock = socket_connect_async(&node_info->endpoint, SOCK_STREAM)) == -1)
        goto error;

    // return the raw ctx
    return ctx;

error:
    // _render_remote_free() requires a valid context, so it is only called
    // if the allocation actually happened
    if (ctx)
        _render_remote_free(ctx);

    return NULL;
}

/*
 * Raw unbuffered I/O mode.
 *
 * The render command is still written through a bufferevent, but the
 * response is not buffered: once the command has been sent, cb_io_data is
 * scheduled as a plain EV_READ event on the socket and the caller reads
 * from the socket itself.
 *
 * Returns the new context, or NULL on failure.
 */
struct render_remote *render_remote_rawio (
        struct render *render,
        struct remote_pool *pool_info,
        void (*cb_sent)(void *arg),
        void (*cb_fail)(void *arg),
        void (*cb_io_data)(evutil_socket_t, short, void*),
        void *cb_arg
) {
    struct render_remote *ctx = _render_remote_init(render, pool_info);

    // nothing to clean up if the common init failed
    if (ctx == NULL)
        return NULL;

    // remember the callbacks; cb_data/cb_done remain NULL in this mode
    ctx->cb_arg = cb_arg;
    ctx->cb_fail = cb_fail;
    ctx->cb_sent = cb_sent;

    // prepare (but do not yet schedule) the caller's own EV_READ handler
    event_set(&ctx->ev_data, ctx->sock, EV_READ, cb_io_data, cb_arg);

    // write-only bufferevent: no read callback
    ctx->bev_data = bufferevent_new(ctx->sock, NULL, &_remote_write, &_remote_error, ctx);

    if (ctx->bev_data == NULL)
        ERROR("bufferevent_new");

    // wait for the socket to become writable, i.e. connected
    if (bufferevent_enable(ctx->bev_data, EV_WRITE) != 0)
        ERROR("bufferevent_enable");

    // from here on the context is live
    ctx->alive = 1;

    return ctx;

error:
    _render_remote_free(ctx);
    return NULL;
}   

/*
 * Old buffered mode.
 *
 * Both the render command and the response go through a bufferevent; the
 * response is handed to cb_data as it arrives, and cb_done is called once
 * EOF has been seen and the buffer is empty.
 *
 * Returns the new context, or NULL on failure.
 */
struct render_remote *render_remote (
        struct render *render,
        struct remote_pool *pool_info,
        void (*cb_sent)(void *arg),
        void (*cb_data)(struct evbuffer *buf, void *arg),
        void (*cb_done)(void *arg),
        void (*cb_fail)(void *arg),
        void *cb_arg
) {    
    struct render_remote *ctx = _render_remote_init(render, pool_info);

    // nothing to clean up if the common init failed
    if (ctx == NULL)
        return NULL;

    // remember the callbacks
    ctx->cb_arg = cb_arg;
    ctx->cb_fail = cb_fail;
    ctx->cb_done = cb_done;
    ctx->cb_data = cb_data;
    ctx->cb_sent = cb_sent;

    // read/write bufferevent on the socket
    ctx->bev_data = bufferevent_new(ctx->sock, &_remote_read, &_remote_write, &_remote_error, ctx);

    if (ctx->bev_data == NULL)
        ERROR("bufferevent_new");

    // wait for the socket to become writable, i.e. connected
    if (bufferevent_enable(ctx->bev_data, EV_WRITE) != 0)
        ERROR("bufferevent_enable");

    // from here on the context is live
    ctx->alive = 1;

    return ctx;

error:
    _render_remote_free(ctx);
    return NULL;
}

/*
 * Configure the read watermarks of the bufferevent: the low watermark is
 * recv_threshold, the high watermark is recv_threshold + unread_buffer.
 */
void render_remote_set_recv (struct render_remote *ctx, size_t recv_threshold, size_t unread_buffer) {
    size_t low_mark = recv_threshold;
    size_t high_mark = recv_threshold + unread_buffer;

    assert(ctx->bev_data);

    bufferevent_setwatermark(ctx->bev_data, EV_READ, low_mark, high_mark);
}

/*
 * Process whatever is currently sitting in the input buffer: cb_data is
 * called if there is data, cb_done if EOF was seen and the buffer is empty.
 */
void render_remote_flush (struct render_remote *ctx) {
    assert(ctx->bev_data != NULL);

    _render_remote_do_data(ctx);
}

/*
 * Raw I/O mode: add the caller's EV_READ event again, without a timeout.
 *
 * Returns 0 on success, -1 if the event could not be added.
 */
int render_remote_reschedule (struct render_remote *ctx) {
    // only meaningful if ev_data was set up, which is done in raw I/O mode
    assert(event_initialized(&ctx->ev_data));

    if (event_add(&ctx->ev_data, NULL) != 0)
        ERROR("event_add");

    return 0;

error:
    return -1;
}

/*
 * Abort a request that is still alive and release all of its resources.
 * No callback is invoked; ctx is invalid afterwards.
 */
void render_remote_cancel (struct render_remote *ctx) {
    // cancelling a request that has already finished is a caller bug
    assert(ctx->alive);

    // the context must be marked dead before it may be freed
    ctx->alive = 0;
    _render_remote_free(ctx);
}

/*
 * Finish a request whose ev_data event has been set up (raw I/O mode) and
 * release all of its resources. The context must still be alive; ctx is
 * invalid afterwards.
 */
void render_remote_done (struct render_remote *ctx) {
    // alive, and ev_data initialized (i.e. not the buffered mode)
    assert(ctx->alive && event_initialized(&ctx->ev_data));

    // the context must be marked dead before it may be freed
    ctx->alive = 0;
    _render_remote_free(ctx);
}

/*
 * Release a context that is already dead, i.e. whose alive flag has been
 * cleared (as cb_done/cb_fail delivery does). ctx is invalid afterwards.
 */
void render_remote_free (struct render_remote *ctx) {
    // a context that is still alive has to go through cancel/done instead
    assert(!ctx->alive);

    _render_remote_free(ctx);
}