# HG changeset patch # User Tero Marttila # Date 1212274089 -10800 # Node ID 675be0a451570a63fd3a7e84b51f3b3d4aa01ecf # Parent 69f8c0acaac7f3138a4114fad0f7d3ea3ddef93a working chunked-streaming of remote-rendered mandelbrots in web_main, next step will be flow control. Remote rendering doesn't compile in render_node. committer: Tero Marttila diff -r 69f8c0acaac7 -r 675be0a45157 Makefile --- a/Makefile Sat May 31 19:35:21 2008 +0300 +++ b/Makefile Sun Jun 01 01:48:09 2008 +0300 @@ -3,6 +3,9 @@ OBJS = common.o mandelbrot.o render.o render_remote.o HEADERS = common.h mandelbrot.h render.h +EXECS = render_file web_main render_node + +all: ${EXECS} render_file: ${OBJS} @@ -10,3 +13,7 @@ render_node: ${OBJS} +clean : + rm ${OBJS} + rm ${EXECS} + diff -r 69f8c0acaac7 -r 675be0a45157 mandelbrot.c --- a/mandelbrot.c Sat May 31 19:35:21 2008 +0300 +++ b/mandelbrot.c Sun Jun 01 01:48:09 2008 +0300 @@ -77,6 +77,11 @@ if (!info_ptr) goto error; + + // libpng error handling + if (setjmp(png_jmpbuf(png_ptr))) { + goto error; + } if (ctx->io_stream) { // use normal libpng I/O diff -r 69f8c0acaac7 -r 675be0a45157 render_file.c --- a/render_file.c Sat May 31 19:35:21 2008 +0300 +++ b/render_file.c Sun Jun 01 01:48:09 2008 +0300 @@ -28,6 +28,7 @@ fprintf(stdout, "rendered %dx%d mandelbrot in %f seconds\n", img_w, img_h, duration); } +#if 0 void render_remote (int img_w, int img_h, FILE *output, FILE *remote) { render_t ctx; struct render_cmd cmd; @@ -101,6 +102,7 @@ return fh; } +#endif int main (int argc, char **argv) { int opt; @@ -134,6 +136,7 @@ verbose = 1; break; +#if 0 case 'r' : if (remote) err_exit("Only use -r once"); @@ -141,6 +144,7 @@ remote = open_remote(optarg); break; +#endif default : err_exit("Usage: %s [-w img_w] [-h img_h] [-o output_file] [-v] [-r host[:port]]", argv[0]); @@ -149,7 +153,8 @@ if (!output) output = stdout; - + +#if 0 if (remote) { if (verbose) fprintf(stderr, "Render [%dx%d] mandelbrot remotely\n", img_w, img_h); @@ -157,7 +162,9 @@ render_remote(img_w, img_h, output, remote); fclose(remote); - } else { + } else +#endif + { if (verbose) fprintf(stderr, "Render [%dx%d] mandelbrot locally\n", img_w, img_h); diff -r 69f8c0acaac7 -r 675be0a45157 render_node.c --- a/render_node.c Sat May 31 19:35:21 2008 +0300 +++ b/render_node.c Sun Jun 01 01:48:09 2008 +0300 @@ -3,12 +3,29 @@ #include #include #include +#include +#include #include "render.h" #include "render_remote.h" #include "mandelbrot.h" #include "common.h" +void sigpipe_handler (int signal) { + /* ignore */ + fprintf(stderr, "SIGPIPE\n"); +} + +void sigpipe_ignore () { + struct sigaction sigpipe_action; + + memset(&sigpipe_action, 0, sizeof(sigpipe_action)); + sigpipe_action.sa_handler = SIG_IGN; + + if (sigaction(SIGPIPE, &sigpipe_action, NULL)) + perr_exit("sigaction"); +} + int my_fread(FILE *fh, void *ptr, size_t size) { int ret = fread(ptr, size, 1, fh); @@ -78,10 +95,13 @@ render_region_raw(&ctx, x1, y1, x2, y2); render_io_stream(&ctx, fh); + sigpipe_ignore(); + // render! - mandelbrot_render_timed(&ctx, &duration); - - printf("time=%fs\n", duration); + if (mandelbrot_render_timed(&ctx, &duration)) + printf("error\n"); // XXX: notify our client? + else + printf("time=%fs\n", duration); // close the FILE* and socket fclose(fh); @@ -89,14 +109,16 @@ return; } + int main (int argc, char** argv) { int ssock, sock; struct sockaddr_in addr; socklen_t addr_len; + // create the socket if ((ssock = socket(PF_INET, SOCK_STREAM, 0)) == -1) - die("socket"); + perr_exit("socket"); addr.sin_family = AF_INET; addr.sin_port = htons(RENDER_PORT); @@ -106,10 +128,10 @@ addr.sin_port = htons(atoi(argv[1])); if (bind(ssock, (struct sockaddr *) &addr, sizeof(struct sockaddr_in)) == -1) - die("bind"); + perr_exit("bind"); if (listen(ssock, 1) == -1) - die("listen"); + perr_exit("listen"); printf("RUN: %s:%hu\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); @@ -119,7 +141,7 @@ // accept a new client if ((sock = accept(ssock, (struct sockaddr *) &addr, &addr_len)) == -1) - die("accept"); + perr_exit("accept"); printf("ACCEPT: %s:%hu\n", inet_ntoa(addr.sin_addr), addr.sin_port); diff -r 69f8c0acaac7 -r 675be0a45157 render_remote.c --- a/render_remote.c Sat May 31 19:35:21 2008 +0300 +++ b/render_remote.c Sun Jun 01 01:48:09 2008 +0300 @@ -54,6 +54,7 @@ perror(desc); \ ctx->cb_fail(ctx->cb_arg); \ _remote_render_ctx_free(&ctx); \ + return; \ } while (0) void _remote_write (struct bufferevent *bev, void *arg) { @@ -75,7 +76,7 @@ struct remote_render_ctx *ctx = arg; // pass the bufferevent's input buffer to our callback - libevent doesn't provide any function to access this, but hopefully this works correctly - ctx->cb_data(bev->input, ctx->cb_arg); + ctx->cb_data(EVBUFFER_INPUT(bev), ctx->cb_arg); } void _remote_error (struct bufferevent *bev, short what, void *arg) { @@ -85,6 +86,12 @@ if (what & EVBUFFER_EOF) { // great! + + // send any remaining-chunk data + if (EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)) > 0) + ctx->cb_data(EVBUFFER_INPUT(bev), ctx->cb_arg); + + // signal completion ctx->cb_done(ctx->cb_arg); } else if (what & EVBUFFER_ERROR) { @@ -134,7 +141,7 @@ rrctx->render_cmd.y2 = rctx->y2; } -int render_remote ( +struct remote_render_ctx *render_remote ( render_t *render_ctx, struct sockaddr_storage *remote, void (*cb_sent)(void *arg), @@ -148,7 +155,7 @@ if (!ctx) { error("render_remote: malloc"); - return -1; + return NULL; } // store the provided callback functions @@ -167,7 +174,7 @@ if (sock < 0) { free(ctx); perror("render_remote: socket"); - return -1; + return NULL; } // mark it as nonblocking @@ -175,7 +182,7 @@ free(ctx); close(sock); perror("render_remote: fcntl"); - return -1; + return NULL; } // initiate the connect @@ -185,7 +192,7 @@ free(ctx); close(sock); perror("render_remote: connect"); - return -1; + return NULL; } // do the libevent dance @@ -195,9 +202,33 @@ free(ctx); close(sock); error("render_remote: event_add"); - return -1; + return NULL; } // success + return ctx; +} + +int render_remote_set_chunk_size (struct remote_render_ctx *ctx, size_t chunk_size, size_t overflow_buffer) { + if (ctx->data_bev == NULL) + return -1; + + bufferevent_setwatermark(ctx->data_bev, EV_READ, chunk_size, chunk_size + overflow_buffer); + return 0; } + +void render_remote_cancel (struct remote_render_ctx *ctx) { + // if it's still just connecting, cancel that + if (event_pending(&ctx->ev_conn, EV_WRITE, NULL)) { + event_del(&ctx->ev_conn); + + } + + // close the socket (ctx->ev_conn remains valid even after we're done with it...) + close(EVENT_FD(&ctx->ev_conn)); + + // this takes care of the rest + _remote_render_ctx_free (&ctx); +} + diff -r 69f8c0acaac7 -r 675be0a45157 render_remote.h --- a/render_remote.h Sat May 31 19:35:21 2008 +0300 +++ b/render_remote.h Sun Jun 01 01:48:09 2008 +0300 @@ -12,10 +12,19 @@ #define RENDER_PORT 6159 +struct remote_render_ctx; + /* - * Execute the given render operation on the render_node at the given remote address + * Execute the given render operation on the render_node at the given remote address. + * + * The various callback functions must all be provided. + * + * cb_sent will be invoked after the request has succesfully been written, and before cb_data is called. + * cb_data is called whenever new data has been received. See also, render_remote_set_chunk_size + * cb_done is called when all the data has been passed to cb_data + * cb_fail is called when an error is encountered. This can (and will) happen at any time! */ -int render_remote ( +struct remote_render_ctx *render_remote ( render_t *render_ctx, // what to render struct sockaddr_storage *remote, // what render node to use void (*cb_sent)(void *arg), @@ -25,4 +34,20 @@ void *cb_arg; ); +/* + * Cancel the given request. No more callbacks will be called, buffered data is discarded and the remote render process will cancel asap. + */ +void render_remote_cancel (struct remote_render_ctx *ctx); + +/* + * If you don't want to receive the rendered data one byte at a time, you can set a minimum chunk size. + * + * cb_done will only be called when the buffer contains at least chunk_size bytes, or no more data is available (cb_done will be called next). + * + * overflow_buffer is how many bytes will be buffered, at most, in addition to chunk_size (in case rendering is paused). After that, the render node will start to block. + * + * Only call this once cb_sent has fired + */ +int render_remote_set_chunk_size (struct remote_render_ctx *ctx, size_t chunk_size, size_t overflow_buffer); + #endif /* RENDER_REMOTE_H */ diff -r 69f8c0acaac7 -r 675be0a45157 web_main.c --- a/web_main.c Sat May 31 19:35:21 2008 +0300 +++ b/web_main.c Sun Jun 01 01:48:09 2008 +0300 @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -14,6 +15,9 @@ #include "render_remote.h" #include "common.h" +#define CHUNK_SIZE 4096 +#define OVERFLOW_BUFFER 4096 + // what render node to use static struct sockaddr_storage render_node; @@ -22,11 +26,26 @@ struct evhttp_request *http_request; int headers_sent; + + struct remote_render_ctx *remote_ctx; + + size_t bytes_sent; }; +void _render_cleanup (struct render_request *ctx) { + // not interested anymore + evhttp_connection_set_closecb(ctx->http_request->evcon, NULL, NULL); + + // clean up + free(ctx); +} + void _render_sent (void *arg) { struct render_request *ctx = arg; + // set chunk size + render_remote_set_chunk_size(ctx->remote_ctx, CHUNK_SIZE, OVERFLOW_BUFFER); + // send headers evhttp_add_header(ctx->http_request->output_headers, "Content-Type", "image/png"); evhttp_send_reply_start(ctx->http_request, HTTP_OK, "OK"); @@ -39,10 +58,14 @@ void _render_data (struct evbuffer *buf, void *arg) { struct render_request *ctx = arg; + size_t buf_size = EVBUFFER_LENGTH(buf); + // send chunk evhttp_send_reply_chunk(ctx->http_request, buf); - printf("render [%p]: sent chunk\n", ctx); + printf("render [%p]: enqueued chunk: %zu/%zu bytes\n", ctx, buf_size, ctx->bytes_sent); + + ctx->bytes_sent += buf_size; } void _render_done (void *arg) { @@ -51,10 +74,9 @@ // send end evhttp_send_reply_end(ctx->http_request); - printf("render [%p]: done\n", ctx); - - // clean up - free(ctx); + printf("render [%p]: done: %zu bytes\n", ctx, ctx->bytes_sent); + + _render_cleanup(ctx); } void _render_fail (void *arg) { @@ -69,8 +91,18 @@ printf("render [%p]: failed\n", ctx); - // clean up - free(ctx); + _render_cleanup(ctx); +} + +void _render_http_lost (struct evhttp_connection *connection, void *arg) { + struct render_request *ctx = arg; + + printf("render [%p]: lost http connection\n", ctx); + + // cancel + render_remote_cancel(ctx->remote_ctx); + + _render_cleanup(ctx); } void _http_render_execute (struct evhttp_request *request, u_int32_t img_w, u_int32_t img_h) { @@ -79,6 +111,7 @@ req_ctx->http_request = request; req_ctx->headers_sent = 0; + req_ctx->bytes_sent = 0; // render context render_t rend_ctx; @@ -87,17 +120,20 @@ render_region_full(&rend_ctx); // initiate the remote render operation - if (render_remote(&rend_ctx, &render_node, + if ((req_ctx->remote_ctx = render_remote(&rend_ctx, &render_node, &_render_sent, &_render_data, &_render_done, &_render_fail, req_ctx - )) { + )) == NULL) { free(req_ctx); fprintf(stderr, "ERR: render_remote\n"); return; } + + // set close cb + evhttp_connection_set_closecb(request->evcon, &_render_http_lost, req_ctx); printf("render [%p]: started\n", req_ctx); } @@ -131,19 +167,41 @@ evhttp_clear_headers(&qargs); // request log - printf("REQ: [%s:%d] uri=%s, img_w=%d, img_h=%d\n", peer_address, peer_port, uri, img_w, img_h); + printf("REQ: [%s:%d] method=%d, uri=%s, img_w=%d, img_h=%d\n", peer_address, peer_port, request->type, uri, img_w, img_h); // do it _http_render_execute(request, img_w, img_h); } +struct event ev_sigint; + +void sigint_handler (int signal, short event, void *arg) { + printf("SIGINT: shutting down\n"); + + if (event_loopexit(NULL)) + err_exit("event_loopexit"); +} + +void signals_init () { + signal_set(&ev_sigint, SIGINT, &sigint_handler, NULL); + signal_add(&ev_sigint, NULL); +} + +void signals_deinit () { + signal_del(&ev_sigint); +} + int main (void) { - // libevent/http init + // libevent init struct event_base *ev_base = event_init(); if (!ev_base) die("event_init"); - + + // handle signals + signals_init(); + + // evhttp init struct evhttp *http_server = evhttp_new(ev_base); if (!http_server) @@ -169,9 +227,13 @@ // run the libevent mainloop if (event_dispatch()) die("event_dispatch"); + + printf("SHUTDOWN\n"); // clean up + signals_deinit(); evhttp_free(http_server); + event_base_free(ev_base); // successfull exit return 0;