flow control for web_main.c
authorTero Marttila <terom@fixme.fi>
Thu, 05 Jun 2008 22:53:47 +0300
changeset 6 4252c27f2b72
parent 5 d4263f1f5b55
child 7 446c0b816b91
flow control for web_main.c

committer: Tero Marttila <terom@fixme.fi>
render_node.c
render_remote.c
render_remote.h
web_main.c
--- a/render_node.c	Sun Jun 01 05:41:41 2008 +0300
+++ b/render_node.c	Thu Jun 05 22:53:47 2008 +0300
@@ -84,7 +84,8 @@
         return;
     }
 
-    printf("RENDER: [%ux%u] (%f, %f) -> (%f, %f): ", img_w, img_h, x1, y1, x2, y2);
+    printf("RENDER: [%ux%u] (%f, %f) -> (%f, %f): ...", img_w, img_h, x1, y1, x2, y2);
+    fflush(stdout);
 
     double duration;
 
--- a/render_remote.c	Sun Jun 01 05:41:41 2008 +0300
+++ b/render_remote.c	Thu Jun 05 22:53:47 2008 +0300
@@ -221,11 +221,20 @@
     return NULL;
 }
 
-int render_remote_set_chunk_size (struct remote_render_ctx *ctx, size_t chunk_size, size_t overflow_buffer) {
+int render_remote_set_recv (struct remote_render_ctx *ctx, size_t recv_threshold, size_t unread_buffer) {
     if (ctx->data_bev == NULL)
         return -1;
 
-    bufferevent_setwatermark(ctx->data_bev, EV_READ, chunk_size, chunk_size + overflow_buffer);
+    bufferevent_setwatermark(ctx->data_bev, EV_READ, recv_threshold, recv_threshold + unread_buffer);
+
+    return 0;
+}
+
+int render_remote_shake (struct remote_render_ctx *ctx) {
+    if (ctx->data_bev == NULL)
+        return -1;
+
+    ctx->cb_data(EVBUFFER_INPUT(ctx->data_bev), ctx->cb_arg);
 
     return 0;
 }
--- a/render_remote.h	Sun Jun 01 05:41:41 2008 +0300
+++ b/render_remote.h	Thu Jun 05 22:53:47 2008 +0300
@@ -41,14 +41,29 @@
 void render_remote_cancel (struct remote_render_ctx *ctx);
 
 /*
- * If you don't want to receive the rendered data one byte at a time, you can set a minimum chunk size.
+ * Controls the behaviour of when cb_data is called, and how remote data is read in.
  *
- * cb_done will only be called when the buffer contains at least chunk_size bytes, or no more data is available (cb_done will be called next).
+ * recv_threshold, sets a threshold for calling cb_data - cb_data will only be called
+ * if the buffer contains at least recv_threshold, bytes. Note that cb_data is only
+ * ever called when new data has been receieved from the remote end - never
+ * otherwise. If cb_data doesn't drain the buffer, cb_data will be called again
+ * once more data has been received from the remote end.
  *
- * overflow_buffer is how many bytes will be buffered, at most, in addition to chunk_size (in case rendering is paused). After that, the render node will start to block.
+ * overflow_size can be used to control the amount of unread data in cb_data. If 
+ * the buffer contains more than recv_threshold + unread_buffer bytes, we will stop
+ * accepting bytes from the remote end, and cb_data will not be called any more.
  *
  * Only call this once cb_sent has fired
  */
-int render_remote_set_chunk_size (struct remote_render_ctx *ctx, size_t chunk_size, size_t overflow_buffer);
+int render_remote_set_recv (struct remote_render_ctx *ctx, size_t recv_threshold, size_t unread_buffer);
+
+/*
+ * Call cb_data with the current set of buffered input data immediately,
+ * regardless of whether or not the buffer contains any data, or any new
+ * data has been received.
+ *
+ * Only call this after cb_sent and before cb_done/cb_fail.
+ */
+int render_remote_shake (struct remote_render_ctx *ctx);
 
 #endif /* RENDER_REMOTE_H */
--- a/web_main.c	Sun Jun 01 05:41:41 2008 +0300
+++ b/web_main.c	Thu Jun 05 22:53:47 2008 +0300
@@ -7,6 +7,7 @@
 #include <netinet/ip.h>
 #include <arpa/inet.h>
 #include <signal.h>
+#include <unistd.h>
 
 #include <event2/event.h>
 #include <event2/event_compat.h>
@@ -17,9 +18,11 @@
 #include "render_remote.h"
 #include "common.h"
 
-#define CHUNK_SIZE 4096
+#define MIN_CHUNK_SIZE 4096
 #define OVERFLOW_BUFFER 4096
 
+// do not do any userland socket output buffering
+#define HTTP_BUFFER 0
 
 // what event_base we're using
 static struct event_base *ev_base;
@@ -36,8 +39,13 @@
     struct remote_render_ctx *remote_ctx;
 
     size_t bytes_sent;
+
+    int paused;
 };
 
+// cb func prototypes
+void _render_http_written (struct evhttp_request *request, void *arg);
+
 void _render_cleanup (struct render_request *ctx) {
     // clean up
     free(ctx);
@@ -47,11 +55,14 @@
     struct render_request *ctx = arg;
 
     // set chunk size
-    render_remote_set_chunk_size(ctx->remote_ctx, CHUNK_SIZE, OVERFLOW_BUFFER);
+    render_remote_set_recv(ctx->remote_ctx, MIN_CHUNK_SIZE, OVERFLOW_BUFFER);
 
     // send headers
     evhttp_add_header(evhttp_request_get_output_headers(ctx->http_request), "Content-Type", "image/png");
     evhttp_send_reply_start(ctx->http_request, HTTP_OK, "OK");
+    
+    // setup flow-control
+    evhttp_set_reply_notify(ctx->http_request, HTTP_BUFFER, &_render_http_written, ctx);
 
     ctx->headers_sent = 1;
 
@@ -63,22 +74,50 @@
 
     size_t buf_size = EVBUFFER_LENGTH(buf);
 
-    // send chunk
+    // ignore empty buffers, a result of render_remote_shake()
+    if (buf_size == 0) {
+        printf("render [%p]: remote buffer is empty\n", ctx);
+
+        return;
+    }
+    
+    // check if we are paused
+    if (ctx->paused) {
+        // we are waiting for the HTTP send buffer to clear, so keep the data in the render buffer
+        printf("render [%p]: delaying data: %zu:%zu bytes\n", ctx, buf_size, ctx->bytes_sent);
+
+        return;
+    }
+
+    // move chunk to http buffers
     evhttp_send_reply_chunk(ctx->http_request, buf);
-    
+
     printf("render [%p]: enqueued chunk: %zu/%zu bytes\n", ctx, buf_size, ctx->bytes_sent);
 
+    // mark ourself as paused until httpd tells us to continue
+    ctx->paused = 1;
+    
+    // keep a tally of total sent bytes
     ctx->bytes_sent += buf_size;
 }
 
 void _render_done (void *arg) {
     struct render_request *ctx = arg;
 
+    // if we are paused, just shove the data into the http buffers, they might become larger than they should be, but it's easier to just move the data there and let render_remote complete
+    if (ctx->paused) {
+        printf("render [%p]: done: flushing the rest of our data\n", ctx);
+        ctx->paused = 0;
+
+        render_remote_shake(ctx->remote_ctx);
+    }
+
     // send end
     evhttp_send_reply_end(ctx->http_request);
 
     printf("render [%p]: done: %zu bytes\n", ctx, ctx->bytes_sent);
-    
+        
+    // the request is now done, clean up
     _render_cleanup(ctx);
 }
 
@@ -108,6 +147,18 @@
     _render_cleanup(ctx);
 }
 
+void _render_http_written (struct evhttp_request *request, void *arg) {
+    struct render_request *ctx = arg;
+    
+    printf("render [%p]: http available for write\n", ctx);
+    
+    // unpause ourself
+    ctx->paused = 0;
+
+    // shake out the buffers
+    render_remote_shake(ctx->remote_ctx);
+}
+
 void _http_render_execute (struct evhttp_request *request, u_int32_t img_w, u_int32_t img_h) {
     // render request context
     struct render_request *req_ctx = calloc(1, sizeof(struct render_request));
@@ -211,16 +262,41 @@
     signal_del(&ev_sigint);
 }
 
-int main (void) {
+void log_null (int severity, const char *msg) {
+    // ignore
+}
+
+int main (int argc, char **argv) {
     // libevent init
     ev_base = event_init();
 
     if (!ev_base)
         die("event_init");
     
+    int opt;
+    int enable_debug = 0;
+
+    // arguments
+    while ((opt = getopt(argc, argv, "d")) != -1) {
+        switch (opt) {
+            case 'd':
+                // enable libevent debugging
+                enable_debug = 1;
+                break;
+
+            default:
+                err_exit("Usage: %s [-d]", argv[0]);
+        
+        }
+    }
+    
+    // per default it is enabled
+    if (!enable_debug)
+        event_set_log_callback(&log_null);
+    
     // handle signals
     signals_init();
-    
+
     // evhttp init
     struct evhttp *http_server = evhttp_new(ev_base);