# HG changeset patch # User Tero Marttila # Date 1212695627 -10800 # Node ID 4252c27f2b72030b3a1dabc21325a19a0cf2b5d1 # Parent d4263f1f5b55026ef4fadd22cd78e0a8a4147d1d flow control for web_main.c committer: Tero Marttila diff -r d4263f1f5b55 -r 4252c27f2b72 render_node.c --- a/render_node.c Sun Jun 01 05:41:41 2008 +0300 +++ b/render_node.c Thu Jun 05 22:53:47 2008 +0300 @@ -84,7 +84,8 @@ return; } - printf("RENDER: [%ux%u] (%f, %f) -> (%f, %f): ", img_w, img_h, x1, y1, x2, y2); + printf("RENDER: [%ux%u] (%f, %f) -> (%f, %f): ...", img_w, img_h, x1, y1, x2, y2); + fflush(stdout); double duration; diff -r d4263f1f5b55 -r 4252c27f2b72 render_remote.c --- a/render_remote.c Sun Jun 01 05:41:41 2008 +0300 +++ b/render_remote.c Thu Jun 05 22:53:47 2008 +0300 @@ -221,11 +221,20 @@ return NULL; } -int render_remote_set_chunk_size (struct remote_render_ctx *ctx, size_t chunk_size, size_t overflow_buffer) { +int render_remote_set_recv (struct remote_render_ctx *ctx, size_t recv_threshold, size_t unread_buffer) { if (ctx->data_bev == NULL) return -1; - bufferevent_setwatermark(ctx->data_bev, EV_READ, chunk_size, chunk_size + overflow_buffer); + bufferevent_setwatermark(ctx->data_bev, EV_READ, recv_threshold, recv_threshold + unread_buffer); + + return 0; +} + +int render_remote_shake (struct remote_render_ctx *ctx) { + if (ctx->data_bev == NULL) + return -1; + + ctx->cb_data(EVBUFFER_INPUT(ctx->data_bev), ctx->cb_arg); return 0; } diff -r d4263f1f5b55 -r 4252c27f2b72 render_remote.h --- a/render_remote.h Sun Jun 01 05:41:41 2008 +0300 +++ b/render_remote.h Thu Jun 05 22:53:47 2008 +0300 @@ -41,14 +41,29 @@ void render_remote_cancel (struct remote_render_ctx *ctx); /* - * If you don't want to receive the rendered data one byte at a time, you can set a minimum chunk size. + * Controls the behaviour of when cb_data is called, and how remote data is read in. * - * cb_done will only be called when the buffer contains at least chunk_size bytes, or no more data is available (cb_done will be called next). + * recv_threshold, sets a threshold for calling cb_data - cb_data will only be called + * if the buffer contains at least recv_threshold, bytes. Note that cb_data is only + * ever called when new data has been receieved from the remote end - never + * otherwise. If cb_data doesn't drain the buffer, cb_data will be called again + * once more data has been received from the remote end. * - * overflow_buffer is how many bytes will be buffered, at most, in addition to chunk_size (in case rendering is paused). After that, the render node will start to block. + * overflow_size can be used to control the amount of unread data in cb_data. If + * the buffer contains more than recv_threshold + unread_buffer bytes, we will stop + * accepting bytes from the remote end, and cb_data will not be called any more. * * Only call this once cb_sent has fired */ -int render_remote_set_chunk_size (struct remote_render_ctx *ctx, size_t chunk_size, size_t overflow_buffer); +int render_remote_set_recv (struct remote_render_ctx *ctx, size_t recv_threshold, size_t unread_buffer); + +/* + * Call cb_data with the current set of buffered input data immediately, + * regardless of whether or not the buffer contains any data, or any new + * data has been received. + * + * Only call this after cb_sent and before cb_done/cb_fail. + */ +int render_remote_shake (struct remote_render_ctx *ctx); #endif /* RENDER_REMOTE_H */ diff -r d4263f1f5b55 -r 4252c27f2b72 web_main.c --- a/web_main.c Sun Jun 01 05:41:41 2008 +0300 +++ b/web_main.c Thu Jun 05 22:53:47 2008 +0300 @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -17,9 +18,11 @@ #include "render_remote.h" #include "common.h" -#define CHUNK_SIZE 4096 +#define MIN_CHUNK_SIZE 4096 #define OVERFLOW_BUFFER 4096 +// do not do any userland socket output buffering +#define HTTP_BUFFER 0 // what event_base we're using static struct event_base *ev_base; @@ -36,8 +39,13 @@ struct remote_render_ctx *remote_ctx; size_t bytes_sent; + + int paused; }; +// cb func prototypes +void _render_http_written (struct evhttp_request *request, void *arg); + void _render_cleanup (struct render_request *ctx) { // clean up free(ctx); @@ -47,11 +55,14 @@ struct render_request *ctx = arg; // set chunk size - render_remote_set_chunk_size(ctx->remote_ctx, CHUNK_SIZE, OVERFLOW_BUFFER); + render_remote_set_recv(ctx->remote_ctx, MIN_CHUNK_SIZE, OVERFLOW_BUFFER); // send headers evhttp_add_header(evhttp_request_get_output_headers(ctx->http_request), "Content-Type", "image/png"); evhttp_send_reply_start(ctx->http_request, HTTP_OK, "OK"); + + // setup flow-control + evhttp_set_reply_notify(ctx->http_request, HTTP_BUFFER, &_render_http_written, ctx); ctx->headers_sent = 1; @@ -63,22 +74,50 @@ size_t buf_size = EVBUFFER_LENGTH(buf); - // send chunk + // ignore empty buffers, a result of render_remote_shake() + if (buf_size == 0) { + printf("render [%p]: remote buffer is empty\n", ctx); + + return; + } + + // check if we are paused + if (ctx->paused) { + // we are waiting for the HTTP send buffer to clear, so keep the data in the render buffer + printf("render [%p]: delaying data: %zu:%zu bytes\n", ctx, buf_size, ctx->bytes_sent); + + return; + } + + // move chunk to http buffers evhttp_send_reply_chunk(ctx->http_request, buf); - + printf("render [%p]: enqueued chunk: %zu/%zu bytes\n", ctx, buf_size, ctx->bytes_sent); + // mark ourself as paused until httpd tells us to continue + ctx->paused = 1; + + // keep a tally of total sent bytes ctx->bytes_sent += buf_size; } void _render_done (void *arg) { struct render_request *ctx = arg; + // if we are paused, just shove the data into the http buffers, they might become larger than they should be, but it's easier to just move the data there and let render_remote complete + if (ctx->paused) { + printf("render [%p]: done: flushing the rest of our data\n", ctx); + ctx->paused = 0; + + render_remote_shake(ctx->remote_ctx); + } + // send end evhttp_send_reply_end(ctx->http_request); printf("render [%p]: done: %zu bytes\n", ctx, ctx->bytes_sent); - + + // the request is now done, clean up _render_cleanup(ctx); } @@ -108,6 +147,18 @@ _render_cleanup(ctx); } +void _render_http_written (struct evhttp_request *request, void *arg) { + struct render_request *ctx = arg; + + printf("render [%p]: http available for write\n", ctx); + + // unpause ourself + ctx->paused = 0; + + // shake out the buffers + render_remote_shake(ctx->remote_ctx); +} + void _http_render_execute (struct evhttp_request *request, u_int32_t img_w, u_int32_t img_h) { // render request context struct render_request *req_ctx = calloc(1, sizeof(struct render_request)); @@ -211,16 +262,41 @@ signal_del(&ev_sigint); } -int main (void) { +void log_null (int severity, const char *msg) { + // ignore +} + +int main (int argc, char **argv) { // libevent init ev_base = event_init(); if (!ev_base) die("event_init"); + int opt; + int enable_debug = 0; + + // arguments + while ((opt = getopt(argc, argv, "d")) != -1) { + switch (opt) { + case 'd': + // enable libevent debugging + enable_debug = 1; + break; + + default: + err_exit("Usage: %s [-d]", argv[0]); + + } + } + + // per default it is enabled + if (!enable_debug) + event_set_log_callback(&log_null); + // handle signals signals_init(); - + // evhttp init struct evhttp *http_server = evhttp_new(ev_base);