working chunked-streaming of remote-rendered mandelbrots in web_main, next step will be flow control. Remote rendering doesn't compile in render_node.
authorTero Marttila <terom@fixme.fi>
Sun, 01 Jun 2008 01:48:09 +0300
changeset 3 675be0a45157
parent 2 69f8c0acaac7
child 4 49edbdf9ebe7
working chunked-streaming of remote-rendered mandelbrots in web_main, next step will be flow control. Remote rendering doesn't compile in render_node.

committer: Tero Marttila <terom@fixme.fi>
Makefile
mandelbrot.c
render_file.c
render_node.c
render_remote.c
render_remote.h
web_main.c
--- a/Makefile	Sat May 31 19:35:21 2008 +0300
+++ b/Makefile	Sun Jun 01 01:48:09 2008 +0300
@@ -3,6 +3,9 @@
 
 OBJS = common.o mandelbrot.o render.o render_remote.o
 HEADERS = common.h mandelbrot.h render.h
+EXECS = render_file web_main render_node
+
+all: ${EXECS}
 
 render_file: ${OBJS}
 
@@ -10,3 +13,7 @@
 
 render_node: ${OBJS}
 
+clean :
+	rm ${OBJS}
+	rm ${EXECS}
+
--- a/mandelbrot.c	Sat May 31 19:35:21 2008 +0300
+++ b/mandelbrot.c	Sun Jun 01 01:48:09 2008 +0300
@@ -77,6 +77,11 @@
 
         if (!info_ptr)
             goto error;
+
+        // libpng error handling
+        if (setjmp(png_jmpbuf(png_ptr))) {
+            goto error;
+        }
         
         if (ctx->io_stream) {
             // use normal libpng I/O
--- a/render_file.c	Sat May 31 19:35:21 2008 +0300
+++ b/render_file.c	Sun Jun 01 01:48:09 2008 +0300
@@ -28,6 +28,7 @@
         fprintf(stdout, "rendered %dx%d mandelbrot in %f seconds\n", img_w, img_h, duration);
 }
 
+#if 0
 void render_remote (int img_w, int img_h, FILE *output, FILE *remote) {
     render_t ctx;
     struct render_cmd cmd;
@@ -101,6 +102,7 @@
 
     return fh;
 }
+#endif
 
 int main (int argc, char **argv) {
     int opt;
@@ -134,6 +136,7 @@
                 verbose = 1;
                 break;
 
+#if 0
             case 'r' :
                 if (remote)
                     err_exit("Only use -r once");
@@ -141,6 +144,7 @@
                 remote = open_remote(optarg);
 
                 break;
+#endif
 
             default :
                 err_exit("Usage: %s [-w img_w] [-h img_h] [-o output_file] [-v] [-r host[:port]]", argv[0]);
@@ -149,7 +153,8 @@
 
     if (!output)
         output = stdout;
-    
+
+#if 0
     if (remote) {
         if (verbose)
             fprintf(stderr, "Render [%dx%d] mandelbrot remotely\n", img_w, img_h);
@@ -157,7 +162,9 @@
         render_remote(img_w, img_h, output, remote);
 
         fclose(remote);
-    } else {
+    } else
+#endif    
+    {
         if (verbose)
             fprintf(stderr, "Render [%dx%d] mandelbrot locally\n", img_w, img_h);
 
--- a/render_node.c	Sat May 31 19:35:21 2008 +0300
+++ b/render_node.c	Sun Jun 01 01:48:09 2008 +0300
@@ -3,12 +3,29 @@
 #include <arpa/inet.h>
 #include <sys/socket.h>
 #include <stdlib.h>
+#include <signal.h>
+#include <string.h>
 
 #include "render.h"
 #include "render_remote.h"
 #include "mandelbrot.h"
 #include "common.h"
 
+void sigpipe_handler (int signal) {
+    /* ignore */
+    fprintf(stderr, "SIGPIPE\n");
+}
+
+void sigpipe_ignore () {
+    struct sigaction sigpipe_action;
+
+    memset(&sigpipe_action, 0, sizeof(sigpipe_action));
+    sigpipe_action.sa_handler = SIG_IGN;
+
+    if (sigaction(SIGPIPE, &sigpipe_action, NULL))
+        perr_exit("sigaction");
+}
+
 int my_fread(FILE *fh, void *ptr, size_t size) {
     int ret = fread(ptr, size, 1, fh);
     
@@ -78,10 +95,13 @@
     render_region_raw(&ctx, x1, y1, x2, y2);
     render_io_stream(&ctx, fh);
     
+    sigpipe_ignore();
+    
     // render!
-    mandelbrot_render_timed(&ctx, &duration);
-
-    printf("time=%fs\n", duration);
+    if (mandelbrot_render_timed(&ctx, &duration))
+        printf("error\n");  // XXX: notify our client?
+    else
+        printf("time=%fs\n", duration);
     
     // close the FILE* and socket
     fclose(fh);
@@ -89,14 +109,16 @@
     return;
 }
 
+
 int main (int argc, char** argv) {
     int ssock, sock;
     struct sockaddr_in addr;
     socklen_t addr_len;
 
+
     // create the socket
     if ((ssock = socket(PF_INET, SOCK_STREAM, 0)) == -1)
-        die("socket");
+        perr_exit("socket");
 
     addr.sin_family = AF_INET;
     addr.sin_port = htons(RENDER_PORT);
@@ -106,10 +128,10 @@
         addr.sin_port = htons(atoi(argv[1]));
     
     if (bind(ssock, (struct sockaddr *) &addr, sizeof(struct sockaddr_in)) == -1)
-        die("bind");
+        perr_exit("bind");
     
     if (listen(ssock, 1) == -1)
-        die("listen");
+        perr_exit("listen");
     
     printf("RUN: %s:%hu\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
 
@@ -119,7 +141,7 @@
 
         // accept a new client
         if ((sock = accept(ssock, (struct sockaddr *) &addr, &addr_len)) == -1)
-            die("accept");
+            perr_exit("accept");
         
         printf("ACCEPT: %s:%hu\n", inet_ntoa(addr.sin_addr), addr.sin_port);
         
--- a/render_remote.c	Sat May 31 19:35:21 2008 +0300
+++ b/render_remote.c	Sun Jun 01 01:48:09 2008 +0300
@@ -54,6 +54,7 @@
         perror(desc);                           \
         ctx->cb_fail(ctx->cb_arg);              \
         _remote_render_ctx_free(&ctx);          \
+        return;                                 \
     } while (0)
 
 void _remote_write (struct bufferevent *bev, void *arg) {
@@ -75,7 +76,7 @@
     struct remote_render_ctx *ctx = arg;
     
     // pass the bufferevent's input buffer to our callback - libevent doesn't provide any function to access this, but hopefully this works correctly
-    ctx->cb_data(bev->input, ctx->cb_arg);
+    ctx->cb_data(EVBUFFER_INPUT(bev), ctx->cb_arg);
 }
 
 void _remote_error (struct bufferevent *bev, short what, void *arg) {
@@ -85,6 +86,12 @@
     
     if (what & EVBUFFER_EOF) {
         // great!
+        
+        // send any remaining-chunk data
+        if (EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)) > 0)
+            ctx->cb_data(EVBUFFER_INPUT(bev), ctx->cb_arg);
+
+        // signal completion
         ctx->cb_done(ctx->cb_arg);
 
     } else if (what & EVBUFFER_ERROR) {
@@ -134,7 +141,7 @@
     rrctx->render_cmd.y2 = rctx->y2;
 }
 
-int render_remote (
+struct remote_render_ctx *render_remote (
         render_t *render_ctx,
         struct sockaddr_storage *remote,
         void (*cb_sent)(void *arg),
@@ -148,7 +155,7 @@
 
     if (!ctx) {
         error("render_remote: malloc");
-        return -1;
+        return NULL;
     }
     
     // store the provided callback functions
@@ -167,7 +174,7 @@
     if (sock < 0) {
         free(ctx);
         perror("render_remote: socket");
-        return -1;
+        return NULL;
     }
 
     // mark it as nonblocking
@@ -175,7 +182,7 @@
         free(ctx);
         close(sock);
         perror("render_remote: fcntl");
-        return -1;
+        return NULL;
     }
     
     // initiate the connect
@@ -185,7 +192,7 @@
         free(ctx);
         close(sock);
         perror("render_remote: connect");
-        return -1;
+        return NULL;
     }
 
     // do the libevent dance
@@ -195,9 +202,33 @@
         free(ctx);
         close(sock);
         error("render_remote: event_add");
-        return -1;
+        return NULL;
     }
     
     // success
+    return ctx;
+}
+
+int render_remote_set_chunk_size (struct remote_render_ctx *ctx, size_t chunk_size, size_t overflow_buffer) {
+    if (ctx->data_bev == NULL)
+        return -1;
+
+    bufferevent_setwatermark(ctx->data_bev, EV_READ, chunk_size, chunk_size + overflow_buffer);
+
     return 0;
 }
+
+void render_remote_cancel (struct remote_render_ctx *ctx) {
+    // if it's still just connecting, cancel that
+    if (event_pending(&ctx->ev_conn, EV_WRITE, NULL)) {
+        event_del(&ctx->ev_conn);
+
+    }
+    
+    // close the socket (ctx->ev_conn remains valid even after we're done with it...)
+    close(EVENT_FD(&ctx->ev_conn));
+    
+    // this takes care of the rest
+    _remote_render_ctx_free (&ctx);
+}
+
--- a/render_remote.h	Sat May 31 19:35:21 2008 +0300
+++ b/render_remote.h	Sun Jun 01 01:48:09 2008 +0300
@@ -12,10 +12,19 @@
 
 #define RENDER_PORT 6159
 
+struct remote_render_ctx;
+
 /*
- * Execute the given render operation on the render_node at the given remote address
+ * Execute the given render operation on the render_node at the given remote address.
+ *
+ * The various callback functions must all be provided.
+ *
+ * cb_sent will be invoked after the request has succesfully been written, and before cb_data is called.
+ * cb_data is called whenever new data has been received. See also, render_remote_set_chunk_size
+ * cb_done is called when all the data has been passed to cb_data
+ * cb_fail is called when an error is encountered. This can (and will) happen at any time!
  */
-int render_remote (
+struct remote_render_ctx *render_remote (
         render_t *render_ctx,               // what to render
         struct sockaddr_storage *remote,    // what render node to use
         void (*cb_sent)(void *arg),
@@ -25,4 +34,20 @@
         void *cb_arg;
 );
 
+/*
+ * Cancel the given request. No more callbacks will be called, buffered data is discarded and the remote render process will cancel asap.
+ */
+void render_remote_cancel (struct remote_render_ctx *ctx);
+
+/*
+ * If you don't want to receive the rendered data one byte at a time, you can set a minimum chunk size.
+ *
+ * cb_done will only be called when the buffer contains at least chunk_size bytes, or no more data is available (cb_done will be called next).
+ *
+ * overflow_buffer is how many bytes will be buffered, at most, in addition to chunk_size (in case rendering is paused). After that, the render node will start to block.
+ *
+ * Only call this once cb_sent has fired
+ */
+int render_remote_set_chunk_size (struct remote_render_ctx *ctx, size_t chunk_size, size_t overflow_buffer);
+
 #endif /* RENDER_REMOTE_H */
--- a/web_main.c	Sat May 31 19:35:21 2008 +0300
+++ b/web_main.c	Sun Jun 01 01:48:09 2008 +0300
@@ -6,6 +6,7 @@
 #include <string.h>
 #include <netinet/ip.h>
 #include <arpa/inet.h>
+#include <signal.h>
 
 #include <event.h>
 #include <evhttp.h>
@@ -14,6 +15,9 @@
 #include "render_remote.h"
 #include "common.h"
 
+#define CHUNK_SIZE 4096
+#define OVERFLOW_BUFFER 4096
+
 // what render node to use
 static struct sockaddr_storage render_node;
 
@@ -22,11 +26,26 @@
     struct evhttp_request *http_request;
 
     int headers_sent;
+    
+    struct remote_render_ctx *remote_ctx;
+
+    size_t bytes_sent;
 };
 
+void _render_cleanup (struct render_request *ctx) {
+     // not interested anymore
+    evhttp_connection_set_closecb(ctx->http_request->evcon, NULL, NULL);
+
+    // clean up
+    free(ctx);
+}
+
 void _render_sent (void *arg) {
     struct render_request *ctx = arg;
 
+    // set chunk size
+    render_remote_set_chunk_size(ctx->remote_ctx, CHUNK_SIZE, OVERFLOW_BUFFER);
+
     // send headers
     evhttp_add_header(ctx->http_request->output_headers, "Content-Type", "image/png");
     evhttp_send_reply_start(ctx->http_request, HTTP_OK, "OK");
@@ -39,10 +58,14 @@
 void _render_data (struct evbuffer *buf, void *arg) {
     struct render_request *ctx = arg;
 
+    size_t buf_size = EVBUFFER_LENGTH(buf);
+
     // send chunk
     evhttp_send_reply_chunk(ctx->http_request, buf);
     
-    printf("render [%p]: sent chunk\n", ctx);
+    printf("render [%p]: enqueued chunk: %zu/%zu bytes\n", ctx, buf_size, ctx->bytes_sent);
+
+    ctx->bytes_sent += buf_size;
 }
 
 void _render_done (void *arg) {
@@ -51,10 +74,9 @@
     // send end
     evhttp_send_reply_end(ctx->http_request);
 
-    printf("render [%p]: done\n", ctx);
-
-    // clean up
-    free(ctx);
+    printf("render [%p]: done: %zu bytes\n", ctx, ctx->bytes_sent);
+    
+    _render_cleanup(ctx);
 }
 
 void _render_fail (void *arg) {
@@ -69,8 +91,18 @@
     
     printf("render [%p]: failed\n", ctx);
 
-    // clean up
-    free(ctx);
+    _render_cleanup(ctx);
+}
+
+void _render_http_lost (struct evhttp_connection *connection, void *arg) {
+    struct render_request *ctx = arg;
+
+    printf("render [%p]: lost http connection\n", ctx);
+
+    // cancel
+    render_remote_cancel(ctx->remote_ctx);
+
+    _render_cleanup(ctx);
 }
 
 void _http_render_execute (struct evhttp_request *request, u_int32_t img_w, u_int32_t img_h) {
@@ -79,6 +111,7 @@
     
     req_ctx->http_request = request;
     req_ctx->headers_sent = 0;
+    req_ctx->bytes_sent = 0;
     
     // render context
     render_t rend_ctx;
@@ -87,17 +120,20 @@
     render_region_full(&rend_ctx);
     
     // initiate the remote render operation
-    if (render_remote(&rend_ctx, &render_node,
+    if ((req_ctx->remote_ctx = render_remote(&rend_ctx, &render_node,
         &_render_sent,
         &_render_data,
         &_render_done,
         &_render_fail,
         req_ctx
-    )) {
+    )) == NULL) {
         free(req_ctx);
         fprintf(stderr, "ERR: render_remote\n");
         return;
     }
+
+    // set close cb
+    evhttp_connection_set_closecb(request->evcon, &_render_http_lost, req_ctx);
     
     printf("render [%p]: started\n", req_ctx);
 }
@@ -131,19 +167,41 @@
     evhttp_clear_headers(&qargs);
 
     // request log
-    printf("REQ: [%s:%d] uri=%s, img_w=%d, img_h=%d\n", peer_address, peer_port, uri, img_w, img_h);
+    printf("REQ: [%s:%d] method=%d, uri=%s, img_w=%d, img_h=%d\n", peer_address, peer_port, request->type, uri, img_w, img_h);
     
     // do it
     _http_render_execute(request, img_w, img_h);
 }
 
+struct event ev_sigint;
+
+void sigint_handler (int signal, short event, void *arg) {
+    printf("SIGINT: shutting down\n");
+    
+    if (event_loopexit(NULL))
+        err_exit("event_loopexit");
+}
+
+void signals_init () {
+    signal_set(&ev_sigint, SIGINT, &sigint_handler, NULL);
+    signal_add(&ev_sigint, NULL);
+}
+
+void signals_deinit () {
+    signal_del(&ev_sigint);
+}
+
 int main (void) {
-    // libevent/http init
+    // libevent init
     struct event_base *ev_base = event_init();
 
     if (!ev_base)
         die("event_init");
-
+    
+    // handle signals
+    signals_init();
+    
+    // evhttp init
     struct evhttp *http_server = evhttp_new(ev_base);
 
     if (!http_server)
@@ -169,9 +227,13 @@
     // run the libevent mainloop
     if (event_dispatch())
         die("event_dispatch");
+
+    printf("SHUTDOWN\n");
     
     // clean up
+    signals_deinit();
     evhttp_free(http_server);
+    event_base_free(ev_base);
     
     // successfull exit
     return 0;