#include <stdlib.h>
#include <pthread.h>

#include <assert.h>

//#define DEBUG_ENABLED

#include "common.h"
#include "render_threads.h"
#include "render_slices_struct.h"
#include "render_mandelbrot.h"

/*
 * Render a mandelbrot in several slices simultaneously using threads.
 *
 * This works as an n:1 producer-consumer model, where we have n worker threads
 * rendering n slices, and one manager thread that processes the completed rows.
 *
 * The worker threads function by first rendering a row of pixel data into memory,
 * and then calling a function that will do something with that completed segment:
 *
 *  * clear the can_continue marker, because the real value may change depending
 *    on what our segment does, and we don't want stale values from before we
 *    processed this segment
 *  * process the segment
 *
 *  * if the segment didn't complete a row, wait until some worker thread submits
 *    a segment that does, or the manager thread processes a row
 *      * if no one has signaled the cond between us deciding what to do with
 *        the segment and this step,
 *          * wait on the cond
 *      * we can proceed to render the next segment
 *      * we should now be marked as continuable
 *
 *  * if the segment completed a row, the manager thread needs to process this
 *    newly completed row
 *      * if we can also continue on to the next row without waiting for the
 *        manager to process the row we just filled
 *          * mark all the segments as continuable
 *          * signal the cond the threads may be waiting on
 *      * increment the waiting_rows counter
 *      * signal the manager in case it's waiting
 *      * else, if no one has signaled the continue cond after we processed the
 *        segment,
 *          * wait on the cond
 *      * we can proceed to render the next segment
 *      * we should now be marked as continuable
 *
 *  * if we have no more segments to submit
 *      * update the slices_running counter
 *      * if the counter is now zero (we were the last slice to complete)
 *          * signal the manager thread that it doesn't need to wait anymore
 *
 * The manager thread just sits in a loop calling process_row until all the slices
 * have completed (see the sketch after this comment):
 *
 *  * if no workers have updated the waiting_rows counter since we last processed
 *    a row, and the slices_running counter isn't zero (we still have threads
 *    running)
 *      * wait on a cond until something happens
 *
 *  * if waiting_rows is nonzero
 *      * decrement it
 *      * process a row
 *      * if processing the row means that waiting slices can continue
 *          * mark all the segments as continuable
 *          * signal the cond the threads may be waiting on
 *
 *  * if slices_running is zero
 *      * break out of the loop
 *
 *  * otherwise, go back to the start again
 */
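/*
 * A minimal standalone sketch of the n:1 producer-consumer core described
 * above. All names here (sketch_*) are hypothetical and only illustrate the
 * waiting_rows/slices_running handshake; the real implementation below adds
 * the per-worker continue handshake on top of this.
 */
static pthread_mutex_t sketch_mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t sketch_cond = PTHREAD_COND_INITIALIZER;
static int sketch_rows = 0;         // completed rows waiting to be processed
static int sketch_workers = 0;      // worker threads still rendering

// worker side: report one completed row and wake the manager
static void sketch_submit_row (void) {
    pthread_mutex_lock(&sketch_mut);

    sketch_rows++;
    pthread_cond_signal(&sketch_cond);

    pthread_mutex_unlock(&sketch_mut);
}

// worker side: the last thing a worker does before exiting
static void sketch_worker_done (void) {
    pthread_mutex_lock(&sketch_mut);

    if (--sketch_workers == 0)
        pthread_cond_signal(&sketch_cond);

    pthread_mutex_unlock(&sketch_mut);
}

// manager side: process rows until the last worker is done and no rows remain
static void sketch_manager_loop (void) {
    pthread_mutex_lock(&sketch_mut);

    while (sketch_rows > 0 || sketch_workers > 0) {
        // sleep until a worker submits a row or the last worker exits
        while (sketch_rows == 0 && sketch_workers > 0)
            pthread_cond_wait(&sketch_cond, &sketch_mut);

        if (sketch_rows > 0) {
            // claim the row, and drop the lock while processing it
            sketch_rows--;

            pthread_mutex_unlock(&sketch_mut);
            /* ... process one row ... */
            pthread_mutex_lock(&sketch_mut);
        }
    }

    pthread_mutex_unlock(&sketch_mut);
}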
struct render_threads {
    // should render_threads_free() free() this struct? (set by render_threads_alloc)
    int owned_by_me;

    // info for the individual threads
    struct render_thread {
        // the slice that we need to render
        struct render_slice_info *slice_info;

        // our thread id
        pthread_t thread;

        // are we currently allowed to continue?
        int can_continue;

        // how many segments we've written
        int segments_done;

        // ptr back to ctx
        struct render_threads *self;

    } threads[RENDER_SLICES_MAX];

    // the render_slices struct
    struct render_slices slices_info;

    // the manager thread id
    pthread_t manager_thread;

    // how many slices?
    int slice_count;

    /*
     * The worker threads must be careful to wait for the manager thread to start running before they attempt to
     * signal the process_row_cond. This is done via this flag, in conjunction with continue_{cond,mut}.
     *
     * Workers should lock continue_mut, and if this flag is zero, wait on continue_cond (releasing the mutex). If
     * the flag is already nonzero, the worker threads do not need to cond_wait and can just continue. See the
     * sketch after this struct.
     *
     * On startup, the manager thread will lock continue_mut, set this to nonzero, signal continue_cond, and unlock
     * continue_mut as normal.
     */
    int manager_running;

    /*
     * Signals to the manager
     */
    int slices_running;
    int waiting_rows;

    /* the inter-thread communication stuff */

    // the worker threads signal this when their segment_done call indicates that the manager thread should call
    // process_row now.
    pthread_cond_t process_row_cond;
    pthread_mutex_t process_row_mut;

    // the worker threads and the manager thread signal this when segment_done or process_row indicate that the
    // worker threads should call segment_done again. Worker threads that receive ~SLICE_CONTINUE from segment_done
    // will wait on this cond.
    pthread_cond_t continue_cond;
    pthread_mutex_t continue_mut;

    // used to protect slices_info
    pthread_mutex_t slices_mut;

    /*
     * Return value from the manager thread
     */
    enum render_threads_retval {
        RETVAL_SUCCESS,
        RETVAL_FAILURE,
    } manager_retval;
};
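/*
 * A minimal sketch of the manager_running startup gate documented in the
 * struct above, with hypothetical names; the real code inlines this against
 * continue_{cond,mut}. The while loop also guards against spurious wakeups.
 */

// worker side: block until the manager has flipped the running flag
static void sketch_startup_wait (pthread_mutex_t *mut, pthread_cond_t *cond, int *running) {
    pthread_mutex_lock(mut);

    while (!*running)
        pthread_cond_wait(cond, mut);

    pthread_mutex_unlock(mut);
}

// manager side: flip the flag once on startup and wake every waiting worker
static void sketch_startup_signal (pthread_mutex_t *mut, pthread_cond_t *cond, int *running) {
    pthread_mutex_lock(mut);

    *running = 1;
    pthread_cond_broadcast(cond);

    pthread_mutex_unlock(mut);
}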
void render_threads_deinit (struct render_threads *ctx) {
    render_slices_deinit(&ctx->slices_info);
}

void render_threads_free (struct render_threads *ctx) {
    render_threads_deinit(ctx);

    if (ctx->owned_by_me)
        free(ctx);
}

int _render_threads_slice_row (void *arg, unsigned char *rowbuf) {
    struct render_thread *subctx = arg;
    int status, i;

    // test for cancellation, we don't hold any locks right now
    pthread_testcancel();

    // we need to handle the render_slices state atomically, so lock it
    pthread_mutex_lock(&subctx->self->slices_mut);
    pthread_mutex_lock(&subctx->self->continue_mut);

    // make sure that we don't have any stale state in this
    subctx->can_continue = 0;

    // process the segment
    status = render_slices_segment_done(&subctx->self->slices_info, subctx->slice_info->index);

    // debugging
    DEBUG("slice %zu: segment_done, status=(%s %s), row=%d", subctx->slice_info->index,
        status & SLICE_PROCESS_ROW ? "SLICE_PROCESS_ROW" : "",
        status & SLICE_CONTINUE ? "SLICE_CONTINUE" : "",
        ++subctx->segments_done
    );

    // do we need to notify the other worker threads?
    if (status & SLICE_CONTINUE) {
        DEBUG("slice %zu: signal continue", subctx->slice_info->index);

        // mark them as continuable
        for (i = 0; i < subctx->self->slice_count; i++)
            subctx->self->threads[i].can_continue = 1;

        // signal them to continue
        pthread_cond_broadcast(&subctx->self->continue_cond);
    }

    // done with the slices stuff
    pthread_mutex_unlock(&subctx->self->continue_mut);
    pthread_mutex_unlock(&subctx->self->slices_mut);

    // tell the manager thread that it can process a row
    if (status & SLICE_PROCESS_ROW) {
        // grab the process_row lock so that we can do stuff with it
        pthread_mutex_lock(&subctx->self->process_row_mut);

        // tell it that it has a row waiting
        subctx->self->waiting_rows++;

        // signal it in case it was waiting
        pthread_cond_signal(&subctx->self->process_row_cond);

        // release the process_row mutex
        pthread_mutex_unlock(&subctx->self->process_row_mut);
    }

#define BEGIN_MUTEX(mut) pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, &(mut)); pthread_mutex_lock(&(mut));
#define END_MUTEX pthread_cleanup_pop(1);

    // handle our can-continue status
    // we need to release this lock if we get canceled in pthread_cond_wait
    BEGIN_MUTEX(subctx->self->continue_mut)
        if (status & SLICE_CONTINUE) {
            // don't wait for anything, we can just continue right away
            DEBUG("slice %zu: direct continue", subctx->slice_info->index);

        } else if (!subctx->can_continue) {
            // we need to wait until someone signals it
            DEBUG("slice %zu: waiting...", subctx->slice_info->index);

            // loop to guard against spurious wakeups
            while (!subctx->can_continue)
                pthread_cond_wait(&subctx->self->continue_cond, &subctx->self->continue_mut);

            DEBUG("slice %zu: continue after wait", subctx->slice_info->index);

        } else {
            // no need to wait, because someone else took care of that before we got a chance to wait
            DEBUG("slice %zu: indirect continue", subctx->slice_info->index);
        }

        // we should now be marked as continuable
        assert(subctx->can_continue);

        // proceed to continue, so clear this
        subctx->can_continue = 0;

        // done handling the continue state; END_MUTEX's pthread_cleanup_pop(1)
        // runs the unlock handler, so we must not also unlock the mutex by hand
    END_MUTEX

    // row handled successfully
    return 0;
}
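/*
 * The BEGIN_MUTEX/END_MUTEX pair above is the standard cancellation-safe
 * locking idiom: pthread_cleanup_push() registers the unlock handler so the
 * mutex is released even if the thread is canceled inside a cancellation
 * point such as pthread_cond_wait(), and pthread_cleanup_pop(1) runs the
 * same handler on the normal exit path. A minimal sketch against a
 * hypothetical lock:
 */
static pthread_mutex_t sketch_lock = PTHREAD_MUTEX_INITIALIZER;

static void sketch_cancelable_section (void) {
    // register the unlock handler before taking the lock, as BEGIN_MUTEX does
    pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, &sketch_lock);
    pthread_mutex_lock(&sketch_lock);

    /* ... code containing cancellation points, e.g. pthread_cond_wait() ... */

    // execute the handler (i.e. unlock) on the way out, as END_MUTEX does
    pthread_cleanup_pop(1);
}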
void *_render_threads_slice_func (void *arg) {
    struct render_thread *subctx = arg;

    // set up the render_info to render into local memory using the address provided via slice_info
    // use _render_threads_slice_row to handle the completed rows
    if (render_local_mem(subctx->slice_info->render_info,
        &subctx->slice_info->render_buf,
        &_render_threads_slice_row, subctx)
    )
        goto error;

    // wait for the manager to start up; loop to guard against spurious wakeups
    pthread_mutex_lock(&subctx->self->continue_mut);

    while (!subctx->self->manager_running)
        pthread_cond_wait(&subctx->self->continue_cond, &subctx->self->continue_mut);

    pthread_mutex_unlock(&subctx->self->continue_mut);

    // start rendering...
    DEBUG("slice %zu: start", subctx->slice_info->index);

    if (render_mandelbrot(subctx->slice_info->render_info))
        goto error;

    // success, slice render complete
    DEBUG("slice %zu: done", subctx->slice_info->index);

    // sanity check
    assert(subctx->self->slices_running > 0);

    pthread_mutex_lock(&subctx->self->process_row_mut);

    if (--subctx->self->slices_running == 0) {
        // this was the last slice, send the manager a final signal in case it's waiting
        DEBUG("slice %zu: signal", subctx->slice_info->index);
        pthread_cond_signal(&subctx->self->process_row_cond);
    }

    pthread_mutex_unlock(&subctx->self->process_row_mut);

    // successful exit
    pthread_exit(NULL);

error:
    /* XXX: signal an error condition? Mind, with the current code, this should never happen... */
    FATAL("a render thread failed somehow, taking down the rest of the daemon as well");
}
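/*
 * A minimal sketch of the cancellation pattern the workers rely on:
 * pthread_testcancel() gives the manager's error path (below) an explicit
 * cancellation point while no locks are held, so a canceled worker never
 * dies holding a mutex outside the cleanup-protected section. All names
 * here are hypothetical.
 */
static void *sketch_worker_func (void *arg) {
    int *rows_left = arg;

    while (*rows_left > 0) {
        // explicit cancellation point while no locks are held
        pthread_testcancel();

        /* ... render one segment and submit it ... */
        (*rows_left)--;
    }

    return NULL;
}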
void *_render_threads_manager_func (void *arg) {
    struct render_threads *ctx = arg;

    // the process_row return status
    int status, i;

    // the worker threads' join retval
    void *retval;

    // first, mark us as running and signal any waiting workers
    pthread_mutex_lock(&ctx->continue_mut);
    ctx->manager_running = 1;
    pthread_cond_broadcast(&ctx->continue_cond);
    pthread_mutex_unlock(&ctx->continue_mut);

    // needs to be locked inside the loop
    pthread_mutex_lock(&ctx->process_row_mut);

    // then we wait around and call process_row when told to
    do {
        // the process_row mutex must be locked here
        DEBUG("manager: to wait");

        // figure out if we need to wait; loop to guard against spurious wakeups
        while (ctx->waiting_rows == 0 && ctx->slices_running > 0) {
            // no work to do right now, so wait for some
            DEBUG("manager: waiting");

            // wait until a worker thread signals us
            pthread_cond_wait(&ctx->process_row_cond, &ctx->process_row_mut);
        }

        // make sure we actually have something to do now...
        assert(ctx->waiting_rows > 0 || ctx->slices_running == 0);

        // keep the process_row mutex locked until we've inspected/updated both vars

        // do we need to process a row?
        if (ctx->waiting_rows > 0) {
            // claim the row for ourselves
            ctx->waiting_rows--;

            // unlock the process_row mut while we handle this row
            pthread_mutex_unlock(&ctx->process_row_mut);

            // keep calls to render_slices synchronized
            pthread_mutex_lock(&ctx->slices_mut);

            // call into render_slices
            status = render_slices_process_row(&ctx->slices_info);

            DEBUG("manager: did process_row, status=(%s %s)",
                status & SLICE_PROCESS_ROW ? "SLICE_PROCESS_ROW" : "",
                status & SLICE_CONTINUE ? "SLICE_CONTINUE" : ""
            );

            // any errors?
            if (status == -1) {
                pthread_mutex_unlock(&ctx->slices_mut);
                goto error;
            }

            // can any waiting slices now continue?
            if (status & SLICE_CONTINUE) {
                // grab the continue_mut lock while we update this state
                pthread_mutex_lock(&ctx->continue_mut);

                // set can_continue for all slices
                for (i = 0; i < ctx->slice_count; i++)
                    ctx->threads[i].can_continue = 1;

                DEBUG("manager: signal continue");

                // signal any waiting worker threads to continue, giving them a chance to call cond_wait first.
                pthread_cond_broadcast(&ctx->continue_cond);

                // done with that
                pthread_mutex_unlock(&ctx->continue_mut);
            }

            // done with render_slices
            pthread_mutex_unlock(&ctx->slices_mut);

            // XXX: ignore SLICE_PROCESS_ROW

            // grab our mutex again
            pthread_mutex_lock(&ctx->process_row_mut);
        }

        // are all the rows and slices done now?
        if (ctx->slices_running == 0 && ctx->waiting_rows == 0) {
            // unlock the mutex so the worker threads don't get stuck on it
            pthread_mutex_unlock(&ctx->process_row_mut);

            break;
        }

        // keep the process_row mutex locked at the top of the loop
    } while (1);

    // all slices are done now
    if (render_slices_done(&ctx->slices_info))
        goto error;

    DEBUG("manager: done");

    // reap all the workers
    for (i = 0; i < ctx->slice_count; i++) {
        if (pthread_join(ctx->threads[i].thread, &retval))
            PERROR("pthread_join");

        // XXX: assume they don't get canceled
        assert(retval == NULL);
    }

    // successful return
    ctx->manager_retval = RETVAL_SUCCESS;

    goto exit;

error:
    /* cancel and join all worker threads */
    WARNING("canceling and reaping all threads");

    for (i = 0; i < ctx->slice_count; i++) {
        if (pthread_cancel(ctx->threads[i].thread))
            PWARNING("pthread_cancel(worker:%d)", i);

        // join it in any case
        if (pthread_join(ctx->threads[i].thread, &retval)) {
            PWARNING("pthread_join(worker:%d)", i);

        } else if (retval != PTHREAD_CANCELED) {
            PWARNING("worker:%d retval = %p", i, retval);
        }
    }

    // failure
    ctx->manager_retval = RETVAL_FAILURE;

exit:
    // ok, exit ourselves
    pthread_exit(ctx);
}
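/*
 * The manager hands its exit status back in two pieces: pthread_exit(ctx)
 * lets the joiner (render_threads_wait(), below) distinguish a normal exit
 * from PTHREAD_CANCELED or a stray value, while the real success/failure
 * flag travels in ctx->manager_retval. A minimal sketch of the same idiom,
 * with hypothetical names:
 */
struct sketch_task {
    int ok;     // status written by the thread before it exits
};

static void *sketch_task_func (void *arg) {
    struct sketch_task *t = arg;

    t->ok = 1;          // record the real status inside the shared struct

    pthread_exit(arg);  // exit with our own arg so the joiner can recognize us
}

static int sketch_task_join (pthread_t thread, struct sketch_task *t) {
    void *retval;

    if (pthread_join(thread, &retval))
        return -1;      // the join itself failed

    if (retval == PTHREAD_CANCELED || retval != t)
        return -1;      // canceled, or exited in some unexpected way

    return t->ok ? 0 : -1;
}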
int render_threads_init (struct render_threads *ctx, struct render *render_info) {
    // break the render operation down into individual slices
    if (render_slices_init(&ctx->slices_info, render_info))
        goto error;

    // init the mutexes (this should never fail)
    assert(!(
        pthread_mutex_init(&ctx->process_row_mut, NULL)
        || pthread_mutex_init(&ctx->continue_mut, NULL)
        || pthread_mutex_init(&ctx->slices_mut, NULL)
    ));

    // init the conds (this should never fail)
    assert(!(
        pthread_cond_init(&ctx->process_row_cond, NULL)
        || pthread_cond_init(&ctx->continue_cond, NULL)
    ));

    // how many slices?
    ctx->slices_running = ctx->slice_count = render_slices_get_count(&ctx->slices_info);

    // spawn a thread for each slice
    int index;
    for (index = 0; index < ctx->slice_count; index++) {
        // the self ptr...
        ctx->threads[index].self = ctx;

        // fetch the slice info
        ctx->threads[index].slice_info = render_slices_get_slice_info(&ctx->slices_info, index);

        // spawn the thread
        if (pthread_create(&ctx->threads[index].thread, NULL, &_render_threads_slice_func, &ctx->threads[index]))
            PERROR("pthread_create(slice_func:%d)", index);
    }

    // init the attrs for the manager thread
    pthread_attr_t manager_attrs;

    if (
        pthread_attr_init(&manager_attrs)
        // || pthread_attr_setdetachstate(&manager_attrs, PTHREAD_CREATE_DETACHED)
    )
        PERROR("pthread_attr_init/pthread_attr_setdetachstate");

    // spawn the manager thread
    if (pthread_create(&ctx->manager_thread, &manager_attrs, &_render_threads_manager_func, ctx))
        PERROR("pthread_create(manager_func)");

    // success
    return 0;

error:
    render_threads_deinit(ctx);
    return -1;
}
struct render_threads *render_threads_alloc (struct render *render_info) {
    struct render_threads *ctx = NULL;

    if (!(ctx = calloc(1, sizeof(*ctx))))
        ERROR("calloc");

    // flag it for free()ing
    ctx->owned_by_me = 1;

    // init with silent fall-through
    if (render_threads_init(ctx, render_info))
        goto error;

    // success
    return ctx;

error:
    // don't leak the context on init failure (free(NULL) is a no-op)
    free(ctx);

    return NULL;
}

int render_threads_wait (struct render_threads *ctx) {
    void *retval;

    if (pthread_join(ctx->manager_thread, &retval))
        PERROR("pthread_join");

    if (retval == PTHREAD_CANCELED)
        ERROR("manager thread was canceled");
    else if (retval != ctx)
        ERROR("manager thread exited with an unknown return value");
    else if (ctx->manager_retval != RETVAL_SUCCESS)
        ERROR("manager thread exited with an error");

    // ok, success
    return 0;

error:
    // caller needs to free ctx
    return -1;
}
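/*
 * A minimal usage sketch of the public entry points defined in this file.
 * The function name sketch_render_one is hypothetical, and the struct render
 * is assumed to have been configured elsewhere by the caller.
 */
static int sketch_render_one (struct render *render_info) {
    struct render_threads *ctx;

    // allocate the context and spawn the worker + manager threads
    if ((ctx = render_threads_alloc(render_info)) == NULL)
        return -1;

    // block until the manager thread has exited
    if (render_threads_wait(ctx)) {
        // per render_threads_wait, the caller still needs to free ctx
        render_threads_free(ctx);
        return -1;
    }

    // tear down the slices state and free the calloc'd context
    render_threads_free(ctx);

    return 0;
}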