terom@2: #include terom@2: #include terom@2: #include terom@2: #include terom@2: #include terom@13: #include terom@2: terom@26: #include /* XXX: for SUN_LEN */ terom@26: terom@4: #include terom@15: #include terom@4: #include terom@4: terom@2: #include "render_remote.h" terom@26: #include "common.h" terom@26: #include "render_struct.h" terom@23: #include "render_net.h" terom@26: #include "socket.h" terom@2: terom@15: struct render_remote { terom@15: // the socket fd terom@15: int sock; terom@15: terom@15: // the event/bufferevent terom@15: struct event ev_connect, ev_data; terom@15: struct bufferevent *bev_data; terom@2: terom@23: // the command terom@23: struct render_cmd render_cmd; terom@13: terom@15: // have we sent the command yet? terom@15: int cmd_sent; terom@15: terom@15: // have we received the EOF? terom@15: int have_eof; terom@15: terom@15: // has cb_done/cb_fail/cancel already been called? terom@13: int alive; terom@2: terom@2: void (*cb_sent)(void *arg); terom@2: void (*cb_data)(struct evbuffer *buf, void *arg); terom@2: void (*cb_done)(void *arg); terom@2: void (*cb_fail)(void *arg); terom@2: terom@2: void *cb_arg; terom@2: }; terom@2: terom@15: // internal prototypes terom@15: static void _render_remote_free (struct render_remote *ctx); terom@15: static void _render_remote_do_data (struct render_remote *ctx); terom@15: static void _render_remote_do_done (struct render_remote *ctx); terom@15: static void _render_remote_do_fail (struct render_remote *ctx); terom@15: terom@15: static void _render_remote_free (struct render_remote *ctx) { terom@16: assert(ctx && !ctx->alive); terom@16: terom@15: // free the bev_data terom@15: if (ctx->bev_data) terom@15: bufferevent_free(ctx->bev_data); terom@13: terom@15: // and the events terom@15: if (event_pending(&ctx->ev_connect, EV_WRITE, NULL)) terom@15: event_del(&ctx->ev_connect); terom@15: terom@15: if (event_pending(&ctx->ev_data, EV_READ, NULL)) terom@15: event_del(&ctx->ev_data); terom@15: terom@15: // close the socket (ctx->ev_connect remains valid even after we're done with it...) terom@15: if (ctx->sock) terom@15: close(ctx->sock); terom@2: terom@2: // free the context structure terom@13: free(ctx); terom@2: } terom@2: terom@15: static void _render_remote_do_data (struct render_remote *ctx) { terom@15: // if there's data in the buffer, call cb_data terom@15: if (evbuffer_get_length(bufferevent_get_input(ctx->bev_data))) { terom@15: ctx->cb_data(EVBUFFER_INPUT(ctx->bev_data), ctx->cb_arg); terom@15: } terom@15: terom@15: // if we got EOF on the connection and there's no data left in the buffer, call cb_done terom@15: if (ctx->have_eof && evbuffer_get_length(bufferevent_get_input(ctx->bev_data)) == 0) { terom@15: _render_remote_do_done(ctx); terom@15: } terom@15: } terom@15: terom@15: static void _render_remote_do_done (struct render_remote *ctx) { terom@13: assert(ctx->alive); terom@13: terom@13: ctx->alive = 0; terom@2: terom@13: ctx->cb_done(ctx->cb_arg); terom@13: } terom@13: terom@15: static void _render_remote_do_fail (struct render_remote *ctx) { terom@13: assert(ctx->alive); terom@13: terom@13: ctx->alive = 0; terom@13: terom@13: ctx->cb_fail(ctx->cb_arg); terom@13: } terom@13: terom@13: static void _remote_write (struct bufferevent *bev, void *arg) { terom@15: struct render_remote *ctx = arg; terom@2: terom@15: if (!ctx->cmd_sent) { terom@15: // write the render command terom@15: if (bufferevent_write(ctx->bev_data, &ctx->render_cmd, sizeof(ctx->render_cmd))) terom@15: ERROR("bufferevent_write"); terom@2: terom@15: // wait for it to be written out (we get called a second time) terom@15: ctx->cmd_sent = 1; terom@15: terom@15: } else { terom@15: // the write buffer was drained, so the render command was write():n terom@15: assert(ctx->cb_sent); terom@15: ctx->cb_sent(ctx->cb_arg); terom@15: ctx->cb_sent = NULL; terom@15: terom@15: // we don't care about EV_WRITE anymore terom@15: if (bufferevent_disable(ctx->bev_data, EV_WRITE)) terom@15: ERROR("bufferevent_disable"); terom@15: terom@15: // are we buffered or raw? terom@15: if (ctx->cb_data) { terom@15: // start receiving data into our buffer terom@15: if (bufferevent_enable(ctx->bev_data, EV_READ)) terom@15: ERROR("bufferevent_enable"); terom@15: terom@15: } else { terom@15: assert(event_initialized(&ctx->ev_data)); terom@15: terom@15: // enable the raw read event terom@15: if (event_add(&ctx->ev_data, NULL)) terom@15: ERROR("event_add"); terom@15: } terom@15: } terom@15: terom@13: return; terom@15: terom@13: error: terom@15: _render_remote_do_fail(ctx); terom@2: } terom@2: terom@13: static void _remote_read (struct bufferevent *bev, void *arg) { terom@15: struct render_remote *ctx = arg; terom@15: terom@15: _render_remote_do_data(ctx); terom@2: } terom@2: terom@13: static void _remote_error (struct bufferevent *bev, short what, void *arg) { terom@15: struct render_remote *ctx = arg; terom@2: terom@2: // OH NOES; WHAT DO WE DO!? terom@2: terom@2: if (what & EVBUFFER_EOF) { terom@2: // great! terom@15: ctx->have_eof = 1; terom@3: terom@15: // flush any remaining data/call cb_send as needed terom@15: _render_remote_do_data(ctx); terom@13: terom@13: return; terom@2: terom@2: } else if (what & EVBUFFER_ERROR) { terom@2: // crap. terom@13: PWARNING("EVBUFFER_ERROR"); terom@2: terom@2: } else if (what & EVBUFFER_TIMEOUT) { terom@2: // ah well terom@13: WARNING("render_remote: timeout"); terom@2: terom@2: } else { terom@13: FATAL("weird bufferevent error code: 0x%02X", what); terom@2: } terom@13: terom@13: // cb_fail + free terom@15: _render_remote_do_fail(ctx); terom@2: } terom@2: terom@15: /* terom@15: * Do the initial IO-agnostic work to initialize the rendering process terom@15: */ terom@19: static struct render_remote *_render_remote_init (struct render *render, struct remote_pool *pool_info) { terom@15: struct render_remote *ctx; terom@19: struct remote_node *node_info; terom@2: terom@19: // get a node from the pool terom@19: if (!(node_info = remote_pool_get(pool_info))) terom@19: ERROR("remote_pool_get"); terom@19: terom@15: // alloc the remote render ctx terom@19: if (!(ctx = calloc(1, sizeof(*ctx)))) terom@15: ERROR("calloc"); terom@15: terom@15: // copy the relevant stuff from the render_ctx terom@11: ctx->render_cmd.mode = render->mode; terom@11: ctx->render_cmd.img_w = htonl(render->img_w); terom@11: ctx->render_cmd.img_h = htonl(render->img_h); terom@11: ctx->render_cmd.x1 = render->x1; terom@11: ctx->render_cmd.y1 = render->y1; terom@11: ctx->render_cmd.x2 = render->x2; terom@11: ctx->render_cmd.y2 = render->y2; terom@15: terom@15: // create the socket terom@26: if ((ctx->sock = socket_connect_async(&node_info->endpoint, SOCK_STREAM)) == -1) terom@26: goto error; terom@15: terom@15: // return the raw ctx terom@15: return ctx; terom@15: terom@15: error: terom@15: _render_remote_free(ctx); terom@15: return NULL; terom@2: } terom@2: terom@15: /* terom@15: * Raw unbuffered I/O mode terom@15: */ terom@15: struct render_remote *render_remote_rawio ( terom@15: struct render *render, terom@19: struct remote_pool *pool_info, terom@15: void (*cb_sent)(void *arg), terom@15: void (*cb_fail)(void *arg), terom@15: void (*cb_io_data)(evutil_socket_t, short, void*), terom@15: void *cb_arg terom@15: ) { terom@15: struct render_remote *ctx; terom@15: terom@15: // short-circuit error handling terom@19: if (!(ctx = _render_remote_init(render, pool_info))) terom@15: return NULL; terom@15: terom@15: // store the provided callback functions terom@15: ctx->cb_sent = cb_sent; terom@15: ctx->cb_fail = cb_fail; terom@15: ctx->cb_arg = cb_arg; terom@16: terom@16: // set up the custom EV_READ callback terom@16: event_set(&ctx->ev_data, ctx->sock, EV_READ, cb_io_data, cb_arg); terom@16: terom@15: // set up the write bufferevent terom@15: if ((ctx->bev_data = bufferevent_new(ctx->sock, NULL, &_remote_write, &_remote_error, ctx)) == NULL) terom@15: ERROR("bufferevent_new"); terom@15: terom@15: // wait for it to connect terom@15: if (bufferevent_enable(ctx->bev_data, EV_WRITE)) terom@15: ERROR("bufferevent_enable"); terom@15: terom@15: // we are now alive terom@15: ctx->alive = 1; terom@15: terom@15: // success terom@15: return ctx; terom@15: terom@15: error: terom@15: _render_remote_free(ctx); terom@15: return NULL; terom@15: } terom@15: terom@15: /* terom@15: * Old buffered mode terom@15: */ terom@15: struct render_remote *render_remote ( terom@11: struct render *render, terom@19: struct remote_pool *pool_info, terom@2: void (*cb_sent)(void *arg), terom@2: void (*cb_data)(struct evbuffer *buf, void *arg), terom@2: void (*cb_done)(void *arg), terom@2: void (*cb_fail)(void *arg), terom@2: void *cb_arg terom@2: ) { terom@15: struct render_remote *ctx; terom@15: terom@15: // short-circuit error handling terom@19: if (!(ctx = _render_remote_init(render, pool_info))) terom@15: return NULL; terom@2: terom@2: // store the provided callback functions terom@2: ctx->cb_sent = cb_sent; terom@2: ctx->cb_data = cb_data; terom@2: ctx->cb_done = cb_done; terom@2: ctx->cb_fail = cb_fail; terom@2: ctx->cb_arg = cb_arg; terom@2: terom@15: // set up the read/write bufferevent terom@15: if ((ctx->bev_data = bufferevent_new(ctx->sock, &_remote_read, &_remote_write, &_remote_error, ctx)) == NULL) terom@15: ERROR("bufferevent_new"); terom@2: terom@15: // wait for it to connect terom@15: if (bufferevent_enable(ctx->bev_data, EV_WRITE)) terom@15: ERROR("bufferevent_enable"); terom@2: terom@13: // we are now alive terom@13: ctx->alive = 1; terom@13: terom@2: // success terom@3: return ctx; terom@4: terom@4: error: terom@15: _render_remote_free(ctx); terom@4: return NULL; terom@3: } terom@3: terom@15: void render_remote_set_recv (struct render_remote *ctx, size_t recv_threshold, size_t unread_buffer) { terom@15: assert(ctx->bev_data); terom@3: terom@15: bufferevent_setwatermark(ctx->bev_data, EV_READ, recv_threshold, recv_threshold + unread_buffer); terom@6: } terom@6: terom@15: void render_remote_flush (struct render_remote *ctx) { terom@15: assert(ctx->bev_data); terom@6: terom@15: // call cb_data/cb_done as appropriate terom@15: _render_remote_do_data(ctx); terom@2: } terom@3: terom@15: int render_remote_reschedule (struct render_remote *ctx) { terom@15: assert(event_initialized(&ctx->ev_data)); terom@15: terom@15: // just reschedule it terom@15: if (event_add(&ctx->ev_data, NULL)) terom@15: ERROR("event_add"); terom@15: terom@15: // ok terom@15: return 0; terom@15: terom@15: error: terom@15: return -1; terom@15: } terom@15: terom@15: void render_remote_cancel (struct render_remote *ctx) { terom@13: // we must be alive for this.. terom@13: assert(ctx->alive); terom@11: terom@16: ctx->alive = 0; terom@16: terom@16: _render_remote_free(ctx); terom@16: } terom@16: terom@16: void render_remote_done (struct render_remote *ctx) { terom@16: // we must be alive and non-buffered for this.. terom@16: assert(ctx->alive && event_initialized(&ctx->ev_data)); terom@16: terom@16: ctx->alive = 0; terom@16: terom@15: _render_remote_free(ctx); terom@3: } terom@3: terom@15: void render_remote_free (struct render_remote *ctx) { terom@16: // must be dead already terom@16: assert(!ctx->alive); terom@15: terom@15: _render_remote_free(ctx); terom@15: } terom@15: