#include <stdlib.h>
#include <assert.h>
#include <pthread.h>
//#define DEBUG_ENABLED
#include "common.h"
#include "render_threads.h"
#include "render_slices_struct.h"
#include "render_mandelbrot.h"
/*
* Render a mandelbrot in several slices simultaneously using threads.
*
* This works as an n:1 producer-consumer model, where we have n worker threads
* rendering n slices, and one manager thread that processes the completed rows.
*
* The worker threads function by first rendering a row of pixel data into memory,
* and then calling a function that will do something with that completed segment.
*
* * clear the can_continue marker, because the real value may change depending
* on what our segment does, and we don't want stale values from before we
* processed the segment in it
* * process the segment
*
* * if the segment didn't complete a row, wait until some worker thread submits
* a segment that does, or the manager thread processes a row
* * if no one has signaled the cond in between us deciding what to do with
* the segment and this step,
* * wait on the cond
* * we can proceed to render the next segment
* * we should now be marked as continueable
*
* * if the segment completed a row, the manager thread needs to process this
* newly completed row.
* * if we can also continue on to the next row without waiting for the manager
* to process the row we just filled
* * mark all the segments as continueable
* * signal the cond the threads may be waiting on
* * increment the waiting_rows counter
* * signal the manager in case it's waiting
* * else, if no one has signaled the continue cond after we processed the
* segment,
* * wait on the cond
* * we can proceed to render the next segment
* * we should now be marked as continueable
*
* * if we have no more segments to submit
* * update the slices_running counter
* * if the counter is now zero (we were the last slice to complete)
* * signal the manager thread that it doesn't need to wait anymore
*
* The manager thread just sits in a loop calling process_row until all the slices
* have completed.
*
* * if no workers have updated the waiting_rows counter since we last processed
* a row, and the slices_running counter isn't zero (we still have threads
* running)
* * wait on a cond until something happens
*
* * if waiting_rows is nonzero
* * decrement it
* * process a row
* * if processing the row means that waiting slices can continue
* * mark all the segments as continueable
* * signal the cond the threads may be waiting on
*
* * if slices_running is zero
* * break out of the loop
*
* * otherwise, go back to the start again
*/
/*
* Shared state for one threaded render: one worker thread per slice plus a single manager thread.
*/
struct render_threads {
// nonzero when this struct was heap-allocated by render_threads_alloc; render_threads_free only free()s the
// struct when this is set
int owned_by_me;
// per-worker state, indexed by slice; the loops in this file only touch the first slice_count entries
struct render_thread {
// the slice that this worker renders
struct render_slice_info *slice_info;
// the worker's thread id
pthread_t thread;
// flag, accessed under continue_mut: set to 1 for every worker by whoever broadcasts continue_cond, and cleared
// by the worker itself both before it submits a segment and once it proceeds past the wait
int can_continue;
// segment counter reported in the worker's debug output
int segments_done;
// back-pointer to the owning context
struct render_threads *self;
} threads[RENDER_SLICES_MAX];
// the render_slices state; calls into it from the threads are serialised with slices_mut
struct render_slices slices_info;
// the manager thread id
pthread_t manager_thread;
// how many slices (and therefore worker threads) there are
int slice_count;
/*
* The worker threads must be careful to wait for the manager thread to start running before they attempt to signal
* the process_row_cond. This is done via this flag, in conjunction with continue_{cond,mut}.
*
* Workers lock continue_mut, and if this flag is zero, wait on continue_cond (releasing the mutex). If the flag is
* already nonzero, the worker does not need to cond_wait and can just continue.
*
* On startup, the manager thread locks continue_mut, sets this to nonzero, broadcasts continue_cond, and unlocks
* continue_mut as normal.
*/
int manager_running;
/*
* Signals to the manager, modified under process_row_mut once the threads are running:
*
* slices_running starts at slice_count and is decremented by each worker when its render completes.
* waiting_rows is incremented by a worker whose segment completed a row, and decremented by the manager when it
* claims a row for processing.
*/
int slices_running;
int waiting_rows;
/* the inter-thread communication stuff */
// the worker threads signal this when their segment_done call indicates that the manager thread should call
// process_row now, and once more when the last worker finishes.
pthread_cond_t process_row_cond;
pthread_mutex_t process_row_mut;
// the worker threads and the manager thread broadcast this when segment_done or process_row returns
// SLICE_CONTINUE. A worker whose own segment_done status lacks SLICE_CONTINUE waits on this cond until its
// can_continue flag has been set. The manager also broadcasts it on startup (see manager_running).
pthread_cond_t continue_cond;
pthread_mutex_t continue_mut;
// held around the render_slices_segment_done and render_slices_process_row calls on slices_info
pthread_mutex_t slices_mut;
/*
* Outcome recorded by the manager thread just before it exits; render_threads_wait inspects it after joining
* the manager.
*/
enum render_threads_retval {
RETVAL_SUCCESS,
RETVAL_FAILURE,
} manager_retval;
};
/*
 * Release the resources held inside ctx, without freeing ctx itself.
 *
 * At present this only tears down the embedded render_slices state.
 *
 * NOTE(review): the mutexes and condition variables set up by render_threads_init are not destroyed here.
 */
void render_threads_deinit (struct render_threads *ctx) {
    struct render_slices *slices = &ctx->slices_info;

    render_slices_deinit(slices);
}
/*
 * Deinitialise ctx and, when it was heap-allocated by this module (owned_by_me is set), free() it as well.
 */
void render_threads_free (struct render_threads *ctx) {
    render_threads_deinit(ctx);

    // storage that we did not allocate ourselves is left for the caller to release
    if (!ctx->owned_by_me)
        return;

    free(ctx);
}
int _render_threads_slice_row (void *arg, unsigned char *rowbuf) {
struct render_thread *subctx = arg;
int status, i;
// test for cancelation, we don't hold any locks right now
pthread_testcancel();
// we need to handle the render_slices state atomically, so lock it
pthread_mutex_lock(&subctx->self->slices_mut);
pthread_mutex_lock(&subctx->self->continue_mut);
// make sure that we don't have any stale state in this
subctx->can_continue = 0;
// process the segment
status = render_slices_segment_done(&subctx->self->slices_info, subctx->slice_info->index);
// debugging
DEBUG("slice %zu: segment_done, status=(%s %s), row=%d", subctx->slice_info->index,
status & SLICE_PROCESS_ROW ? "SLICE_PROCESS_ROW" : "",
status & SLICE_CONTINUE ? "SLICE_CONTINUE" : "",
++subctx->segments_done
);
// do we need to notify the other worker threads?
if (status & SLICE_CONTINUE) {
DEBUG("slice %zu: signal continue", subctx->slice_info->index);
// mark them as continueable
for (i = 0; i < subctx->self->slice_count; i++)
subctx->self->threads[i].can_continue = 1;
// signal then to continue
pthread_cond_broadcast(&subctx->self->continue_cond);
}
// done with the slices stuff
pthread_mutex_unlock(&subctx->self->continue_mut);
pthread_mutex_unlock(&subctx->self->slices_mut);
// tell the manager thread that it can process a row
if (status & SLICE_PROCESS_ROW) {
// grab the process_row lock so that we can do stuff with it
pthread_mutex_lock(&subctx->self->process_row_mut);
// tell it that it has a row waiting
subctx->self->waiting_rows++;
// signal it in case it was waiting
pthread_cond_signal(&subctx->self->process_row_cond);
// release the process_row mutex
pthread_mutex_unlock(&subctx->self->process_row_mut);
}
#define BEGIN_MUTEX(mut) pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, &(mut)); pthread_mutex_lock(&(mut));
#define END_MUTEX pthread_cleanup_pop(1);
// handle our can-continue status
// we need to release this lock if we get canceled in pthread_cond_wait
BEGIN_MUTEX(subctx->self->continue_mut)
if (status & SLICE_CONTINUE) {
// don't wait for anything, we can just continue right away
DEBUG("slice %zu: direct continue", subctx->slice_info->index);
} else {
if (!subctx->can_continue) {
// we need to wait until someone signals it
DEBUG("slice %zu: waiting...", subctx->slice_info->index);
pthread_cond_wait(&subctx->self->continue_cond, &subctx->self->continue_mut);
DEBUG("slice %zu: continue after wait", subctx->slice_info->index);
} else {
// no need to wait, because someone else took care of that before we got a chance to wait
DEBUG("slice %zu: indirect continue", subctx->slice_info->index);
}
}
// we should now be marked as continueable
assert(subctx->can_continue);
// proceed to continue, so clear this
subctx->can_continue = 0;
// done handling the continue state
pthread_mutex_unlock(&subctx->self->continue_mut);
END_MUTEX
// row handled succesfully
return 0;
}
void *_render_threads_slice_func (void *arg) {
struct render_thread *subctx = arg;
// set up the render_info to render into local memory using the address provided via slice_info
// use _render_threads_slice_row to handle the completed rows
if (render_local_mem(subctx->slice_info->render_info,
&subctx->slice_info->render_buf,
&_render_threads_slice_row, subctx)
)
goto error;
// wait for the manager to start up
pthread_mutex_lock(&subctx->self->continue_mut);
if (!subctx->self->manager_running)
pthread_cond_wait(&subctx->self->continue_cond, &subctx->self->continue_mut);
pthread_mutex_unlock(&subctx->self->continue_mut);
// start rendering...
DEBUG("slice %zu: start", subctx->slice_info->index);
if (render_mandelbrot(subctx->slice_info->render_info))
goto error;
// success, slice render complete
DEBUG("slice %zu: done", subctx->slice_info->index);
// sanity checks
assert(subctx->self->slices_running > 0);
pthread_mutex_lock(&subctx->self->process_row_mut);
if (--subctx->self->slices_running == 0) {
// this was the last row, send the manager a final signal in case it's waiting
DEBUG("slice %zu: signal", subctx->slice_info->index);
pthread_cond_signal(&subctx->self->process_row_cond);
}
pthread_mutex_unlock(&subctx->self->process_row_mut);
// successful exit
pthread_exit(NULL);
error:
/* XXX: signal an error condition? Mind, with the current code, this should never happen... */
FATAL("a render thread failed somehow, taking down the rest of the daemon application as well");
}
/*
 * Manager thread entry point; arg is the struct render_threads.
 *
 * Announces itself to the workers, then loops: wait until a worker reports a completed row (or all workers have
 * finished), call render_slices_process_row for each waiting row, and release waiting workers whenever that call
 * returns SLICE_CONTINUE. Once no slices are running and no rows are waiting, it calls render_slices_done and
 * joins the workers.
 *
 * On failure the workers are canceled and joined instead. The outcome is stored in ctx->manager_retval, and the
 * thread exits with ctx as its return value.
 */
void *_render_threads_manager_func (void *arg) {
    struct render_threads *ctx = arg;

    // the process_row return status
    int status, i;

    // worker exit value, used by both the success and the error path below
    void *retval;

    // first, mark us as running and signal any waiting workers
    pthread_mutex_lock(&ctx->continue_mut);
    ctx->manager_running = 1;
    pthread_cond_broadcast(&ctx->continue_cond);
    pthread_mutex_unlock(&ctx->continue_mut);

    // needs to be locked inside the loop
    pthread_mutex_lock(&ctx->process_row_mut);

    // then we wait around and call process_row when told to
    do {
        // the process_row mutex must be locked here
        DEBUG("manager: to wait");

        // wait while there is no work and workers are still running; this has to be a loop, because a condition
        // wait may return without anyone having signaled it
        while (ctx->waiting_rows == 0 && ctx->slices_running > 0) {
            DEBUG("manager: waiting");

            // wait until a worker thread signals us
            pthread_cond_wait(&ctx->process_row_cond, &ctx->process_row_mut);
        }

        // make sure we actually have something to do now...
        assert(ctx->waiting_rows > 0 || ctx->slices_running == 0);

        // keep the process_row mutex locked until we've inspected/updated both vars

        // do we need to process a row?
        if (ctx->waiting_rows > 0) {
            // claim the row for ourself
            ctx->waiting_rows--;

            // unlock the process_row mut while we handle this row
            pthread_mutex_unlock(&ctx->process_row_mut);

            // keep calls to render_slices synchronized
            pthread_mutex_lock(&ctx->slices_mut);

            // call into render_slices
            status = render_slices_process_row(&ctx->slices_info);

            DEBUG("manager: did process_row, status=(%s %s)",
                status & SLICE_PROCESS_ROW ? "SLICE_PROCESS_ROW" : "",
                status & SLICE_CONTINUE ? "SLICE_CONTINUE" : ""
            );

            // any errors?
            if (status == -1) {
                pthread_mutex_unlock(&ctx->slices_mut);
                goto error;
            }

            // can any waiting slices now continue?
            if (status & SLICE_CONTINUE) {
                // grab the continue_mut lock while we update this state
                pthread_mutex_lock(&ctx->continue_mut);

                // set can_continue for all slices
                for (i = 0; i < ctx->slice_count; i++)
                    ctx->threads[i].can_continue = 1;

                DEBUG("manager: signal continue");

                // wake up any worker threads waiting to continue
                pthread_cond_broadcast(&ctx->continue_cond);

                // done with that
                pthread_mutex_unlock(&ctx->continue_mut);
            }

            // done with render_slices
            pthread_mutex_unlock(&ctx->slices_mut);

            // XXX: ignore SLICE_PROCESS_ROW

            // grab our mutex again
            pthread_mutex_lock(&ctx->process_row_mut);
        }

        // are all the rows and slices done now?
        if (ctx->slices_running == 0 && ctx->waiting_rows == 0) {
            // unlock the mutex so the worker threads don't get stuck on it
            pthread_mutex_unlock(&ctx->process_row_mut);
            break;
        }

        // keep the process_row mutex locked at the top of the loop
    } while (1);

    // all slices are done now
    if (render_slices_done(&ctx->slices_info))
        goto error;

    DEBUG("manager: done");

    // reap all the workers
    for (i = 0; i < ctx->slice_count; i++) {
        if (pthread_join(ctx->threads[i].thread, &retval))
            PERROR("pthread_join");

        // XXX: assume they don't get canceled
        assert(retval == NULL);
    }

    // succesfull return
    ctx->manager_retval = RETVAL_SUCCESS;
    goto exit;

error:
    /* cancel and join all worker threads */
    WARNING("canceling and reaping all threads");

    for (i = 0; i < ctx->slice_count; i++) {
        if (pthread_cancel(ctx->threads[i].thread))
            PWARNING("pthread_cancel(worker:%d)", i);

        // join it in any case
        if (pthread_join(ctx->threads[i].thread, &retval)) {
            PWARNING("pthread_join(worker:%d)", i);
        } else {
            if (retval != PTHREAD_CANCELED)
                PWARNING("worker:%d retval = %p", i, retval);
        }
    }

    // failure
    ctx->manager_retval = RETVAL_FAILURE;

exit:
    // ok, exit ourself
    pthread_exit(ctx);
}
int render_threads_init (struct render_threads *ctx, struct render *render_info) {
// break the render operation down into induvidual slices
if (render_slices_init(&ctx->slices_info, render_info))
goto error;
// init the mutexes (this should never fail)
assert(!(
pthread_mutex_init(&ctx->process_row_mut, NULL)
|| pthread_mutex_init(&ctx->continue_mut, NULL)
|| pthread_mutex_init(&ctx->slices_mut, NULL)
));
// init the conds (this should never fail)
assert(!(
pthread_cond_init(&ctx->process_row_cond, NULL)
|| pthread_cond_init(&ctx->continue_cond, NULL)
));
// how many slices?
ctx->slices_running = ctx->slice_count = render_slices_get_count(&ctx->slices_info);
// spawn a thread for each slice
int index;
for (index = 0; index < ctx->slice_count; index++) {
// the self ptr...
ctx->threads[index].self = ctx;
// fetch the slice info
ctx->threads[index].slice_info = render_slices_get_slice_info(&ctx->slices_info, index);
// spawn the thread
if (pthread_create(&ctx->threads[index].thread, NULL, &_render_threads_slice_func, &ctx->threads[index]))
PERROR("pthread_create(slice_func:%d)", index);
}
// init the attrs for the manager thread
pthread_attr_t manager_attrs;
if (
pthread_attr_init(&manager_attrs)
// || pthread_setdetachstate(&manager_attrs, PTHREAD_CREATE_DETACHED)
)
PERROR("pthread_attr_init/pthread_attr_setdetachstate");
// spawn the manager thread
if (pthread_create(&ctx->manager_thread, &manager_attrs, &_render_threads_manager_func, ctx))
PERROR("pthread_create(manager_func)");
// success
return 0;
error:
render_threads_deinit(ctx);
return -1;
}
/*
 * Heap-allocate a zeroed render_threads context, flag it as owned by this module (so that render_threads_free will
 * free() it), and initialise it for render_info via render_threads_init.
 *
 * Returns the new context, or NULL on failure.
 *
 * NOTE(review): when render_threads_init fails, the allocation is not released here and the caller only gets NULL.
 */
struct render_threads *render_threads_alloc (struct render *render_info) {
    struct render_threads *new_ctx = calloc(1, sizeof(*new_ctx));

    if (new_ctx == NULL)
        ERROR("calloc");

    // flag it for free()ing
    new_ctx->owned_by_me = 1;

    // init with silent fall-through
    if (render_threads_init(new_ctx, render_info) != 0)
        goto error;

    // success
    return new_ctx;

error:
    return NULL;
}
/*
 * Block until the manager thread has exited, and report how the render went.
 *
 * Returns 0 when the manager exited normally with ctx as its return value and recorded RETVAL_SUCCESS. Returns -1
 * if the join fails, the manager was canceled, it exited with an unexpected value, or it recorded a failure.
 *
 * ctx is not freed in either case; that is left to the caller.
 */
int render_threads_wait (struct render_threads *ctx) {
    void *manager_result;

    if (pthread_join(ctx->manager_thread, &manager_result) != 0)
        PERROR("pthread_join");

    if (manager_result == PTHREAD_CANCELED)
        ERROR("manager thread was canceled");
    else if (manager_result != ctx)
        ERROR("manager thread exited with unknown return value");
    else if (ctx->manager_retval != RETVAL_SUCCESS)
        ERROR("manger thread exited with an error");

    // ok, success
    return 0;

error:
    // caller needs to free ctx
    return -1;
}