working chunked-streaming of remote-rendered mandelbrots in web_main, next step will be flow control. Remote rendering doesn't compile in render_node.
committer: Tero Marttila <terom@fixme.fi>
--- a/Makefile Sat May 31 19:35:21 2008 +0300
+++ b/Makefile Sun Jun 01 01:48:09 2008 +0300
@@ -3,6 +3,9 @@
OBJS = common.o mandelbrot.o render.o render_remote.o
HEADERS = common.h mandelbrot.h render.h
+EXECS = render_file web_main render_node
+
+all: ${EXECS}
render_file: ${OBJS}
@@ -10,3 +13,7 @@
render_node: ${OBJS}
+clean :
+ rm ${OBJS}
+ rm ${EXECS}
+
--- a/mandelbrot.c Sat May 31 19:35:21 2008 +0300
+++ b/mandelbrot.c Sun Jun 01 01:48:09 2008 +0300
@@ -77,6 +77,11 @@
if (!info_ptr)
goto error;
+
+ // libpng error handling
+ if (setjmp(png_jmpbuf(png_ptr))) {
+ goto error;
+ }
if (ctx->io_stream) {
// use normal libpng I/O
--- a/render_file.c Sat May 31 19:35:21 2008 +0300
+++ b/render_file.c Sun Jun 01 01:48:09 2008 +0300
@@ -28,6 +28,7 @@
fprintf(stdout, "rendered %dx%d mandelbrot in %f seconds\n", img_w, img_h, duration);
}
+#if 0
void render_remote (int img_w, int img_h, FILE *output, FILE *remote) {
render_t ctx;
struct render_cmd cmd;
@@ -101,6 +102,7 @@
return fh;
}
+#endif
int main (int argc, char **argv) {
int opt;
@@ -134,6 +136,7 @@
verbose = 1;
break;
+#if 0
case 'r' :
if (remote)
err_exit("Only use -r once");
@@ -141,6 +144,7 @@
remote = open_remote(optarg);
break;
+#endif
default :
err_exit("Usage: %s [-w img_w] [-h img_h] [-o output_file] [-v] [-r host[:port]]", argv[0]);
@@ -149,7 +153,8 @@
if (!output)
output = stdout;
-
+
+#if 0
if (remote) {
if (verbose)
fprintf(stderr, "Render [%dx%d] mandelbrot remotely\n", img_w, img_h);
@@ -157,7 +162,9 @@
render_remote(img_w, img_h, output, remote);
fclose(remote);
- } else {
+ } else
+#endif
+ {
if (verbose)
fprintf(stderr, "Render [%dx%d] mandelbrot locally\n", img_w, img_h);
--- a/render_node.c Sat May 31 19:35:21 2008 +0300
+++ b/render_node.c Sun Jun 01 01:48:09 2008 +0300
@@ -3,12 +3,29 @@
#include <arpa/inet.h>
#include <sys/socket.h>
#include <stdlib.h>
+#include <signal.h>
+#include <string.h>
#include "render.h"
#include "render_remote.h"
#include "mandelbrot.h"
#include "common.h"
+void sigpipe_handler (int signal) {
+ /* ignore */
+ fprintf(stderr, "SIGPIPE\n");
+}
+
+void sigpipe_ignore () {
+ struct sigaction sigpipe_action;
+
+ memset(&sigpipe_action, 0, sizeof(sigpipe_action));
+ sigpipe_action.sa_handler = SIG_IGN;
+
+ if (sigaction(SIGPIPE, &sigpipe_action, NULL))
+ perr_exit("sigaction");
+}
+
int my_fread(FILE *fh, void *ptr, size_t size) {
int ret = fread(ptr, size, 1, fh);
@@ -78,10 +95,13 @@
render_region_raw(&ctx, x1, y1, x2, y2);
render_io_stream(&ctx, fh);
+ sigpipe_ignore();
+
// render!
- mandelbrot_render_timed(&ctx, &duration);
-
- printf("time=%fs\n", duration);
+ if (mandelbrot_render_timed(&ctx, &duration))
+ printf("error\n"); // XXX: notify our client?
+ else
+ printf("time=%fs\n", duration);
// close the FILE* and socket
fclose(fh);
@@ -89,14 +109,16 @@
return;
}
+
int main (int argc, char** argv) {
int ssock, sock;
struct sockaddr_in addr;
socklen_t addr_len;
+
// create the socket
if ((ssock = socket(PF_INET, SOCK_STREAM, 0)) == -1)
- die("socket");
+ perr_exit("socket");
addr.sin_family = AF_INET;
addr.sin_port = htons(RENDER_PORT);
@@ -106,10 +128,10 @@
addr.sin_port = htons(atoi(argv[1]));
if (bind(ssock, (struct sockaddr *) &addr, sizeof(struct sockaddr_in)) == -1)
- die("bind");
+ perr_exit("bind");
if (listen(ssock, 1) == -1)
- die("listen");
+ perr_exit("listen");
printf("RUN: %s:%hu\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
@@ -119,7 +141,7 @@
// accept a new client
if ((sock = accept(ssock, (struct sockaddr *) &addr, &addr_len)) == -1)
- die("accept");
+ perr_exit("accept");
printf("ACCEPT: %s:%hu\n", inet_ntoa(addr.sin_addr), addr.sin_port);
--- a/render_remote.c Sat May 31 19:35:21 2008 +0300
+++ b/render_remote.c Sun Jun 01 01:48:09 2008 +0300
@@ -54,6 +54,7 @@
perror(desc); \
ctx->cb_fail(ctx->cb_arg); \
_remote_render_ctx_free(&ctx); \
+ return; \
} while (0)
void _remote_write (struct bufferevent *bev, void *arg) {
@@ -75,7 +76,7 @@
struct remote_render_ctx *ctx = arg;
// pass the bufferevent's input buffer to our callback - libevent doesn't provide any function to access this, but hopefully this works correctly
- ctx->cb_data(bev->input, ctx->cb_arg);
+ ctx->cb_data(EVBUFFER_INPUT(bev), ctx->cb_arg);
}
void _remote_error (struct bufferevent *bev, short what, void *arg) {
@@ -85,6 +86,12 @@
if (what & EVBUFFER_EOF) {
// great!
+
+ // send any remaining-chunk data
+ if (EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)) > 0)
+ ctx->cb_data(EVBUFFER_INPUT(bev), ctx->cb_arg);
+
+ // signal completion
ctx->cb_done(ctx->cb_arg);
} else if (what & EVBUFFER_ERROR) {
@@ -134,7 +141,7 @@
rrctx->render_cmd.y2 = rctx->y2;
}
-int render_remote (
+struct remote_render_ctx *render_remote (
render_t *render_ctx,
struct sockaddr_storage *remote,
void (*cb_sent)(void *arg),
@@ -148,7 +155,7 @@
if (!ctx) {
error("render_remote: malloc");
- return -1;
+ return NULL;
}
// store the provided callback functions
@@ -167,7 +174,7 @@
if (sock < 0) {
free(ctx);
perror("render_remote: socket");
- return -1;
+ return NULL;
}
// mark it as nonblocking
@@ -175,7 +182,7 @@
free(ctx);
close(sock);
perror("render_remote: fcntl");
- return -1;
+ return NULL;
}
// initiate the connect
@@ -185,7 +192,7 @@
free(ctx);
close(sock);
perror("render_remote: connect");
- return -1;
+ return NULL;
}
// do the libevent dance
@@ -195,9 +202,33 @@
free(ctx);
close(sock);
error("render_remote: event_add");
- return -1;
+ return NULL;
}
// success
+ return ctx;
+}
+
+int render_remote_set_chunk_size (struct remote_render_ctx *ctx, size_t chunk_size, size_t overflow_buffer) {
+ if (ctx->data_bev == NULL)
+ return -1;
+
+ bufferevent_setwatermark(ctx->data_bev, EV_READ, chunk_size, chunk_size + overflow_buffer);
+
return 0;
}
+
+void render_remote_cancel (struct remote_render_ctx *ctx) {
+ // if it's still just connecting, cancel that
+ if (event_pending(&ctx->ev_conn, EV_WRITE, NULL)) {
+ event_del(&ctx->ev_conn);
+
+ }
+
+ // close the socket (ctx->ev_conn remains valid even after we're done with it...)
+ close(EVENT_FD(&ctx->ev_conn));
+
+ // this takes care of the rest
+ _remote_render_ctx_free (&ctx);
+}
+
--- a/render_remote.h Sat May 31 19:35:21 2008 +0300
+++ b/render_remote.h Sun Jun 01 01:48:09 2008 +0300
@@ -12,10 +12,19 @@
#define RENDER_PORT 6159
+struct remote_render_ctx;
+
/*
- * Execute the given render operation on the render_node at the given remote address
+ * Execute the given render operation on the render_node at the given remote address.
+ *
+ * The various callback functions must all be provided.
+ *
+ * cb_sent will be invoked after the request has succesfully been written, and before cb_data is called.
+ * cb_data is called whenever new data has been received. See also, render_remote_set_chunk_size
+ * cb_done is called when all the data has been passed to cb_data
+ * cb_fail is called when an error is encountered. This can (and will) happen at any time!
*/
-int render_remote (
+struct remote_render_ctx *render_remote (
render_t *render_ctx, // what to render
struct sockaddr_storage *remote, // what render node to use
void (*cb_sent)(void *arg),
@@ -25,4 +34,20 @@
void *cb_arg;
);
+/*
+ * Cancel the given request. No more callbacks will be called, buffered data is discarded and the remote render process will cancel asap.
+ */
+void render_remote_cancel (struct remote_render_ctx *ctx);
+
+/*
+ * If you don't want to receive the rendered data one byte at a time, you can set a minimum chunk size.
+ *
+ * cb_done will only be called when the buffer contains at least chunk_size bytes, or no more data is available (cb_done will be called next).
+ *
+ * overflow_buffer is how many bytes will be buffered, at most, in addition to chunk_size (in case rendering is paused). After that, the render node will start to block.
+ *
+ * Only call this once cb_sent has fired
+ */
+int render_remote_set_chunk_size (struct remote_render_ctx *ctx, size_t chunk_size, size_t overflow_buffer);
+
#endif /* RENDER_REMOTE_H */
--- a/web_main.c Sat May 31 19:35:21 2008 +0300
+++ b/web_main.c Sun Jun 01 01:48:09 2008 +0300
@@ -6,6 +6,7 @@
#include <string.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
+#include <signal.h>
#include <event.h>
#include <evhttp.h>
@@ -14,6 +15,9 @@
#include "render_remote.h"
#include "common.h"
+#define CHUNK_SIZE 4096
+#define OVERFLOW_BUFFER 4096
+
// what render node to use
static struct sockaddr_storage render_node;
@@ -22,11 +26,26 @@
struct evhttp_request *http_request;
int headers_sent;
+
+ struct remote_render_ctx *remote_ctx;
+
+ size_t bytes_sent;
};
+void _render_cleanup (struct render_request *ctx) {
+ // not interested anymore
+ evhttp_connection_set_closecb(ctx->http_request->evcon, NULL, NULL);
+
+ // clean up
+ free(ctx);
+}
+
void _render_sent (void *arg) {
struct render_request *ctx = arg;
+ // set chunk size
+ render_remote_set_chunk_size(ctx->remote_ctx, CHUNK_SIZE, OVERFLOW_BUFFER);
+
// send headers
evhttp_add_header(ctx->http_request->output_headers, "Content-Type", "image/png");
evhttp_send_reply_start(ctx->http_request, HTTP_OK, "OK");
@@ -39,10 +58,14 @@
void _render_data (struct evbuffer *buf, void *arg) {
struct render_request *ctx = arg;
+ size_t buf_size = EVBUFFER_LENGTH(buf);
+
// send chunk
evhttp_send_reply_chunk(ctx->http_request, buf);
- printf("render [%p]: sent chunk\n", ctx);
+ printf("render [%p]: enqueued chunk: %zu/%zu bytes\n", ctx, buf_size, ctx->bytes_sent);
+
+ ctx->bytes_sent += buf_size;
}
void _render_done (void *arg) {
@@ -51,10 +74,9 @@
// send end
evhttp_send_reply_end(ctx->http_request);
- printf("render [%p]: done\n", ctx);
-
- // clean up
- free(ctx);
+ printf("render [%p]: done: %zu bytes\n", ctx, ctx->bytes_sent);
+
+ _render_cleanup(ctx);
}
void _render_fail (void *arg) {
@@ -69,8 +91,18 @@
printf("render [%p]: failed\n", ctx);
- // clean up
- free(ctx);
+ _render_cleanup(ctx);
+}
+
+void _render_http_lost (struct evhttp_connection *connection, void *arg) {
+ struct render_request *ctx = arg;
+
+ printf("render [%p]: lost http connection\n", ctx);
+
+ // cancel
+ render_remote_cancel(ctx->remote_ctx);
+
+ _render_cleanup(ctx);
}
void _http_render_execute (struct evhttp_request *request, u_int32_t img_w, u_int32_t img_h) {
@@ -79,6 +111,7 @@
req_ctx->http_request = request;
req_ctx->headers_sent = 0;
+ req_ctx->bytes_sent = 0;
// render context
render_t rend_ctx;
@@ -87,17 +120,20 @@
render_region_full(&rend_ctx);
// initiate the remote render operation
- if (render_remote(&rend_ctx, &render_node,
+ if ((req_ctx->remote_ctx = render_remote(&rend_ctx, &render_node,
&_render_sent,
&_render_data,
&_render_done,
&_render_fail,
req_ctx
- )) {
+ )) == NULL) {
free(req_ctx);
fprintf(stderr, "ERR: render_remote\n");
return;
}
+
+ // set close cb
+ evhttp_connection_set_closecb(request->evcon, &_render_http_lost, req_ctx);
printf("render [%p]: started\n", req_ctx);
}
@@ -131,19 +167,41 @@
evhttp_clear_headers(&qargs);
// request log
- printf("REQ: [%s:%d] uri=%s, img_w=%d, img_h=%d\n", peer_address, peer_port, uri, img_w, img_h);
+ printf("REQ: [%s:%d] method=%d, uri=%s, img_w=%d, img_h=%d\n", peer_address, peer_port, request->type, uri, img_w, img_h);
// do it
_http_render_execute(request, img_w, img_h);
}
+struct event ev_sigint;
+
+void sigint_handler (int signal, short event, void *arg) {
+ printf("SIGINT: shutting down\n");
+
+ if (event_loopexit(NULL))
+ err_exit("event_loopexit");
+}
+
+void signals_init () {
+ signal_set(&ev_sigint, SIGINT, &sigint_handler, NULL);
+ signal_add(&ev_sigint, NULL);
+}
+
+void signals_deinit () {
+ signal_del(&ev_sigint);
+}
+
int main (void) {
- // libevent/http init
+ // libevent init
struct event_base *ev_base = event_init();
if (!ev_base)
die("event_init");
-
+
+ // handle signals
+ signals_init();
+
+ // evhttp init
struct evhttp *http_server = evhttp_new(ev_base);
if (!http_server)
@@ -169,9 +227,13 @@
// run the libevent mainloop
if (event_dispatch())
die("event_dispatch");
+
+ printf("SHUTDOWN\n");
// clean up
+ signals_deinit();
evhttp_free(http_server);
+ event_base_free(ev_base);
// successfull exit
return 0;