a working threaded sliced render, plus modifications to other modules to use this in web_main
committer: Tero Marttila <terom@fixme.fi>
#include <stdlib.h>
#include <assert.h>
#include <pthread.h>
#include "common.h"
#include "render_threads.h"
#include "render_slices_struct.h"
#include "render_mandelbrot.h"
/*
* Render a mandelbrot in several slices simultaneously using threads.
*
* This works as an n:1 producer-consumer model, where we have n worker threads
* rendering n slices, and one manager thread that processes the completed rows.
*
* The worker threads function by first rendering a row of pixel data into memory,
* and then calling a function that will do something with that completed segment.
*
* * clear the can_continue marker, because the real value may change depending
* on what our segment does, and we don't want stale values from before we
* processed the segment in it
* * process the segment
*
* * if the segment didn't complete a row, wait until some worker thread submits
* a segment that does, or the manager thread processes a row
* * if no one has signaled the cond in between us deciding what to do with
* the segment and this step,
* * wait on the cond
* * we can proceed to render the next segment
* * we should now be marked as continueable
*
* * if the segment completed a row, the manager thread needs to process this
* newly completed row.
* * if we can also continue on to the next row without waiting for the manager
* to process the row we just filled
* * mark all the segments as continueable
* * signal the cond the threads may be waiting on
* * increment the waiting_rows counter
* * signal the manager in case it's waiting
* * else, if no one has signaled the continue cond after we processed the
* segment,
* * wait on the cond
* * we can proceed to render the next segment
* * we should now be marked as continueable
*
* * if we have no more segments to submit
* * update the slices_running counter
* * if the counter is now zero (we were the last slice to complete)
* * signal the manager thread that it doesn't need to wait anymore
*
* The manager thread just sits in a loop calling process_row until all the slices
* have completed.
*
* * if no workers have updated the waiting_rows counter since we last processed
* a row, and the slices_running counter isn't zero (we still have threads
* running)
* * wait on a cond until something happens
*
* * if waiting_rows is nonzero
* * decrement it
* * process a row
* * if processing the row means that waiting slices can continue
* * mark all the segments as continueable
* * signal the cond the threads may be waiting on
*
* * if slices_running is zero
* * break out of the loop
*
* * otherwise, go back to the start again
*/
/*
* Shared state for one threaded render: one worker thread per slice plus a single manager thread that processes
* completed rows.
*/
struct render_threads {
// render_threads_free() only free()s this struct when this is nonzero
int owned_by_me;
// info for the individual worker threads, one entry per slice (the first slice_count entries are in use)
struct render_thread {
// the slice that this worker renders
struct render_slice_info *slice_info;
// the worker's thread id, joined by the manager thread once all slices are done
pthread_t thread;
// 0/1 flag, accessed under continue_mut: nonzero means this worker may go on to its next segment. It is set for
// all workers by whoever receives SLICE_CONTINUE, and cleared by the worker itself.
int can_continue;
// debug counter for submitted segments; not updated by any active code in this file
int segments_done;
// ptr back to the owning context
struct render_threads *self;
} threads[RENDER_SLICES_MAX];
// the render_slices state, accessed under slices_mut while the threads are running
struct render_slices slices_info;
// the manager thread id, joined by render_threads_wait()
pthread_t manager_thread;
// how many slices (and therefore worker threads) there are
int slice_count;
/*
* The worker threads must be careful to wait for the manager thread to start running before they attempt to signal
* the process_row_cond. This is done via this flag, in conjunction with continue_{cond,mut}.
*
* Workers should lock continue_mut, and if this flag is zero, wait on continue_cond (releasing the mutex). If the
* flag is already nonzero, the worker threads do not need to cond_wait and can just continue.
*
* On startup, the manager thread will lock continue_mut, set this to nonzero, signal continue_cond, and unlock
* continue_mut as normal.
*/
int manager_running;
/*
* Signals to the manager, updated under process_row_mut.
*
* slices_running starts at slice_count and is decremented by each worker when its slice is complete.
* waiting_rows is incremented by a worker for each completed row, and decremented by the manager when it claims
* a row for processing.
*/
int slices_running;
int waiting_rows;
/* the inter-thread communication stuff */
// the worker threads signal this when their segment_done call indicates that the manager thread should call
// process_row now, and once more when the last slice has finished.
pthread_cond_t process_row_cond;
pthread_mutex_t process_row_mut;
// the worker threads and the manager thread signal this when segment_done or process_row indicate that the worker
// threads should call segment_done again. Worker threads that receive ~SLICE_CONTINUE from segment_done will wait
// on this cond. The manager also broadcasts it once at startup, after setting manager_running.
pthread_cond_t continue_cond;
pthread_mutex_t continue_mut;
// used to protect slices_info: held around render_slices_segment_done and render_slices_process_row
pthread_mutex_t slices_mut;
};
/*
 * Release what is held inside ctx without freeing ctx itself.
 *
 * At present this only tears down the embedded render_slices state; the
 * mutexes and condition variables are left as they are.
 */
void render_threads_deinit (struct render_threads *ctx) {
    struct render_slices *slices = &ctx->slices_info;

    render_slices_deinit(slices);
}
/*
 * Deinitialize ctx, and additionally free() it when the context is marked
 * as owned by this module (owned_by_me is nonzero).
 */
void render_threads_free (struct render_threads *ctx) {
    render_threads_deinit(ctx);

    // caller-provided storage is not ours to release
    if (!ctx->owned_by_me)
        return;

    free(ctx);
}
int _render_threads_slice_row (void *arg, unsigned char *rowbuf) {
struct render_thread *subctx = arg;
int status, i;
// we need to handle the render_slices state atomically, so lock it
pthread_mutex_lock(&subctx->self->slices_mut);
pthread_mutex_lock(&subctx->self->continue_mut);
// make sure that we don't have any stale state in this
subctx->can_continue = 0;
// process the segment
status = render_slices_segment_done(&subctx->self->slices_info, subctx->slice_info->index);
// debugging
/* subctx->segments_done++;
printf("slice %zu: segment_done, status=(%s %s), row=%d\n", subctx->slice_info->index,
status & SLICE_PROCESS_ROW ? "SLICE_PROCESS_ROW" : "",
status & SLICE_CONTINUE ? "SLICE_CONTINUE" : "",
subctx->segments_done
);
*/
// done with the slices stuff
pthread_mutex_unlock(&subctx->self->continue_mut);
pthread_mutex_unlock(&subctx->self->slices_mut);
// do we need to notify the other worker threads?
if (status & SLICE_CONTINUE) {
pthread_mutex_lock(&subctx->self->continue_mut);
// printf("slice %zu: signal continue\n", subctx->slice_info->index);
// mark them as continueable
for (i = 0; i < subctx->self->slice_count; i++)
subctx->self->threads[i].can_continue = 1;
// signal then to continue
pthread_cond_broadcast(&subctx->self->continue_cond);
pthread_mutex_unlock(&subctx->self->continue_mut);
}
// tell the manager thread that it can process a row
if (status & SLICE_PROCESS_ROW) {
// grab the process_row lock so that we can do stuff with it
pthread_mutex_lock(&subctx->self->process_row_mut);
// tell it that it has a row waiting
subctx->self->waiting_rows++;
// signal it in case it was waiting
pthread_cond_signal(&subctx->self->process_row_cond);
// release the process_row mutex
pthread_mutex_unlock(&subctx->self->process_row_mut);
}
// handle our can-continue status
pthread_mutex_lock(&subctx->self->continue_mut);
if (status & SLICE_CONTINUE) {
// don't wait for anything, we can just continue right away
// printf("slice %zu: direct continue\n", subctx->slice_info->index);
} else {
if (!subctx->can_continue) {
// we need to wait until someone signals it
// printf("slice %zu: waiting...\n", subctx->slice_info->index);
pthread_cond_wait(&subctx->self->continue_cond, &subctx->self->continue_mut);
// printf("slice %zu: continue after wait\n", subctx->slice_info->index);
} else {
// no need to wait, because someone else took care of that before we got a chance to wait
// printf("slice %zu: indirect continue\n", subctx->slice_info->index);
}
}
// we should now be marked as continueable
assert(subctx->can_continue);
// proceed to continue, so clear this
subctx->can_continue = 0;
// done handling the continue state
pthread_mutex_unlock(&subctx->self->continue_mut);
// row handled succesfully
return 0;
}
void *_render_threads_slice_func (void *arg) {
struct render_thread *subctx = arg;
// set up the render_info to render into local memory using the address provided via slice_info
// use _render_threads_slice_row to handle the completed rows
if (render_local_mem(subctx->slice_info->render_info,
&subctx->slice_info->render_buf,
&_render_threads_slice_row, subctx)
)
goto error;
// wait for the manager to start up
pthread_mutex_lock(&subctx->self->continue_mut);
if (!subctx->self->manager_running)
pthread_cond_wait(&subctx->self->continue_cond, &subctx->self->continue_mut);
pthread_mutex_unlock(&subctx->self->continue_mut);
// start rendering...
// printf("slice %zu: start\n", subctx->slice_info->index);
if (render_mandelbrot(subctx->slice_info->render_info))
goto error;
// success, slice render complete
// printf("slice %zu: done\n", subctx->slice_info->index);
// sanity checks
assert(subctx->self->slices_running > 0);
pthread_mutex_lock(&subctx->self->process_row_mut);
if (--subctx->self->slices_running == 0) {
// this was the last row, send the manager a final signal in case it's waiting
// printf("slice %zu: signal\n", subctx->slice_info->index);
pthread_cond_signal(&subctx->self->process_row_cond);
}
pthread_mutex_unlock(&subctx->self->process_row_mut);
// successful exit
pthread_exit(NULL);
error:
/* XXX: signal an error condition? Mind, with the current code, this should never happen... */
FATAL("a render thread failed somehow, taking down the rest of the daemon application as well");
}
/*
 * Manager thread entry point; arg is the struct render_threads.
 *
 * Marks the manager as running and releases the workers, then loops calling
 * render_slices_process_row once for every row the workers report via
 * waiting_rows. The loop ends once all slices have finished AND every
 * reported row has been processed. Afterwards render_slices_done is called
 * and all worker threads are joined.
 *
 * Exits the thread with NULL on success. Failures go through FATAL.
 */
void *_render_threads_manager_func (void *arg) {
    struct render_threads *ctx = arg;
    int status, i;
    void *retval;

    // first, mark us as running and signal any waiting workers
    pthread_mutex_lock(&ctx->continue_mut);
    ctx->manager_running = 1;
    pthread_cond_broadcast(&ctx->continue_cond);
    pthread_mutex_unlock(&ctx->continue_mut);

    // process_row_mut is held at the top of every iteration
    pthread_mutex_lock(&ctx->process_row_mut);

    for (;;) {
        // Nothing to do while no rows are waiting and slices are still
        // running. Loop on the predicate, as pthread_cond_wait is allowed
        // to wake up spuriously.
        while (ctx->waiting_rows == 0 && ctx->slices_running > 0)
            pthread_cond_wait(&ctx->process_row_cond, &ctx->process_row_mut);

        if (ctx->waiting_rows == 0) {
            // No rows left and no slices running: we are done. Rows that are
            // still queued when the last slice finishes are drained before
            // this point is reached, so none of them are dropped.
            assert(ctx->slices_running == 0);
            break;
        }

        // claim the row for ourself
        ctx->waiting_rows--;

        // don't hold up the workers while we handle this row
        pthread_mutex_unlock(&ctx->process_row_mut);

        // keep calls to render_slices synchronized
        pthread_mutex_lock(&ctx->slices_mut);
        status = render_slices_process_row(&ctx->slices_info);
        pthread_mutex_unlock(&ctx->slices_mut);

        // any errors? (must be checked before testing the flag bits)
        if (status == -1)
            goto error;

        // can any waiting slices now continue?
        if (status & SLICE_CONTINUE) {
            pthread_mutex_lock(&ctx->continue_mut);

            // mark all workers as continueable
            for (i = 0; i < ctx->slice_count; i++)
                ctx->threads[i].can_continue = 1;

            // wake up any workers waiting for this
            pthread_cond_broadcast(&ctx->continue_cond);

            pthread_mutex_unlock(&ctx->continue_mut);
        }

        // XXX: SLICE_PROCESS_ROW is ignored here, further rows are tracked via waiting_rows

        // grab our mutex again for the next iteration
        pthread_mutex_lock(&ctx->process_row_mut);
    }

    // unlock the mutex so that nothing gets stuck on it
    pthread_mutex_unlock(&ctx->process_row_mut);

    // all slices are done now
    if (render_slices_done(&ctx->slices_info))
        goto error;

    // reap all the workers
    for (i = 0; i < ctx->slice_count; i++) {
        if (pthread_join(ctx->threads[i].thread, &retval))
            PERROR("pthread_join");

        // XXX: assume they don't get canceled (SIGSEV etc?)
        assert(retval == NULL);
    }

    // ok, exit ourself
    pthread_exit(NULL);

error:
    /* XXX: handle errors gracefully... */
    // NOTE(review): FATAL is presumably not expected to return - confirm against common.h
    FATAL("the threaded render operation failed, taking down the rest of the daemon application as well...");
}
int render_threads_init (struct render_threads *ctx, struct render *render_info) {
// break the render operation down into induvidual slices
if (render_slices_init(&ctx->slices_info, render_info))
goto error;
// init the mutexes (this should never fail)
assert(!(
pthread_mutex_init(&ctx->process_row_mut, NULL)
|| pthread_mutex_init(&ctx->continue_mut, NULL)
|| pthread_mutex_init(&ctx->slices_mut, NULL)
));
// init the conds (this should never fail)
assert(!(
pthread_cond_init(&ctx->process_row_cond, NULL)
|| pthread_cond_init(&ctx->continue_cond, NULL)
));
// how many slices?
ctx->slices_running = ctx->slice_count = render_slices_get_count(&ctx->slices_info);
// spawn a thread for each slice
int index;
for (index = 0; index < ctx->slice_count; index++) {
// the self ptr...
ctx->threads[index].self = ctx;
// fetch the slice info
ctx->threads[index].slice_info = render_slices_get_slice_info(&ctx->slices_info, index);
// spawn the thread
if (pthread_create(&ctx->threads[index].thread, NULL, &_render_threads_slice_func, &ctx->threads[index]))
PERROR("pthread_create(slice_func:%d)", index);
}
// init the attrs for the manager thread
pthread_attr_t manager_attrs;
if (
pthread_attr_init(&manager_attrs)
// || pthread_setdetachstate(&manager_attrs, PTHREAD_CREATE_DETACHED)
)
PERROR("pthread_attr_init/pthread_attr_setdetachstate");
// spawn the manager thread
if (pthread_create(&ctx->manager_thread, &manager_attrs, &_render_threads_manager_func, ctx))
PERROR("pthread_create(manager_func)");
// success
return 0;
error:
render_threads_deinit(ctx);
return -1;
}
struct render_threads *render_threads_alloc (struct render *render_info) {
struct render_threads *ctx = NULL;
if (!(ctx = calloc(1, sizeof(*ctx))))
ERROR("calloc");
// init with silent fall-through
if (render_threads_init(ctx, render_info))
goto error;
// success
return ctx;
error:
return NULL;
}
/*
 * Block until the manager thread of ctx has exited.
 *
 * Returns 0 if the manager thread exited with a NULL return value. Returns
 * -1 via the error label otherwise (join failure, cancellation, or any other
 * exit value); the caller still needs to free ctx in that case.
 */
int render_threads_wait (struct render_threads *ctx) {
    void *manager_result;

    if (pthread_join(ctx->manager_thread, &manager_result))
        PERROR("pthread_join");

    // anything other than a plain NULL exit value counts as a failure
    if (manager_result != NULL) {
        if (manager_result == PTHREAD_CANCELED)
            ERROR("manager thread was canceled");
        else
            ERROR("manager thread exited with unknown return value");
    }

    // ok, success
    return 0;

error:
    // caller needs to free ctx
    return -1;
}