render_remote.c
changeset 15 e7f0697814dc
parent 13 ee426f453cf5
child 16 50995bbe442a
equal deleted inserted replaced
14:5a2246f5be78 15:e7f0697814dc
     4 #include <fcntl.h>
     4 #include <fcntl.h>
     5 #include <errno.h>
     5 #include <errno.h>
     6 #include <assert.h>
     6 #include <assert.h>
     7 
     7 
     8 #include <event2/event.h>
     8 #include <event2/event.h>
       
     9 #include <event2/event_struct.h>
     9 #include <event2/bufferevent.h>
    10 #include <event2/bufferevent.h>
    10 
    11 
    11 #include "render_internal.h"    // for render_cmd_build
    12 #include "render_internal.h"    // for render_cmd_build
    12 #include "render_remote.h"
    13 #include "render_remote.h"
    13 #include "common.h"
    14 #include "common.h"
    14 
    15 
    15 struct remote_render_ctx {
    16 struct render_remote {
    16     struct event *ev_conn;
    17     // the socket fd
    17     struct bufferevent *data_bev;
    18     int sock;
       
    19 
       
    20     // the event/bufferevent
       
    21     struct event ev_connect, ev_data;
       
    22     struct bufferevent *bev_data;
    18 
    23 
    19     #pragma pack(push)
    24     #pragma pack(push)
    20     #pragma pack(1)
    25     #pragma pack(1)
    21 
    26 
    22     struct {
    27     struct {
    31         double      y2;
    36         double      y2;
    32     } render_cmd;
    37     } render_cmd;
    33 
    38 
    34     #pragma pack(pop)
    39     #pragma pack(pop)
    35     
    40     
    36     // has cb_done/cb_fail already been called?
    41     // have we sent the command yet?
       
    42     int cmd_sent;
       
    43 
       
    44     // have we received the EOF?
       
    45     int have_eof;
       
    46     
       
    47     // has cb_done/cb_fail/cancel already been called?
    37     int alive;
    48     int alive;
    38 
    49 
    39     void (*cb_sent)(void *arg);
    50     void (*cb_sent)(void *arg);
    40     void (*cb_data)(struct evbuffer *buf, void *arg);
    51     void (*cb_data)(struct evbuffer *buf, void *arg);
    41     void (*cb_done)(void *arg);
    52     void (*cb_done)(void *arg);
    42     void (*cb_fail)(void *arg);
    53     void (*cb_fail)(void *arg);
    43 
    54 
    44     void *cb_arg;
    55     void *cb_arg;
    45 };
    56 };
    46 
    57 
    47 static void _remote_render_free (struct remote_render_ctx *ctx) {
    58 // internal prototypes
    48     // free the data_bev
    59 static void _render_remote_free (struct render_remote *ctx);
    49     if (ctx->data_bev) {
    60 static void _render_remote_do_data (struct render_remote *ctx);
    50         bufferevent_free(ctx->data_bev);
    61 static void _render_remote_do_done (struct render_remote *ctx);
    51         ctx->data_bev = NULL;
    62 static void _render_remote_do_fail (struct render_remote *ctx);
    52     }
    63 
    53     
    64 static void _render_remote_free (struct render_remote *ctx) {
    54     // close the socket (ctx->ev_conn remains valid even after we're done with it...)
    65     // free the bev_data
    55     close(event_get_fd(ctx->ev_conn));
    66     if (ctx->bev_data)
    56 
    67         bufferevent_free(ctx->bev_data);
    57     // and the event
    68     
    58     event_free(ctx->ev_conn);
    69     // and the events
       
    70     if (event_pending(&ctx->ev_connect, EV_WRITE, NULL))
       
    71         event_del(&ctx->ev_connect);
       
    72     
       
    73     if (event_pending(&ctx->ev_data, EV_READ, NULL))
       
    74         event_del(&ctx->ev_data);
       
    75         
       
    76     // close the socket (ctx->ev_connect remains valid even after we're done with it...)
       
    77     if (ctx->sock)
       
    78         close(ctx->sock);
    59     
    79     
    60     // free the context structure
    80     // free the context structure
    61     free(ctx);
    81     free(ctx);
    62 }
    82 }
    63 
    83 
    64 static void _remote_render_done (struct remote_render_ctx *ctx) {
    84 static void _render_remote_do_data (struct render_remote *ctx) {
       
    85     // if there's data in the buffer, call cb_data
       
    86     if (evbuffer_get_length(bufferevent_get_input(ctx->bev_data))) {
       
    87         ctx->cb_data(EVBUFFER_INPUT(ctx->bev_data), ctx->cb_arg);
       
    88     }
       
    89     
       
    90     // if we got EOF on the connection and there's no data left in the buffer, call cb_done
       
    91     if (ctx->have_eof && evbuffer_get_length(bufferevent_get_input(ctx->bev_data)) == 0) {
       
    92         _render_remote_do_done(ctx);
       
    93     }
       
    94 }
       
    95 
       
    96 static void _render_remote_do_done (struct render_remote *ctx) {
    65     assert(ctx->alive);
    97     assert(ctx->alive);
    66     
    98     
    67     ctx->alive = 0;
    99     ctx->alive = 0;
    68 
   100 
    69     ctx->cb_done(ctx->cb_arg);
   101     ctx->cb_done(ctx->cb_arg);
    70 
   102 }
    71     _remote_render_free(ctx);
   103 
    72 }
   104 static void _render_remote_do_fail (struct render_remote *ctx) {
    73 
       
    74 static void _remote_render_fail (struct remote_render_ctx *ctx) {
       
    75     assert(ctx->alive);
   105     assert(ctx->alive);
    76     
   106     
    77     ctx->alive = 0;
   107     ctx->alive = 0;
    78 
   108 
    79     ctx->cb_fail(ctx->cb_arg);
   109     ctx->cb_fail(ctx->cb_arg);
    80     
       
    81     _remote_render_free(ctx);
       
    82 }
   110 }
    83 
   111 
    84 static void _remote_write (struct bufferevent *bev, void *arg) {
   112 static void _remote_write (struct bufferevent *bev, void *arg) {
    85     struct remote_render_ctx *ctx = arg;
   113     struct render_remote *ctx = arg;
    86 
   114     
    87     // the write buffer was drained, so the render command was sent
   115     if (!ctx->cmd_sent) {
    88     ctx->cb_sent(ctx->cb_arg);
   116         // write the render command
    89     
   117         if (bufferevent_write(ctx->bev_data, &ctx->render_cmd, sizeof(ctx->render_cmd)))
    90     // we don't care about EV_WRITE anymore
   118             ERROR("bufferevent_write");
    91     if (bufferevent_disable(ctx->data_bev, EV_WRITE))
   119 
    92         ERROR("bufferevent_disable");
   120         // wait for it to be written out (we get called a second time)
    93 
   121         ctx->cmd_sent = 1;
    94     // start receiving data
   122 
    95     if (bufferevent_enable(ctx->data_bev, EV_READ))
   123     } else {
    96         ERROR("bufferevent_enable");
   124         // the write buffer was drained, so the render command was write():n
    97     
   125         assert(ctx->cb_sent);
       
   126         ctx->cb_sent(ctx->cb_arg);
       
   127         ctx->cb_sent = NULL;
       
   128         
       
   129         // we don't care about EV_WRITE anymore
       
   130         if (bufferevent_disable(ctx->bev_data, EV_WRITE))
       
   131             ERROR("bufferevent_disable");
       
   132         
       
   133         // are we buffered or raw?
       
   134         if (ctx->cb_data) {
       
   135             // start receiving data into our buffer
       
   136             if (bufferevent_enable(ctx->bev_data, EV_READ))
       
   137                 ERROR("bufferevent_enable");
       
   138 
       
   139         } else {
       
   140             assert(event_initialized(&ctx->ev_data));
       
   141             
       
   142             // enable the raw read event
       
   143             if (event_add(&ctx->ev_data, NULL))
       
   144                 ERROR("event_add");
       
   145         }
       
   146     }
       
   147 
    98     return;
   148     return;
    99 error:
   149 
   100     _remote_render_fail(ctx);
   150 error:
       
   151     _render_remote_do_fail(ctx);
   101 }
   152 }
   102 
   153 
   103 static void _remote_read (struct bufferevent *bev, void *arg) {
   154 static void _remote_read (struct bufferevent *bev, void *arg) {
   104     struct remote_render_ctx *ctx = arg;
   155     struct render_remote *ctx = arg;
   105     
   156 
   106     // pass the bufferevent's input buffer to our callback - libevent doesn't provide any function to access this, but hopefully this works correctly
   157     _render_remote_do_data(ctx);
   107     ctx->cb_data(EVBUFFER_INPUT(bev), ctx->cb_arg);
       
   108 }
   158 }
   109 
   159 
   110 static void _remote_error (struct bufferevent *bev, short what, void *arg) {
   160 static void _remote_error (struct bufferevent *bev, short what, void *arg) {
   111     struct remote_render_ctx *ctx = arg;
   161     struct render_remote *ctx = arg;
   112 
   162 
   113     // OH NOES; WHAT DO WE DO!?
   163     // OH NOES; WHAT DO WE DO!?
   114     
   164     
   115     if (what & EVBUFFER_EOF) {
   165     if (what & EVBUFFER_EOF) {
   116         // great!
   166         // great!
   117         
   167         ctx->have_eof = 1;
   118         // send any remaining-chunk data
   168         
   119         if (EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)) > 0)
   169         // flush any remaining data/call cb_send as needed
   120             ctx->cb_data(EVBUFFER_INPUT(bev), ctx->cb_arg);
   170         _render_remote_do_data(ctx);
   121 
       
   122         // signal completion
       
   123         _remote_render_done(ctx);
       
   124 
   171 
   125         return;
   172         return;
   126 
   173 
   127     } else if (what & EVBUFFER_ERROR) {
   174     } else if (what & EVBUFFER_ERROR) {
   128         // crap.
   175         // crap.
   135     } else {
   182     } else {
   136         FATAL("weird bufferevent error code: 0x%02X", what);
   183         FATAL("weird bufferevent error code: 0x%02X", what);
   137     }
   184     }
   138     
   185     
   139     // cb_fail + free
   186     // cb_fail + free
   140     _remote_render_fail(ctx);
   187     _render_remote_do_fail(ctx);
   141 }
   188 }
   142 
   189 
   143 static void _remote_connected (int fd, short event, void *arg) {
   190 /*
   144     struct remote_render_ctx *ctx = arg;
   191  * Do the initial IO-agnostic work to initialize the rendering process
   145 
   192  */
   146     // set up the read/write bufferevent
   193 static struct render_remote *_render_remote_init (struct render *render, struct remote_node *remote_node) {
   147     if ((ctx->data_bev = bufferevent_new(fd, &_remote_read, &_remote_write, &_remote_error, ctx)) == NULL)
   194     struct render_remote *ctx;
   148         ERROR("bufferevent_new");
   195 
   149 
   196     printf("remote_node render load: %d/%d\n", remote_node->current_load, remote_node->parallel_renders);
   150     // write the render command
   197 
   151     if (bufferevent_write(ctx->data_bev, &ctx->render_cmd, sizeof(ctx->render_cmd)))
   198     // alloc the remote render ctx
   152         ERROR("bufferevent_write");
   199     if (!(ctx = calloc(1, sizeof(struct render_remote))))
   153 
   200         ERROR("calloc");
   154     // wait for it to be written out
   201         
   155     if (bufferevent_enable(ctx->data_bev, EV_WRITE))
   202     // copy the relevant stuff from the render_ctx
   156         ERROR("bufferevent_enable");
       
   157     
       
   158     return;
       
   159 
       
   160 error:
       
   161     _remote_render_fail(ctx);
       
   162 }
       
   163 
       
   164 static void render_cmd_build (struct render *render, struct remote_render_ctx *ctx) {
       
   165     // just copy over the render params to the render_cmd
       
   166     ctx->render_cmd.mode = render->mode;
   203     ctx->render_cmd.mode = render->mode;
   167     ctx->render_cmd.img_w = htonl(render->img_w);
   204     ctx->render_cmd.img_w = htonl(render->img_w);
   168     ctx->render_cmd.img_h = htonl(render->img_h);
   205     ctx->render_cmd.img_h = htonl(render->img_h);
   169     ctx->render_cmd.x1 = render->x1;
   206     ctx->render_cmd.x1 = render->x1;
   170     ctx->render_cmd.y1 = render->y1;
   207     ctx->render_cmd.y1 = render->y1;
   171     ctx->render_cmd.x2 = render->x2;
   208     ctx->render_cmd.x2 = render->x2;
   172     ctx->render_cmd.y2 = render->y2;
   209     ctx->render_cmd.y2 = render->y2;
   173 }
   210 
   174 
   211     // create the socket
   175 struct remote_render_ctx *render_remote (
   212     if ((ctx->sock = socket(remote_node->addr.ss_family, SOCK_STREAM, 0)) < 0)
       
   213         PERROR("socket");
       
   214 
       
   215     // mark it as nonblocking
       
   216     if (fcntl(ctx->sock, F_SETFL, O_NONBLOCK) == -1)
       
   217         PERROR("fcntl");
       
   218     
       
   219     // initiate the connect
       
   220     int err = connect(ctx->sock, (struct sockaddr *) &remote_node->addr, sizeof(remote_node->addr));
       
   221 
       
   222     if (err != -1 || errno != EINPROGRESS)
       
   223         PERROR("connect");
       
   224 
       
   225     // return the raw ctx
       
   226     return ctx;
       
   227 
       
   228 error:
       
   229     _render_remote_free(ctx);
       
   230     return NULL;
       
   231 }
       
   232 
       
   233 /*
       
   234  * Raw unbuffered I/O mode
       
   235  */
       
   236 struct render_remote *render_remote_rawio (
       
   237         struct render *render,
       
   238         struct remote_node *remote_node,
       
   239         void (*cb_sent)(void *arg),
       
   240         void (*cb_fail)(void *arg),
       
   241         void (*cb_io_data)(evutil_socket_t, short, void*),
       
   242         void *cb_arg
       
   243 ) {
       
   244     struct render_remote *ctx;
       
   245     
       
   246     // short-circuit error handling
       
   247     if (!(ctx = _render_remote_init(render, remote_node)))
       
   248         return NULL;
       
   249 
       
   250     // store the provided callback functions
       
   251     ctx->cb_sent = cb_sent;
       
   252     ctx->cb_fail = cb_fail;
       
   253     ctx->cb_arg = cb_arg;
       
   254     
       
   255     // set up the write bufferevent
       
   256     if ((ctx->bev_data = bufferevent_new(ctx->sock, NULL, &_remote_write, &_remote_error, ctx)) == NULL)
       
   257         ERROR("bufferevent_new");
       
   258 
       
   259     // wait for it to connect
       
   260     if (bufferevent_enable(ctx->bev_data, EV_WRITE))
       
   261         ERROR("bufferevent_enable");
       
   262 
       
   263     // set up the custom EV_READ callback
       
   264     event_set(&ctx->ev_data, ctx->sock, EV_READ, cb_io_data, cb_arg);
       
   265 
       
   266     // we are now alive
       
   267     ctx->alive = 1;
       
   268 
       
   269     // success
       
   270     return ctx;
       
   271 
       
   272 error:
       
   273     _render_remote_free(ctx);
       
   274     return NULL;
       
   275 }   
       
   276 
       
   277 /*
       
   278  * Old buffered mode
       
   279  */
       
   280 struct render_remote *render_remote (
   176         struct render *render,
   281         struct render *render,
   177         struct remote_node *remote_node,
   282         struct remote_node *remote_node,
   178         void (*cb_sent)(void *arg),
   283         void (*cb_sent)(void *arg),
   179         void (*cb_data)(struct evbuffer *buf, void *arg),
   284         void (*cb_data)(struct evbuffer *buf, void *arg),
   180         void (*cb_done)(void *arg),
   285         void (*cb_done)(void *arg),
   181         void (*cb_fail)(void *arg),
   286         void (*cb_fail)(void *arg),
   182         void *cb_arg
   287         void *cb_arg
   183 ) {    
   288 ) {    
   184     struct remote_render_ctx *ctx;
   289     struct render_remote *ctx;
   185     int sock;
   290     
   186 
   291     // short-circuit error handling
   187     printf("remote_node render load: %d/%d\n", remote_node->current_load, remote_node->parallel_renders);
   292     if (!(ctx = _render_remote_init(render, remote_node)))
   188 
   293         return NULL;
   189     // alloc the remote render ctx
       
   190     if (!(ctx = calloc(1, sizeof(struct remote_render_ctx))))
       
   191         ERROR("calloc");
       
   192     
   294     
   193     // store the provided callback functions
   295     // store the provided callback functions
   194     ctx->cb_sent = cb_sent;
   296     ctx->cb_sent = cb_sent;
   195     ctx->cb_data = cb_data;
   297     ctx->cb_data = cb_data;
   196     ctx->cb_done = cb_done;
   298     ctx->cb_done = cb_done;
   197     ctx->cb_fail = cb_fail;
   299     ctx->cb_fail = cb_fail;
   198     ctx->cb_arg = cb_arg;
   300     ctx->cb_arg = cb_arg;
   199     
   301 
   200     // copy the relevant stuff from the render_ctx
   302     // set up the read/write bufferevent
   201     render_cmd_build(render, ctx);
   303     if ((ctx->bev_data = bufferevent_new(ctx->sock, &_remote_read, &_remote_write, &_remote_error, ctx)) == NULL)
   202     
   304         ERROR("bufferevent_new");
   203     // create the socket
   305 
   204     if ((sock = socket(remote_node->addr.ss_family, SOCK_STREAM, 0)) < 0)
   306     // wait for it to connect
   205         PERROR("socket");
   307     if (bufferevent_enable(ctx->bev_data, EV_WRITE))
   206 
   308         ERROR("bufferevent_enable");
   207     // mark it as nonblocking
   309 
   208     if (fcntl(sock, F_SETFL, O_NONBLOCK) == -1)
       
   209         PERROR("fcntl");
       
   210     
       
   211     // initiate the connect
       
   212     int err = connect(sock, (struct sockaddr *) &remote_node->addr, sizeof(remote_node->addr));
       
   213 
       
   214     if (err != -1 || errno != EINPROGRESS)
       
   215         PERROR("connect");
       
   216 
       
   217     // do the libevent dance
       
   218     if (!(ctx->ev_conn = event_new(NULL, sock, EV_WRITE, &_remote_connected, ctx)))
       
   219         ERROR("event_new");
       
   220 
       
   221     if (event_add(ctx->ev_conn, NULL))
       
   222         ERROR("event_add");
       
   223     
       
   224     // we are now alive
   310     // we are now alive
   225     ctx->alive = 1;
   311     ctx->alive = 1;
   226 
   312 
   227     // success
   313     // success
   228     return ctx;
   314     return ctx;
   229 
   315 
   230 error:
   316 error:
   231     free(ctx);
   317     _render_remote_free(ctx);
   232 
       
   233     if (sock > 0)
       
   234         close(sock);
       
   235 
       
   236     return NULL;
   318     return NULL;
   237 }
   319 }
   238 
   320 
   239 int render_remote_set_recv (struct remote_render_ctx *ctx, size_t recv_threshold, size_t unread_buffer) {
   321 void render_remote_set_recv (struct render_remote *ctx, size_t recv_threshold, size_t unread_buffer) {
   240     if (ctx->data_bev == NULL)
   322     assert(ctx->bev_data);
   241         return -1;
   323 
   242 
   324     bufferevent_setwatermark(ctx->bev_data, EV_READ, recv_threshold, recv_threshold + unread_buffer);
   243     bufferevent_setwatermark(ctx->data_bev, EV_READ, recv_threshold, recv_threshold + unread_buffer);
   325 }
   244 
   326 
       
   327 void render_remote_flush (struct render_remote *ctx) {
       
   328     assert(ctx->bev_data);
       
   329 
       
   330     // call cb_data/cb_done as appropriate
       
   331     _render_remote_do_data(ctx);
       
   332 }
       
   333 
       
   334 int render_remote_reschedule (struct render_remote *ctx) {
       
   335     assert(event_initialized(&ctx->ev_data));
       
   336     
       
   337     // just reschedule it
       
   338     if (event_add(&ctx->ev_data, NULL))
       
   339         ERROR("event_add");
       
   340 
       
   341     // ok
   245     return 0;
   342     return 0;
   246 }
   343 
   247 
   344 error:
   248 int render_remote_shake (struct remote_render_ctx *ctx) {
   345     return -1;
   249     if (ctx->data_bev == NULL)
   346 }
   250         return -1;
   347 
   251 
   348 void render_remote_cancel (struct render_remote *ctx) {
   252     ctx->cb_data(EVBUFFER_INPUT(ctx->data_bev), ctx->cb_arg);
       
   253 
       
   254     return 0;
       
   255 }
       
   256 
       
   257 void render_remote_cancel (struct remote_render_ctx *ctx) {
       
   258     // we must be alive for this..
   349     // we must be alive for this..
   259     assert(ctx->alive);
   350     assert(ctx->alive);
   260 
   351 
   261     // if it's still just connecting, cancel that
   352     _render_remote_free(ctx);
   262     if (event_pending(ctx->ev_conn, EV_WRITE, NULL))
   353 }
   263         event_del(ctx->ev_conn);
   354 
   264     
   355 void render_remote_free (struct render_remote *ctx) {
   265     // this takes care of the rest
   356     // XXX: add some sanity checks
   266     _remote_render_free (ctx);
   357     
   267 }
   358     _render_remote_free(ctx);
   268 
   359 }
       
   360