#include <stdlib.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <sys/un.h> /* XXX: for SUN_LEN */
#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/bufferevent.h>
#include "render_remote.h"
#include "common.h"
#include "render_struct.h"
#include "render_net.h"
#include "socket.h"
struct render_remote {
// the socket fd
int sock;
// the event/bufferevent
struct event ev_connect, ev_data;
struct bufferevent *bev_data;
// the command
struct render_cmd render_cmd;
// have we sent the command yet?
int cmd_sent;
// have we received the EOF?
int have_eof;
// has cb_done/cb_fail/cancel already been called?
int alive;
void (*cb_sent)(void *arg);
void (*cb_data)(struct evbuffer *buf, void *arg);
void (*cb_done)(void *arg);
void (*cb_fail)(void *arg);
void *cb_arg;
};
// internal prototypes
static void _render_remote_free (struct render_remote *ctx);
static void _render_remote_do_data (struct render_remote *ctx);
static void _render_remote_do_done (struct render_remote *ctx);
static void _render_remote_do_fail (struct render_remote *ctx);
static void _render_remote_free (struct render_remote *ctx) {
assert(ctx && !ctx->alive);
// free the bev_data
if (ctx->bev_data)
bufferevent_free(ctx->bev_data);
// and the events
if (event_pending(&ctx->ev_connect, EV_WRITE, NULL))
event_del(&ctx->ev_connect);
if (event_pending(&ctx->ev_data, EV_READ, NULL))
event_del(&ctx->ev_data);
// close the socket (ctx->ev_connect remains valid even after we're done with it...)
if (ctx->sock)
close(ctx->sock);
// free the context structure
free(ctx);
}
static void _render_remote_do_data (struct render_remote *ctx) {
// if there's data in the buffer, call cb_data
if (evbuffer_get_length(bufferevent_get_input(ctx->bev_data))) {
ctx->cb_data(EVBUFFER_INPUT(ctx->bev_data), ctx->cb_arg);
}
// if we got EOF on the connection and there's no data left in the buffer, call cb_done
if (ctx->have_eof && evbuffer_get_length(bufferevent_get_input(ctx->bev_data)) == 0) {
_render_remote_do_done(ctx);
}
}
static void _render_remote_do_done (struct render_remote *ctx) {
assert(ctx->alive);
ctx->alive = 0;
ctx->cb_done(ctx->cb_arg);
}
static void _render_remote_do_fail (struct render_remote *ctx) {
assert(ctx->alive);
ctx->alive = 0;
ctx->cb_fail(ctx->cb_arg);
}
static void _remote_write (struct bufferevent *bev, void *arg) {
struct render_remote *ctx = arg;
if (!ctx->cmd_sent) {
// write the render command
if (bufferevent_write(ctx->bev_data, &ctx->render_cmd, sizeof(ctx->render_cmd)))
ERROR("bufferevent_write");
// wait for it to be written out (we get called a second time)
ctx->cmd_sent = 1;
} else {
// the write buffer was drained, so the render command was write():n
assert(ctx->cb_sent);
ctx->cb_sent(ctx->cb_arg);
ctx->cb_sent = NULL;
// we don't care about EV_WRITE anymore
if (bufferevent_disable(ctx->bev_data, EV_WRITE))
ERROR("bufferevent_disable");
// are we buffered or raw?
if (ctx->cb_data) {
// start receiving data into our buffer
if (bufferevent_enable(ctx->bev_data, EV_READ))
ERROR("bufferevent_enable");
} else {
assert(event_initialized(&ctx->ev_data));
// enable the raw read event
if (event_add(&ctx->ev_data, NULL))
ERROR("event_add");
}
}
return;
error:
_render_remote_do_fail(ctx);
}
static void _remote_read (struct bufferevent *bev, void *arg) {
struct render_remote *ctx = arg;
_render_remote_do_data(ctx);
}
static void _remote_error (struct bufferevent *bev, short what, void *arg) {
struct render_remote *ctx = arg;
// OH NOES; WHAT DO WE DO!?
if (what & EVBUFFER_EOF) {
// great!
ctx->have_eof = 1;
// flush any remaining data/call cb_send as needed
_render_remote_do_data(ctx);
return;
} else if (what & EVBUFFER_ERROR) {
// crap.
PWARNING("EVBUFFER_ERROR");
} else if (what & EVBUFFER_TIMEOUT) {
// ah well
WARNING("render_remote: timeout");
} else {
FATAL("weird bufferevent error code: 0x%02X", what);
}
// cb_fail + free
_render_remote_do_fail(ctx);
}
/*
* Do the initial IO-agnostic work to initialize the rendering process
*/
static struct render_remote *_render_remote_init (struct render *render, struct remote_pool *pool_info) {
struct render_remote *ctx;
struct remote_node *node_info;
// get a node from the pool
if (!(node_info = remote_pool_get(pool_info)))
ERROR("remote_pool_get");
// alloc the remote render ctx
if (!(ctx = calloc(1, sizeof(*ctx))))
ERROR("calloc");
// copy the relevant stuff from the render_ctx
ctx->render_cmd.mode = render->mode;
ctx->render_cmd.img_w = htonl(render->img_w);
ctx->render_cmd.img_h = htonl(render->img_h);
ctx->render_cmd.x1 = render->x1;
ctx->render_cmd.y1 = render->y1;
ctx->render_cmd.x2 = render->x2;
ctx->render_cmd.y2 = render->y2;
// create the socket
if ((ctx->sock = socket_connect_async(&node_info->endpoint, SOCK_STREAM)) == -1)
goto error;
// return the raw ctx
return ctx;
error:
_render_remote_free(ctx);
return NULL;
}
/*
* Raw unbuffered I/O mode
*/
struct render_remote *render_remote_rawio (
struct render *render,
struct remote_pool *pool_info,
void (*cb_sent)(void *arg),
void (*cb_fail)(void *arg),
void (*cb_io_data)(evutil_socket_t, short, void*),
void *cb_arg
) {
struct render_remote *ctx;
// short-circuit error handling
if (!(ctx = _render_remote_init(render, pool_info)))
return NULL;
// store the provided callback functions
ctx->cb_sent = cb_sent;
ctx->cb_fail = cb_fail;
ctx->cb_arg = cb_arg;
// set up the custom EV_READ callback
event_set(&ctx->ev_data, ctx->sock, EV_READ, cb_io_data, cb_arg);
// set up the write bufferevent
if ((ctx->bev_data = bufferevent_new(ctx->sock, NULL, &_remote_write, &_remote_error, ctx)) == NULL)
ERROR("bufferevent_new");
// wait for it to connect
if (bufferevent_enable(ctx->bev_data, EV_WRITE))
ERROR("bufferevent_enable");
// we are now alive
ctx->alive = 1;
// success
return ctx;
error:
_render_remote_free(ctx);
return NULL;
}
/*
* Old buffered mode
*/
struct render_remote *render_remote (
struct render *render,
struct remote_pool *pool_info,
void (*cb_sent)(void *arg),
void (*cb_data)(struct evbuffer *buf, void *arg),
void (*cb_done)(void *arg),
void (*cb_fail)(void *arg),
void *cb_arg
) {
struct render_remote *ctx;
// short-circuit error handling
if (!(ctx = _render_remote_init(render, pool_info)))
return NULL;
// store the provided callback functions
ctx->cb_sent = cb_sent;
ctx->cb_data = cb_data;
ctx->cb_done = cb_done;
ctx->cb_fail = cb_fail;
ctx->cb_arg = cb_arg;
// set up the read/write bufferevent
if ((ctx->bev_data = bufferevent_new(ctx->sock, &_remote_read, &_remote_write, &_remote_error, ctx)) == NULL)
ERROR("bufferevent_new");
// wait for it to connect
if (bufferevent_enable(ctx->bev_data, EV_WRITE))
ERROR("bufferevent_enable");
// we are now alive
ctx->alive = 1;
// success
return ctx;
error:
_render_remote_free(ctx);
return NULL;
}
void render_remote_set_recv (struct render_remote *ctx, size_t recv_threshold, size_t unread_buffer) {
assert(ctx->bev_data);
bufferevent_setwatermark(ctx->bev_data, EV_READ, recv_threshold, recv_threshold + unread_buffer);
}
void render_remote_flush (struct render_remote *ctx) {
assert(ctx->bev_data);
// call cb_data/cb_done as appropriate
_render_remote_do_data(ctx);
}
int render_remote_reschedule (struct render_remote *ctx) {
assert(event_initialized(&ctx->ev_data));
// just reschedule it
if (event_add(&ctx->ev_data, NULL))
ERROR("event_add");
// ok
return 0;
error:
return -1;
}
void render_remote_cancel (struct render_remote *ctx) {
// we must be alive for this..
assert(ctx->alive);
ctx->alive = 0;
_render_remote_free(ctx);
}
void render_remote_done (struct render_remote *ctx) {
// we must be alive and non-buffered for this..
assert(ctx->alive && event_initialized(&ctx->ev_data));
ctx->alive = 0;
_render_remote_free(ctx);
}
void render_remote_free (struct render_remote *ctx) {
// must be dead already
assert(!ctx->alive);
_render_remote_free(ctx);
}