a working threaded sliced render, plus modifications to other modules to use this in web_main
committer: Tero Marttila <terom@fixme.fi>
--- a/Makefile Mon Jun 09 18:58:39 2008 +0300
+++ b/Makefile Tue Jun 17 16:39:55 2008 +0300
@@ -1,4 +1,4 @@
-LDFLAGS = -Llib/libevent-dev/lib -levent -lpng
+LDFLAGS = -Llib/libevent-dev/lib -levent -lpng -pthread
CFLAGS = -Wall -g -Ilib/libevent-dev/include
EXECS = file_main web_main node_main
@@ -6,24 +6,25 @@
all: web_main file_main node_main
common.o: common.c common.h
-http.o: http.c http.h
-remote_node.o: remote_node.c remote_node.h
-remote_pool.o: remote_pool.c remote_pool.h
-render.o: render.c render.h
-render_remote.o: render_remote.c render_remote.h
-render_png.o: render_png.c render_png.h
-render_raw.o: render_raw.c render_raw.h
-render_local.o: render_local.c render_local.h
-render_multi.o: render_multi.c render_multi.h render_remote.h
-render_mandelbrot.o: render_mandelbrot.c render_mandelbrot.h
-render_slices.o: render_slices.c render_slices.h
+http.o: http.c http.h common.o
+remote_node.o: remote_node.c remote_node.h common.o
+remote_pool.o: remote_pool.c remote_pool.h common.o
+render.o: render.c render.h common.o
+render_remote.o: render_remote.c render_remote.h common.o
+render_png.o: render_png.c render_png.h common.o
+render_raw.o: render_raw.c render_raw.h common.o
+render_local.o: render_local.c render_local.h common.o
+render_multi.o: render_multi.c render_multi.h render_remote.o common.o
+render_mandelbrot.o: render_mandelbrot.c render_mandelbrot.h common.o
+render_slices.o: render_slices.c render_slices.h common.o
+render_threads.o: render_threads.c render_threads.h common.o
file_main.o: file_main.c
node_main.o: node_main.c
web_main.o: web_main.c
file_main: file_main.o common.o render.o render_raw.o render_png.o render_local.o render_mandelbrot.o
-node_main: node_main.o common.o render.o render_raw.o render_png.o render_local.o render_mandelbrot.o
+node_main: node_main.o common.o render.o render_raw.o render_png.o render_local.o render_slices.o render_threads.o render_mandelbrot.o
web_main: web_main.o common.o http.o render.o render_png.o remote_node.o remote_pool.o render_remote.o render_multi.o render_slices.o
clean :
--- a/common.c Mon Jun 09 18:58:39 2008 +0300
+++ b/common.c Tue Jun 17 16:39:55 2008 +0300
@@ -7,23 +7,34 @@
#include "common.h"
-void _generic_err (const char *func, int perr, int do_exit, const char *fmt, ...) {
- va_list va;
-
+static void _generic_err_vargs (const char *func, int perr, const char *fmt, va_list va) {
if (func)
fprintf(stderr, "%s: ", func);
-
- va_start(va, fmt);
+
vfprintf(stderr, fmt, va);
- va_end(va);
-
+
if (perr)
fprintf(stderr, ": %s\n", strerror(errno));
fprintf(stderr, "\n");
-
- if (do_exit)
- exit(EXIT_FAILURE);
+}
+
+void _generic_err (const char *func, int perr, const char *fmt, ...) {
+ va_list va;
+
+ va_start(va, fmt);
+ _generic_err_vargs(func, perr, fmt, va);
+ va_end(va);
+}
+
+void _generic_err_exit (const char *func, int perr, const char *fmt, ...) {
+ va_list va;
+
+ va_start(va, fmt);
+ _generic_err_vargs(func, perr, fmt, va);
+ va_end(va);
+
+ exit(EXIT_FAILURE);
}
int parse_hostport (char *hostport, char **host, char **port) {
--- a/common.h Mon Jun 09 18:58:39 2008 +0300
+++ b/common.h Tue Jun 17 16:39:55 2008 +0300
@@ -3,18 +3,24 @@
* error handling
*/
-void _generic_err (const char *func, int perr, int do_exit, const char *fmt, ...)
- __attribute__ ((format (printf, 4, 5)));
+void _generic_err (const char *func, int perr, const char *fmt, ...)
+ __attribute__ ((format (printf, 3, 4)));
+
+// needs to be defined as its own function for the noreturn attribute
+void _generic_err_exit (const char *func, int perr, const char *fmt, ...)
+ __attribute__ ((format (printf, 3, 4)))
+ __attribute__ ((noreturn));
+
// various kinds of ways to handle an error, 2**3 of them, *g*
-#define error(...) _generic_err(NULL, 0, 0, __VA_ARGS__)
-#define err_exit(...) _generic_err(NULL, 0, 1, __VA_ARGS__)
-#define perr(...) _generic_err(NULL, 1, 0, __VA_ARGS__)
-#define perr_exit(...) _generic_err(NULL, 1, 1, __VA_ARGS__)
-#define err_func(func, ...) _generic_err(func, 0, 0, __VA_ARGS__)
-#define err_func_exit(func, ...) _generic_err(func, 0, 1, __VA_ARGS__)
-#define perr_func(func, ...) _generic_err(func, 1, 0, __VA_ARGS__)
-#define perr_func_exit(func, ...) _generic_err(func, 1, 1, __VA_ARGS__)
+#define error(...) _generic_err( NULL, 0, __VA_ARGS__)
+#define err_exit(...) _generic_err_exit( NULL, 0, __VA_ARGS__)
+#define perr(...) _generic_err( NULL, 1, __VA_ARGS__)
+#define perr_exit(...) _generic_err_exit( NULL, 1, __VA_ARGS__)
+#define err_func(func, ...) _generic_err( func, 0, __VA_ARGS__)
+#define err_func_exit(func, ...) _generic_err_exit( func, 0, __VA_ARGS__)
+#define perr_func(func, ...) _generic_err( func, 1, __VA_ARGS__)
+#define perr_func_exit(func, ...) _generic_err_exit( func, 1, __VA_ARGS__)
// error(func + colon + msg, ...) + goto error
#define ERROR(...) do { err_func(__func__, __VA_ARGS__); goto error; } while (0)
--- a/node_main.c Mon Jun 09 18:58:39 2008 +0300
+++ b/node_main.c Tue Jun 17 16:39:55 2008 +0300
@@ -11,6 +11,7 @@
#include "render.h"
#include "render_remote.h" // for RENDER_PORT_NAME
#include "render_local.h"
+#include "render_threads.h"
void sigpipe_handler (int signal) {
/* ignore */
@@ -105,15 +106,32 @@
if (render_io_stream(ctx, fh))
ERROR("render_io_stream");
-
+ struct render_threads *threads_info = NULL;
+
+ // render threaded \o/
+ if (!(threads_info = render_threads_alloc(ctx)))
+ goto error;
+
+ if (render_threads_wait(threads_info))
+ goto error;
+
+ printf("done!\n");
+/*
// render!
if (render_local(ctx, &duration))
ERROR("render_local");
printf("time=%fs\n", duration);
+*/
+
+ // fall through to just clean up normally
error:
+ // free the threads_info?
+ if (threads_info)
+ render_threads_free(threads_info);
+
// close the FILE* and socket
fclose(fh);
--- a/render_multi.c Mon Jun 09 18:58:39 2008 +0300
+++ b/render_multi.c Tue Jun 17 16:39:55 2008 +0300
@@ -437,8 +437,6 @@
int i;
for (i = 0; i < ctx->node_count; i++) {
- struct remote_node *node_info;
-
// store the info struct
ctx->nodes[i].info = render_slices_get_slice_info(&ctx->slices, i);
@@ -446,12 +444,9 @@
ctx->nodes[i].slice_width = ctx->nodes[i].info->render_info->img_w;
ctx->nodes[i].self = ctx;
- // get a node from the pool
- if (!(node_info = remote_pool_get(pool_info)))
- ERROR("remote_pool_get");
-
+
// the render_remote
- if (!(ctx->nodes[i].render_remote = render_remote_rawio(ctx->nodes[i].info->render_info, node_info,
+ if (!(ctx->nodes[i].render_remote = render_remote_rawio(ctx->nodes[i].info->render_info, pool_info,
&_render_multi_sent, &_render_multi_fail, &_render_multi_data_raw, &ctx->nodes[i]))
)
ERROR("render_remote_rawio");
--- a/render_png.c Mon Jun 09 18:58:39 2008 +0300
+++ b/render_png.c Tue Jun 17 16:39:55 2008 +0300
@@ -35,23 +35,20 @@
free(ctx);
}
-int render_png_deinit (struct render_png *ctx) {
- // not initialized? Just return a positive nonzero value
- if (!ctx->png_ptr)
- return 1;
+void render_png_deinit (struct render_png *ctx) {
+ // are we initialized?
+ if (ctx->png_ptr) {
+ // libpng error handling
+ if (setjmp(png_jmpbuf(ctx->png_ptr)))
+ ERROR("libpng");
+
+ png_destroy_write_struct(&ctx->png_ptr, &ctx->info_ptr);
+ }
- // libpng error handling
- if (setjmp(png_jmpbuf(ctx->png_ptr)))
- ERROR("libpng");
-
- png_destroy_write_struct(&ctx->png_ptr, &ctx->info_ptr);
-
- // success
- return 0;
+ return;
error:
- free(ctx);
- return -1;
+ WARNING("destroying our libpng structures failed, possible memory leak?");
}
/*
--- a/render_png_struct.h Mon Jun 09 18:58:39 2008 +0300
+++ b/render_png_struct.h Tue Jun 17 16:39:55 2008 +0300
@@ -21,6 +21,6 @@
};
int render_png_init (struct render_png *ctx, struct render *render);
-int render_png_deinit (struct render_png *ctx);
+void render_png_deinit (struct render_png *ctx);
#endif /* RENDER_PNG_INTERNAL_H */
--- a/render_remote.c Mon Jun 09 18:58:39 2008 +0300
+++ b/render_remote.c Tue Jun 17 16:39:55 2008 +0300
@@ -192,13 +192,18 @@
/*
* Do the initial IO-agnostic work to initialize the rendering process
*/
-static struct render_remote *_render_remote_init (struct render *render, struct remote_node *remote_node) {
+static struct render_remote *_render_remote_init (struct render *render, struct remote_pool *pool_info) {
struct render_remote *ctx;
+ struct remote_node *node_info;
- printf("remote_node render load: %d/%d\n", remote_node->current_load, remote_node->parallel_renders);
+ // get a node from the pool
+ if (!(node_info = remote_pool_get(pool_info)))
+ ERROR("remote_pool_get");
+
+ printf("remote_node render load: %d/%d\n", node_info->current_load, node_info->parallel_renders);
// alloc the remote render ctx
- if (!(ctx = calloc(1, sizeof(struct render_remote))))
+ if (!(ctx = calloc(1, sizeof(*ctx))))
ERROR("calloc");
// copy the relevant stuff from the render_ctx
@@ -211,7 +216,7 @@
ctx->render_cmd.y2 = render->y2;
// create the socket
- if ((ctx->sock = socket(remote_node->addr.ss_family, SOCK_STREAM, 0)) < 0)
+ if ((ctx->sock = socket(node_info->addr.ss_family, SOCK_STREAM, 0)) < 0)
PERROR("socket");
// mark it as nonblocking
@@ -219,7 +224,7 @@
PERROR("fcntl");
// initiate the connect
- int err = connect(ctx->sock, (struct sockaddr *) &remote_node->addr, sizeof(remote_node->addr));
+ int err = connect(ctx->sock, (struct sockaddr *) &node_info->addr, sizeof(node_info->addr));
if (err != -1 || errno != EINPROGRESS)
PERROR("connect");
@@ -237,7 +242,7 @@
*/
struct render_remote *render_remote_rawio (
struct render *render,
- struct remote_node *remote_node,
+ struct remote_pool *pool_info,
void (*cb_sent)(void *arg),
void (*cb_fail)(void *arg),
void (*cb_io_data)(evutil_socket_t, short, void*),
@@ -246,7 +251,7 @@
struct render_remote *ctx;
// short-circuit error handling
- if (!(ctx = _render_remote_init(render, remote_node)))
+ if (!(ctx = _render_remote_init(render, pool_info)))
return NULL;
// store the provided callback functions
@@ -281,7 +286,7 @@
*/
struct render_remote *render_remote (
struct render *render,
- struct remote_node *remote_node,
+ struct remote_pool *pool_info,
void (*cb_sent)(void *arg),
void (*cb_data)(struct evbuffer *buf, void *arg),
void (*cb_done)(void *arg),
@@ -291,7 +296,7 @@
struct render_remote *ctx;
// short-circuit error handling
- if (!(ctx = _render_remote_init(render, remote_node)))
+ if (!(ctx = _render_remote_init(render, pool_info)))
return NULL;
// store the provided callback functions
--- a/render_remote.h Mon Jun 09 18:58:39 2008 +0300
+++ b/render_remote.h Tue Jun 17 16:39:55 2008 +0300
@@ -5,7 +5,7 @@
#include <event2/buffer.h>
#include "render.h"
-#include "remote_node.h"
+#include "remote_pool.h"
/*
* Execute a render operation on a remote render_node
@@ -48,7 +48,7 @@
*/
struct render_remote *render_remote (
struct render *render, // what to render
- struct remote_node *remote_node, // what render node to use
+ struct remote_pool *pool_info, // pick a render node from this pool
void (*cb_sent)(void *arg),
void (*cb_data)(struct evbuffer *buf, void *arg),
void (*cb_done)(void *arg),
@@ -74,7 +74,7 @@
*/
struct render_remote *render_remote_rawio (
struct render *render,
- struct remote_node *remote_node,
+ struct remote_pool *pool_info,
void (*cb_sent)(void *arg),
void (*cb_fail)(void *arg),
void (*cb_io_data)(evutil_socket_t, short, void*),
--- a/render_slices.c Mon Jun 09 18:58:39 2008 +0300
+++ b/render_slices.c Tue Jun 17 16:39:55 2008 +0300
@@ -13,16 +13,13 @@
free(ctx);
}
-int render_slices_deinit (struct render_slices *ctx) {
- // if it's not initialized, just ignore
- if (!ctx->rowbuf)
- return 1;
-
- free(ctx->rowbuf);
+void render_slices_deinit (struct render_slices *ctx) {
+ if (ctx->rowbuf) {
+ free(ctx->rowbuf);
+ ctx->rowbuf = NULL;
+ }
render_png_deinit(&ctx->png_info);
-
- return 0;
}
#define HALF(a, b) (( a + b) / 2)
@@ -113,41 +110,40 @@
int render_slices_segment_done (struct render_slices *ctx, int index) {
assert(index >= 0 && index < ctx->num_slices);
+ assert(ctx->render_row >= 0);
+ assert(ctx->segments_done >= 0 && ctx->segments_done < ctx->num_slices);
// our return status
- int status = 0;
+ int i, status = 0;
- // keep track of the number of full segments in this row
- ctx->segments_done++;
-
- // is this row complete?
- if (ctx->segments_done == ctx->num_slices) {
+ // is this row now complete?
+ if (++ctx->segments_done == ctx->num_slices) {
// the row is complete
ctx->segments_done = 0;
// once we fill up a row we can always call process_row
status |= SLICE_PROCESS_ROW;
- // is the other row still waiting for it to get processed?
+ // is the other row already rendered?
if (ctx->done_row < 0) {
- // no, point the done_row at this row, point the render buffers to the unused row
+ // yes, point the done_row at this row, point the render buffers to the unused row
ctx->done_row = ctx->render_row;
ctx->render_row = (ctx->render_row == 0 ? 1 : 0);
- int i ;
-
+ // can now continue rendering as well as call process_row
+ status |= SLICE_CONTINUE;
+
// update the remaining render buffers
for (i = 0; i < ctx->num_slices; i++) {
ctx->slices[i].info.render_buf = ctx->rows[ctx->render_row] + ctx->slices[i].row_offset;
}
-
- // can now continue rendering as well as call process_row
- status |= SLICE_CONTINUE;
} else {
// cannot continue rendering, need to have process_row complete first
+
+ ctx->render_row = -1;
+ }
- }
} else {
// the row is not complete, do not render the next segment, do not call process_row
@@ -160,15 +156,39 @@
int render_slices_process_row (struct render_slices *ctx) {
assert(ctx->done_row >= 0);
+ int i;
+
// pass the data to render_png, this results in calls to _render_multi_png_data
if (render_png_row(&ctx->png_info, ctx->rows[ctx->done_row]))
ERROR("render_png_row");
- // mark the row as processed
- ctx->done_row = -1;
-
- // ok, but don't call me again until I tell you to.
- return SLICE_CONTINUE;
+ // is the other row still in the process of being assembled?
+ if (ctx->render_row == -1) {
+ // no, both rows were full
+ ctx->render_row = ctx->done_row;
+ ctx->done_row = (ctx->done_row == 0 ? 1 : 0);
+
+ // update the remaining render buffers
+ for (i = 0; i < ctx->num_slices; i++) {
+ ctx->slices[i].info.render_buf = ctx->rows[ctx->render_row] + ctx->slices[i].row_offset;
+ }
+
+ // you can call me again, but also segment_done
+ return SLICE_PROCESS_ROW | SLICE_CONTINUE;
+
+ } else {
+ // yes, so that still needs to be finished
+ ctx->done_row = -1;
+
+ if (ctx->segments_done == 0) {
+ // that row is empty, then we can continue
+ return SLICE_CONTINUE;
+
+ } else {
+ // that row is partially built, so some segment is still on the way - don't resume the other slices until that's arrived!
+ return 0;
+ }
+ }
error:
// user needs to deinit/free us
@@ -178,7 +198,7 @@
int render_slices_done (struct render_slices *ctx) {
// finish off render_png
if (render_png_done(&ctx->png_info))
- ERROR("render_png_done");
+ goto error;
return 0;
@@ -186,3 +206,4 @@
render_slices_free(ctx);
return -1;
}
+
--- a/render_slices.h Mon Jun 09 18:58:39 2008 +0300
+++ b/render_slices.h Tue Jun 17 16:39:55 2008 +0300
@@ -57,6 +57,8 @@
*/
#define SLICE_PROCESS_ROW 0x02
+// XXX: you can't modify the slice's row buffer until these return SLICE_CONTINUE
+
/*
* The render_buf for the render_slice_info with the given index value contains
* the full information for the next row of the render_info render. Returns a
--- a/render_slices_struct.h Mon Jun 09 18:58:39 2008 +0300
+++ b/render_slices_struct.h Tue Jun 17 16:39:55 2008 +0300
@@ -42,5 +42,5 @@
};
int render_slices_init (struct render_slices *ctx, struct render *render);
-int render_slices_deinit (struct render_slices *ctx);
+void render_slices_deinit (struct render_slices *ctx);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/render_threads.c Tue Jun 17 16:39:55 2008 +0300
@@ -0,0 +1,507 @@
+#include <stdlib.h>
+#include <assert.h>
+
+#include <pthread.h>
+
+#include "common.h"
+#include "render_threads.h"
+#include "render_slices_struct.h"
+#include "render_mandelbrot.h"
+
+/*
+ * Render a mandelbrot in several slices simultaneously using threads.
+ *
+ * This works as an n:1 producer-consumer model, where we have n worker threads
+ * rendering n slices, and one manager thread that processes the completed rows.
+ *
+ * The worker threads function by first rendering a row of pixel data into memory,
+ * and then calling a function that will do something with that completed segment.
+ *
+ * * clear the can_continue marker, because the real value may change depending
+ * on what our segment does, and we don't want stale values from before we
+ * processed thse segment in it
+ * * process the segment
+ *
+ * * if the segment didn't complete a row, wait until some worker thread submits
+ * a segment that does, or the manager thread processes a row
+ * * if noone has signaled the cond in between us deciding what to do with
+ * the segment and this step,
+ * * wait on the cond
+ * * we can proceed to render the next segment
+ * * we should now be marked as continueable
+ *
+ * * if the segment completed a row, the manager thread needs to process this
+ * newly completed row.
+ * * if we can also continue on to the next row without waiting for the manager
+ * to process the row we just filled
+ * * mark all the segments as continueable
+ * * signal the cond the threads may be waiting on
+ * * increment the waiting_rows counter
+ * * signal the manager in case it's waiting
+ * * else, if noone has signaled the continue cond after we processed the
+ * segment,
+ * * wait on the cond
+ * * we can proceed to render the next segment
+ * * we should now be marked as continueable
+ *
+ * * if we have no more segments to submit
+ * * update the slices_running counter
+ * * if the counter is now zero (we were the last slice to complete)
+ * * signal the manager thread that it doesn't need to wait anymore
+ *
+ * The manager thread just sits in a loop calling process_row until all the slices
+ * have completed.
+ *
+ * * if no workers have updated the waiting_rows counter since we last processed
+ * a row, and the slices_running counter isn't zero (we still have threads
+ * running)
+ * * wait on a cond until something happens
+ *
+ * * if waiting_rows is nonzero
+ * * decrement it
+ * * process a row
+ * * if processing the row means that waiting slices can continue
+ * * mark all the segments as continueable
+ * * signal the cond the threads may be waiting on
+ *
+ * * if slices_running is zero
+ * * break out of the loop
+ *
+ * * otherwise, go back to the start again
+ */
+
+struct render_threads {
+ // do I need to free this?
+ int owned_by_me;
+
+ // info for the induvidual threads
+ struct render_thread {
+ // the slice that we need to render
+ struct render_slice_info *slice_info;
+
+ // our thread id
+ pthread_t thread;
+
+ // how many times we are allowed to continue now
+ int can_continue;
+
+ // how many segments we've written
+ int segments_done;
+
+ // ptr back to ctx
+ struct render_threads *self;
+
+ } threads[RENDER_SLICES_MAX];
+
+ // the render_slices struct
+ struct render_slices slices_info;
+
+ // the manager thread id
+ pthread_t manager_thread;
+
+ // how many slices?
+ int slice_count;
+
+ /*
+ * The worker threads must be careful to wait for the manager thread to start running before they attempt to signal
+ * the process_row_cond. This is done via this flag, in conjunction with continue_{cond,mut}.
+ *
+ * Workers should lock continue_mut, and if this flag is zero, wait on continue_cond (releasing the mutex). If the
+ * flag is already nonzero, the worker threads do not need to cond_wait and can just continue.
+ *
+ * On startup, the manager thread will lock continue_mut, set this to nonzero, signal continue_cond, and unlock
+ * continue_mut as normal.
+ */
+ int manager_running;
+
+ /*
+ * Signals to the manager
+ */
+ int slices_running;
+ int waiting_rows;
+
+ /* the inter-thread communication stuff */
+
+ // the worker threads signal this when their segment_done call indicates that the manager thread should call
+ // process_row now.
+ pthread_cond_t process_row_cond;
+ pthread_mutex_t process_row_mut;
+
+ // the worker threads and the manager thread signal this when segment_done or process_row indicate that the worker
+ // threads should call segment_done again. Worker threads that receive ~SLICE_CONTINUE from segment_done will wait
+ // on this cond.
+ pthread_cond_t continue_cond;
+ pthread_mutex_t continue_mut;
+
+ // used to protect slices_info
+ pthread_mutex_t slices_mut;
+};
+
+void render_threads_deinit (struct render_threads *ctx) {
+ render_slices_deinit(&ctx->slices_info);
+}
+
+void render_threads_free (struct render_threads *ctx) {
+ render_threads_deinit(ctx);
+
+ if (ctx->owned_by_me)
+ free(ctx);
+}
+
+int _render_threads_slice_row (void *arg, unsigned char *rowbuf) {
+ struct render_thread *subctx = arg;
+ int status, i;
+
+ // we need to handle the render_slices state atomically, so lock it
+ pthread_mutex_lock(&subctx->self->slices_mut);
+ pthread_mutex_lock(&subctx->self->continue_mut);
+
+ // make sure that we don't have any stale state in this
+ subctx->can_continue = 0;
+
+ // process the segment
+ status = render_slices_segment_done(&subctx->self->slices_info, subctx->slice_info->index);
+
+ // debugging
+/* subctx->segments_done++;
+ printf("slice %zu: segment_done, status=(%s %s), row=%d\n", subctx->slice_info->index,
+ status & SLICE_PROCESS_ROW ? "SLICE_PROCESS_ROW" : "",
+ status & SLICE_CONTINUE ? "SLICE_CONTINUE" : "",
+ subctx->segments_done
+ );
+*/
+
+ // done with the slices stuff
+ pthread_mutex_unlock(&subctx->self->continue_mut);
+ pthread_mutex_unlock(&subctx->self->slices_mut);
+
+ // do we need to notify the other worker threads?
+ if (status & SLICE_CONTINUE) {
+ pthread_mutex_lock(&subctx->self->continue_mut);
+
+// printf("slice %zu: signal continue\n", subctx->slice_info->index);
+
+ // mark them as continueable
+ for (i = 0; i < subctx->self->slice_count; i++)
+ subctx->self->threads[i].can_continue = 1;
+
+ // signal then to continue
+ pthread_cond_broadcast(&subctx->self->continue_cond);
+
+ pthread_mutex_unlock(&subctx->self->continue_mut);
+ }
+
+ // tell the manager thread that it can process a row
+ if (status & SLICE_PROCESS_ROW) {
+ // grab the process_row lock so that we can do stuff with it
+ pthread_mutex_lock(&subctx->self->process_row_mut);
+
+ // tell it that it has a row waiting
+ subctx->self->waiting_rows++;
+
+ // signal it in case it was waiting
+ pthread_cond_signal(&subctx->self->process_row_cond);
+
+ // release the process_row mutex
+ pthread_mutex_unlock(&subctx->self->process_row_mut);
+ }
+
+ // handle our can-continue status
+ pthread_mutex_lock(&subctx->self->continue_mut);
+
+ if (status & SLICE_CONTINUE) {
+ // don't wait for anything, we can just continue right away
+// printf("slice %zu: direct continue\n", subctx->slice_info->index);
+
+ } else {
+ if (!subctx->can_continue) {
+ // we need to wait until someone signals it
+// printf("slice %zu: waiting...\n", subctx->slice_info->index);
+
+ pthread_cond_wait(&subctx->self->continue_cond, &subctx->self->continue_mut);
+
+// printf("slice %zu: continue after wait\n", subctx->slice_info->index);
+
+ } else {
+ // no need to wait, because someone else took care of that before we got a chance to wait
+// printf("slice %zu: indirect continue\n", subctx->slice_info->index);
+
+ }
+ }
+
+ // we should now be marked as continueable
+ assert(subctx->can_continue);
+
+ // proceed to continue, so clear this
+ subctx->can_continue = 0;
+
+ // done handling the continue state
+ pthread_mutex_unlock(&subctx->self->continue_mut);
+
+ // row handled succesfully
+ return 0;
+}
+
+void *_render_threads_slice_func (void *arg) {
+ struct render_thread *subctx = arg;
+
+ // set up the render_info to render into local memory using the address provided via slice_info
+ // use _render_threads_slice_row to handle the completed rows
+ if (render_local_mem(subctx->slice_info->render_info,
+ &subctx->slice_info->render_buf,
+ &_render_threads_slice_row, subctx)
+ )
+ goto error;
+
+ // wait for the manager to start up
+ pthread_mutex_lock(&subctx->self->continue_mut);
+ if (!subctx->self->manager_running)
+ pthread_cond_wait(&subctx->self->continue_cond, &subctx->self->continue_mut);
+ pthread_mutex_unlock(&subctx->self->continue_mut);
+
+ // start rendering...
+// printf("slice %zu: start\n", subctx->slice_info->index);
+
+ if (render_mandelbrot(subctx->slice_info->render_info))
+ goto error;
+
+ // success, slice render complete
+// printf("slice %zu: done\n", subctx->slice_info->index);
+
+ // sanity checks
+ assert(subctx->self->slices_running > 0);
+
+ pthread_mutex_lock(&subctx->self->process_row_mut);
+
+ if (--subctx->self->slices_running == 0) {
+ // this was the last row, send the manager a final signal in case it's waiting
+// printf("slice %zu: signal\n", subctx->slice_info->index);
+ pthread_cond_signal(&subctx->self->process_row_cond);
+ }
+
+ pthread_mutex_unlock(&subctx->self->process_row_mut);
+
+ // successful exit
+ pthread_exit(NULL);
+
+error:
+ /* XXX: signal an error condition? Mind, with the current code, this should never happen... */
+ FATAL("a render thread failed somehow, taking down the rest of the daemon application as well");
+}
+
+void *_render_threads_manager_func (void *arg) {
+ struct render_threads *ctx = arg;
+
+ // the process_row return status
+ int status, i;
+
+ // first, mark us as running and signal any waiting workers
+ pthread_mutex_lock(&ctx->continue_mut);
+ ctx->manager_running = 1;
+ pthread_cond_broadcast(&ctx->continue_cond);
+ pthread_mutex_unlock(&ctx->continue_mut);
+
+ // needs to be locked inside the loop
+ pthread_mutex_lock(&ctx->process_row_mut);
+
+ // then we wait around and call process_row when told to
+ do {
+ // the process_row must be locked here
+// printf("manager: to wait\n");
+
+ // figure out if we need to wait
+ if (ctx->waiting_rows == 0 && ctx->slices_running > 0) {
+ // no work to do right now, so wait for some
+// printf("manager: waiting\n");
+
+ // wait until a worker thread signals us
+ pthread_cond_wait(&ctx->process_row_cond, &ctx->process_row_mut);
+ }
+
+ // make sure we actually have something to do now...
+ assert(ctx->waiting_rows > 0 || ctx->slices_running == 0);
+
+ // keep the process_row mutex locked until we've inspected/updated both vars
+
+ // do we need to process a row?
+ if (ctx->waiting_rows > 0) {
+ // claim the row for ourself
+ ctx->waiting_rows--;
+
+ // unlock the process_row mut while we handle this row
+ pthread_mutex_unlock(&ctx->process_row_mut);
+
+
+ // keep calls to render_slices synchronized
+ pthread_mutex_lock(&ctx->slices_mut);
+
+ // call into render_slices
+ status = render_slices_process_row(&ctx->slices_info);
+
+/* printf("manager: did process_row, status=(%s %s)\n",
+ status & SLICE_PROCESS_ROW ? "SLICE_PROCESS_ROW" : "",
+ status & SLICE_CONTINUE ? "SLICE_CONTINUE" : ""
+ );
+*/
+
+ // done with render_slices
+ pthread_mutex_unlock(&ctx->slices_mut);
+
+
+ // any errors?
+ if (status == -1) {
+ goto error;
+ }
+
+ // can any waiting slices now continue?
+ if (status & SLICE_CONTINUE) {
+ // grab the continue_mut lock while we update this state
+ pthread_mutex_lock(&ctx->continue_mut);
+
+ // increment continue_count for all slices
+ for (i = 0; i < ctx->slice_count; i++)
+ ctx->threads[i].can_continue = 1;
+
+// printf("manager: signal continue\n");
+
+ // signal any waiting worker threads to continue, giving them a chance to call cond_wait first.
+ pthread_cond_broadcast(&ctx->continue_cond);
+
+ // done with that
+ pthread_mutex_unlock(&ctx->continue_mut);
+ }
+
+ // XXX: ignore SLICE_PROCESS_ROW
+
+ // grab our mutex again
+ pthread_mutex_lock(&ctx->process_row_mut);
+ }
+
+ // are all the slices done now?
+ if (ctx->slices_running == 0) {
+ // unlock the mutex so the worker threads don't get stuck on it
+ pthread_mutex_unlock(&ctx->process_row_mut);
+
+ break;
+ }
+
+ // keep the process_row mutex locked at the top of the loop
+ } while (1);
+
+ // all slices are done now
+ if (render_slices_done(&ctx->slices_info))
+ goto error;
+
+// printf("manager: done\n");
+
+ // reap all the workers
+ void *retval;
+
+ for (i = 0; i < ctx->slice_count; i++) {
+ if (pthread_join(ctx->threads[i].thread, &retval))
+ PERROR("pthread_join");
+
+ // XXX: assume they don't get canceled (SIGSEV etc?)
+ assert(retval == NULL);
+ }
+
+ // ok, exit ourself
+ pthread_exit(NULL);
+
+error:
+ /* XXX: handle errors gracefully... */
+ FATAL("the threaded render operation failed, taking down the rest of the daemon application as well...");
+}
+
+int render_threads_init (struct render_threads *ctx, struct render *render_info) {
+ // break the render operation down into induvidual slices
+ if (render_slices_init(&ctx->slices_info, render_info))
+ goto error;
+
+ // init the mutexes (this should never fail)
+ assert(!(
+ pthread_mutex_init(&ctx->process_row_mut, NULL)
+ || pthread_mutex_init(&ctx->continue_mut, NULL)
+ || pthread_mutex_init(&ctx->slices_mut, NULL)
+ ));
+
+ // init the conds (this should never fail)
+ assert(!(
+ pthread_cond_init(&ctx->process_row_cond, NULL)
+ || pthread_cond_init(&ctx->continue_cond, NULL)
+ ));
+
+ // how many slices?
+ ctx->slices_running = ctx->slice_count = render_slices_get_count(&ctx->slices_info);
+
+ // spawn a thread for each slice
+ int index;
+ for (index = 0; index < ctx->slice_count; index++) {
+ // the self ptr...
+ ctx->threads[index].self = ctx;
+
+ // fetch the slice info
+ ctx->threads[index].slice_info = render_slices_get_slice_info(&ctx->slices_info, index);
+
+ // spawn the thread
+ if (pthread_create(&ctx->threads[index].thread, NULL, &_render_threads_slice_func, &ctx->threads[index]))
+ PERROR("pthread_create(slice_func:%d)", index);
+ }
+
+ // init the attrs for the manager thread
+ pthread_attr_t manager_attrs;
+
+ if (
+ pthread_attr_init(&manager_attrs)
+// || pthread_setdetachstate(&manager_attrs, PTHREAD_CREATE_DETACHED)
+ )
+ PERROR("pthread_attr_init/pthread_attr_setdetachstate");
+
+ // spawn the manager thread
+ if (pthread_create(&ctx->manager_thread, &manager_attrs, &_render_threads_manager_func, ctx))
+ PERROR("pthread_create(manager_func)");
+
+ // success
+ return 0;
+
+error:
+ render_threads_deinit(ctx);
+ return -1;
+}
+
+struct render_threads *render_threads_alloc (struct render *render_info) {
+ struct render_threads *ctx = NULL;
+
+ if (!(ctx = calloc(1, sizeof(*ctx))))
+ ERROR("calloc");
+
+ // init with silent fall-through
+ if (render_threads_init(ctx, render_info))
+ goto error;
+
+ // success
+ return ctx;
+
+error:
+ return NULL;
+}
+
+int render_threads_wait (struct render_threads *ctx) {
+ void *retval;
+
+ if (pthread_join(ctx->manager_thread, &retval))
+ PERROR("pthread_join");
+
+ if (retval == PTHREAD_CANCELED)
+ ERROR("manager thread was canceled");
+ else if (retval != NULL)
+ ERROR("manager thread exited with unknown return value");
+
+ // ok, success
+ return 0;
+
+error:
+ // caller needs to free ctx
+ return -1;
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/render_threads.h Tue Jun 17 16:39:55 2008 +0300
@@ -0,0 +1,14 @@
+#ifndef RENDER_THREADS_H
+#define RENDER_THREADS_H
+
+#include "render.h"
+
+struct render_threads;
+
+struct render_threads *render_threads_alloc (struct render *render_info);
+
+int render_threads_wait (struct render_threads *ctx);
+
+void render_threads_free (struct render_threads *ctx);
+
+#endif /* RENDER_THREADS_H */
--- a/web_main.c Mon Jun 09 18:58:39 2008 +0300
+++ b/web_main.c Tue Jun 17 16:39:55 2008 +0300
@@ -44,7 +44,7 @@
int headers_sent;
- struct render_multi *render_info;
+ struct render_remote *render_info;
size_t bytes_sent;
@@ -56,7 +56,7 @@
static void _render_cleanup (struct render_request *ctx) {
if (ctx->render_info)
- render_multi_free(ctx->render_info);
+ render_remote_free(ctx->render_info);
free(ctx);
}
@@ -81,7 +81,7 @@
size_t buf_size = EVBUFFER_LENGTH(buf);
- assert(buf_size > 0); // shouldn't happen anymore with the new render_multi
+ assert(buf_size > 0); // shouldn't happen anymore with the new render_remote
// check if we are paused
if (ctx->paused) {
@@ -137,7 +137,7 @@
printf("render [%p]: lost http connection\n", ctx);
// cancel
- render_multi_cancel(ctx->render_info);
+ render_remote_cancel(ctx->render_info);
ctx->render_info = NULL;
_render_cleanup(ctx);
@@ -152,7 +152,7 @@
ctx->paused = 0;
// any data waiting in the buffer?
- render_multi_flush(ctx->render_info);
+ render_remote_flush(ctx->render_info);
}
static void _http_render_execute (struct evhttp_request *request, u_int32_t img_w, u_int32_t img_h) {
@@ -179,17 +179,17 @@
ERROR("render_region_full");
// initiate the remote render operation
- if ((ctx->render_info = render_multi(&render, &remote_pool,
+ if ((ctx->render_info = render_remote(&render, &remote_pool,
&_render_sent,
&_render_data,
&_render_done,
&_render_fail,
ctx
)) == NULL)
- ERROR("render_multi");
+ ERROR("render_remote");
// set chunk size
- render_multi_set_recv(ctx->render_info, MIN_CHUNK_SIZE, OVERFLOW_BUFFER);
+ render_remote_set_recv(ctx->render_info, MIN_CHUNK_SIZE, OVERFLOW_BUFFER);
// set close cb
evhttp_set_reply_abortcb(request, &_render_http_lost, ctx);