#include <stdlib.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <event2/event.h>
#include <event2/bufferevent.h>
#include "render_internal.h" // for render_cmd_build
#include "render_remote.h"
#include "common.h"
/*
 * State for one in-flight remote render request.
 *
 * Allocated by render_remote() and released by _remote_render_free().
 */
struct remote_render_ctx {
    // event registered for EV_WRITE on the socket; fires _remote_connected
    struct event *ev_conn;

    // bufferevent wrapping the socket; NULL until _remote_connected creates it
    struct bufferevent *data_bev;

    // the render command exactly as it is written to the socket; packed to
    // 1-byte alignment so there is no padding between the fields
#pragma pack(push, 1)
    struct {
        u_int8_t mode;

        // filled in via htonl() by render_cmd_build
        u_int32_t img_w;
        u_int32_t img_h;

        // copied from the render parameters without conversion
        double x1;
        double y1;
        double x2;
        double y2;
    } render_cmd;
#pragma pack(pop)

    // nonzero from a successful render_remote() until cb_done/cb_fail is invoked
    int alive;

    // called once the render command has been written out
    void (*cb_sent)(void *arg);

    // called with the bufferevent's input buffer when data is available
    void (*cb_data)(struct evbuffer *buf, void *arg);

    // called on EOF from the remote end, after any remaining data was passed to cb_data
    void (*cb_done)(void *arg);

    // called when the request fails
    void (*cb_fail)(void *arg);

    // opaque argument handed to every callback above
    void *cb_arg;
};
/*
 * Release everything owned by the given context: the bufferevent (if one was
 * created), the socket, the connect event and finally the context itself.
 *
 * ctx must not be used after this returns.
 */
static void _remote_render_free (struct remote_render_ctx *ctx) {
    struct bufferevent *bev = ctx->data_bev;

    // the bufferevent only exists once _remote_connected has run
    if (bev != NULL) {
        bufferevent_free(bev);
        ctx->data_bev = NULL;
    }

    // the socket fd is looked up through the connect event, which is kept
    // around for the whole lifetime of the context for exactly this purpose
    int sock_fd = event_get_fd(ctx->ev_conn);
    close(sock_fd);

    // now the connect event itself can go
    event_free(ctx->ev_conn);

    // and lastly the context structure
    free(ctx);
}
/*
 * Successful completion: mark the context as no longer alive, notify the user
 * via cb_done and then free the context.
 */
static void _remote_render_done (struct remote_render_ctx *ctx) {
    // completion may only be signalled once
    assert(ctx->alive);
    ctx->alive = 0;

    void (*on_done)(void *) = ctx->cb_done;
    on_done(ctx->cb_arg);

    _remote_render_free(ctx);
}
/*
 * Failure: mark the context as no longer alive, notify the user via cb_fail
 * and then free the context.
 */
static void _remote_render_fail (struct remote_render_ctx *ctx) {
    // failure may only be signalled once
    assert(ctx->alive);
    ctx->alive = 0;

    void (*on_fail)(void *) = ctx->cb_fail;
    on_fail(ctx->cb_arg);

    _remote_render_free(ctx);
}
/*
 * Bufferevent write callback: the output buffer has been drained, meaning the
 * render command is on its way. Tells the user via cb_sent, then switches the
 * bufferevent from writing to reading.
 *
 * If either switch fails, the request is failed via _remote_render_fail.
 */
static void _remote_write (struct bufferevent *bev, void *arg) {
    struct remote_render_ctx *render_ctx = arg;

    // report that the command went out
    render_ctx->cb_sent(render_ctx->cb_arg);

    // nothing more to send, so stop listening for writability
    if (bufferevent_disable(render_ctx->data_bev, EV_WRITE) != 0) {
        ERROR("bufferevent_disable");
    }

    // from here on we only want the response data
    if (bufferevent_enable(render_ctx->data_bev, EV_READ) != 0) {
        ERROR("bufferevent_enable");
    }

    return;

error:
    _remote_render_fail(render_ctx);
}
/*
 * Bufferevent read callback: hands the bufferevent's input buffer straight to
 * the user's cb_data.
 */
static void _remote_read (struct bufferevent *bev, void *arg) {
    struct remote_render_ctx *render_ctx = arg;
    struct evbuffer *incoming = EVBUFFER_INPUT(bev);

    render_ctx->cb_data(incoming, render_ctx->cb_arg);
}
/*
 * Bufferevent error/event callback.
 *
 * EOF is the normal way for a render to finish: any data still sitting in the
 * input buffer is passed to cb_data and the request completes via
 * _remote_render_done. An error or a timeout is logged and the request is
 * failed via _remote_render_fail. Any other event code is reported via FATAL.
 */
static void _remote_error (struct bufferevent *bev, short what, void *arg) {
    struct remote_render_ctx *render_ctx = arg;

    // EOF is checked first and is the only successful outcome
    if (what & EVBUFFER_EOF) {
        struct evbuffer *leftover = EVBUFFER_INPUT(bev);

        // flush whatever has not been consumed yet
        if (EVBUFFER_LENGTH(leftover) > 0)
            render_ctx->cb_data(leftover, render_ctx->cb_arg);

        // and we are done
        _remote_render_done(render_ctx);
        return;
    }

    // everything below is some kind of failure; just pick the right message
    if (what & EVBUFFER_ERROR) {
        PWARNING("EVBUFFER_ERROR");
    } else if (what & EVBUFFER_TIMEOUT) {
        WARNING("render_remote: timeout");
    } else {
        FATAL("weird bufferevent error code: 0x%02X", what);
    }

    // notify the user and release the context
    _remote_render_fail(render_ctx);
}
/*
 * Event callback for the connect event registered by render_remote: the
 * socket became writable. Wraps the socket in a bufferevent, queues the
 * render command on it and enables writing so that _remote_write fires once
 * the command has been flushed.
 *
 * Any failure along the way fails the request via _remote_render_fail.
 */
static void _remote_connected (int fd, short event, void *arg) {
    struct remote_render_ctx *render_ctx = arg;

    // create the bufferevent that handles all further I/O on the socket
    struct bufferevent *bev = bufferevent_new(fd, &_remote_read, &_remote_write, &_remote_error, render_ctx);

    render_ctx->data_bev = bev;

    if (bev == NULL) {
        ERROR("bufferevent_new");
    }

    // queue the prepared render command for sending
    if (bufferevent_write(bev, &render_ctx->render_cmd, sizeof(render_ctx->render_cmd)) != 0) {
        ERROR("bufferevent_write");
    }

    // let libevent push it out; _remote_write is called once that is done
    if (bufferevent_enable(bev, EV_WRITE) != 0) {
        ERROR("bufferevent_enable");
    }

    return;

error:
    _remote_render_fail(render_ctx);
}
/*
 * Fill in ctx->render_cmd from the given render parameters.
 *
 * The image dimensions are converted with htonl(); the mode and the four
 * coordinates are copied over unchanged.
 */
static void render_cmd_build (struct render *render, struct remote_render_ctx *ctx) {
    // image dimensions, in network byte order
    ctx->render_cmd.img_w = htonl(render->img_w);
    ctx->render_cmd.img_h = htonl(render->img_h);

    // render mode, copied as-is
    ctx->render_cmd.mode = render->mode;

    // first corner of the region
    ctx->render_cmd.x1 = render->x1;
    ctx->render_cmd.y1 = render->y1;

    // second corner of the region
    ctx->render_cmd.x2 = render->x2;
    ctx->render_cmd.y2 = render->y2;
}
/*
 * Start a render on the given remote node.
 *
 * Allocates a context, builds the render command from `render`, opens a
 * non-blocking TCP socket to remote_node->addr and registers an event that
 * fires _remote_connected once the socket becomes writable.
 *
 * Callbacks (all receive cb_arg):
 *   cb_sent - the render command has been written out
 *   cb_data - data is available in the given evbuffer
 *   cb_done - the remote end closed the connection (EOF)
 *   cb_fail - the request failed after this function returned successfully
 *
 * Returns the new context on success, or NULL on failure. On failure none of
 * the callbacks are invoked and everything acquired so far is released.
 *
 * ERROR()/PERROR() come from common.h; they are presumed to log and then
 * jump to the `error` label below - confirm against common.h.
 */
struct remote_render_ctx *render_remote (
        struct render *render,
        struct remote_node *remote_node,
        void (*cb_sent)(void *arg),
        void (*cb_data)(struct evbuffer *buf, void *arg),
        void (*cb_done)(void *arg),
        void (*cb_fail)(void *arg),
        void *cb_arg
) {
    // both start out in their "nothing acquired" state so that the error path
    // can always tell what needs cleaning up
    struct remote_render_ctx *ctx = NULL;
    int sock = -1;

    printf("remote_node render load: %d/%d\n", remote_node->current_load, remote_node->parallel_renders);

    // alloc the remote render ctx (zeroed, so ev_conn/data_bev start as NULL)
    if (!(ctx = calloc(1, sizeof(*ctx))))
        ERROR("calloc");

    // store the provided callback functions
    ctx->cb_sent = cb_sent;
    ctx->cb_data = cb_data;
    ctx->cb_done = cb_done;
    ctx->cb_fail = cb_fail;
    ctx->cb_arg = cb_arg;

    // copy the relevant stuff from the render params
    render_cmd_build(render, ctx);

    // create the socket
    if ((sock = socket(remote_node->addr.ss_family, SOCK_STREAM, 0)) < 0)
        PERROR("socket");

    // mark it as nonblocking
    if (fcntl(sock, F_SETFL, O_NONBLOCK) == -1)
        PERROR("fcntl");

    // initiate the connect. A non-blocking connect normally returns -1 with
    // EINPROGRESS, but it may also complete immediately and return 0; both
    // are fine, as the socket will then simply be writable right away.
    // NOTE(review): the full size of remote_node->addr is passed as the
    // address length; some platforms want the exact per-family length - confirm.
    if (connect(sock, (struct sockaddr *) &remote_node->addr, sizeof(remote_node->addr)) == -1
            && errno != EINPROGRESS)
        PERROR("connect");

    // do the libevent dance: wait for the socket to become writable
    if (!(ctx->ev_conn = event_new(NULL, sock, EV_WRITE, &_remote_connected, ctx)))
        ERROR("event_new");

    if (event_add(ctx->ev_conn, NULL))
        ERROR("event_add");

    // we are now alive
    ctx->alive = 1;

    // success
    return ctx;

error:
    // release the connect event if it was created but could not be added
    if (ctx != NULL && ctx->ev_conn != NULL)
        event_free(ctx->ev_conn);

    free(ctx);

    // fd 0 is a valid descriptor, so compare against -1 rather than 0
    if (sock >= 0)
        close(sock);

    return NULL;
}
/*
 * Set the read watermarks on the request's bufferevent: the low watermark is
 * recv_threshold and the high watermark is recv_threshold + unread_buffer.
 *
 * Returns 0 on success, or -1 if the bufferevent does not exist yet.
 */
int render_remote_set_recv (struct remote_render_ctx *ctx, size_t recv_threshold, size_t unread_buffer) {
    struct bufferevent *bev = ctx->data_bev;

    if (bev != NULL) {
        size_t high_mark = recv_threshold + unread_buffer;

        bufferevent_setwatermark(bev, EV_READ, recv_threshold, high_mark);
        return 0;
    }

    // not connected yet, nothing to configure
    return -1;
}
/*
 * Invoke cb_data right now with the bufferevent's current input buffer,
 * without waiting for a read event.
 *
 * Returns 0 on success, or -1 if the bufferevent does not exist yet.
 */
int render_remote_shake (struct remote_render_ctx *ctx) {
    struct bufferevent *bev = ctx->data_bev;

    if (bev != NULL) {
        struct evbuffer *pending = EVBUFFER_INPUT(bev);

        ctx->cb_data(pending, ctx->cb_arg);
        return 0;
    }

    // not connected yet, there is no buffer to hand over
    return -1;
}
/*
 * Abort a request that has not completed or failed yet and free its context.
 * Neither cb_done nor cb_fail is invoked.
 *
 * ctx must not be used after this returns.
 */
void render_remote_cancel (struct remote_render_ctx *ctx) {
    // cancelling only makes sense while the request is still alive
    assert(ctx->alive);

    struct event *conn_ev = ctx->ev_conn;

    // still waiting for the connect to finish? then stop waiting
    if (event_pending(conn_ev, EV_WRITE, NULL) != 0) {
        event_del(conn_ev);
    }

    // release the bufferevent, socket, event and context
    _remote_render_free(ctx);
}