/*
 * render_threads.c
 *
 * author:    Tero Marttila <terom@fixme.fi>
 * date:      Wed, 27 Aug 2008 21:30:32 +0300
 * changeset: 41 540737bf6bac (parent 23 31307efd7e78)
 */
#include <stdlib.h>
#include <assert.h>

#include <pthread.h>

//#define DEBUG_ENABLED

#include "common.h"
#include "render_threads.h"
#include "render_slices_struct.h"
#include "render_mandelbrot.h"

/*
 * Render a mandelbrot in several slices simultaneously using threads. 
 *
 * This works as an n:1 producer-consumer model, where we have n worker threads
 * rendering n slices, and one manager thread that processes the completed rows.
 * 
 * The worker threads function by first rendering a row of pixel data into memory,
 * and then calling a function that will do something with that completed segment.
 *  
 *  * clear the can_continue marker, because its real value depends on what our
 *    segment does, and we don't want a stale value left over from before we
 *    processed this segment
 *      * process the segment
 *
 *  * if the segment didn't complete a row, wait until some worker thread submits
 *    a segment that does, or the manager thread processes a row
 *      * if no one has signaled the cond in between us deciding what to do with
 *        the segment and this step,
 *          * wait on the cond
 *      * we can proceed to render the next segment
 *          * we should now be marked as continuable
 *
 *  * if the segment completed a row, the manager thread needs to process this
 *    newly completed row. 
 *      * if we can also continue on to the next row without waiting for the manager
 *        to process the row we just filled
 *          * mark all the segments as continuable
 *          * signal the cond the threads may be waiting on
 *      * increment the waiting_rows counter
 *          * signal the manager in case it's waiting
 *      * else (we can't continue right away), if no one has signaled the continue
 *        cond since we processed the segment,
 *          * wait on the cond
 *      * we can proceed to render the next segment
 *          * we should now be marked as continuable
 *
 * * if we have no more segments to submit
 *      * decrement the slices_running counter
 *          * if the counter is now zero (we were the last slice to complete)
 *              * signal the manager thread that it doesn't need to wait anymore
 *
 * The manager thread just sits in a loop calling process_row until all the slices
 * have completed.
 *
 * * if no workers have updated the waiting_rows counter since we last processed
 *   a row, and the slices_running counter isn't zero (we still have threads
 *   running)
 *      * wait on a cond until something happens
 *
 * * if waiting_rows is nonzero
 *      * decrement it
 *      * process a row
 *      * if processing the row means that waiting slices can continue
 *          * mark all the segments as continuable
 *          * signal the cond the threads may be waiting on
 *
 * * if slices_running is zero
 *      * break out of the loop
 *
 * * otherwise, go back to the start again
 */     
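
/*
 * Typical usage of the public entry points defined below (a minimal sketch;
 * error handling and the setup of the struct render are elided):
 *
 *      struct render_threads *rt;
 *
 *      if ((rt = render_threads_alloc(render_info)) == NULL)
 *          return -1;                  // failed to set up the slices/threads
 *
 *      if (render_threads_wait(rt))    // blocks until the manager thread exits
 *          ;                           // the render failed
 *
 *      render_threads_free(rt);        // tear down the slices and free the ctx
 */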

struct render_threads {
    // was this struct allocated by render_threads_alloc, i.e. should render_threads_free() free() it?
    int owned_by_me;

    // info for the individual worker threads
    struct render_thread {
        // the slice that we need to render
        struct render_slice_info *slice_info;

        // our thread id
        pthread_t thread;

        // nonzero when we are allowed to continue on to our next segment
        int can_continue;

        // how many segments we've written
        int segments_done;

        // ptr back to ctx
        struct render_threads *self;

    } threads[RENDER_SLICES_MAX];

    // the render_slices struct
    struct render_slices slices_info;

    // the manager thread id
    pthread_t manager_thread;

    // how many slices?
    int slice_count;
    
    /*
     * The worker threads must be careful to wait for the manager thread to start running before they attempt to signal
     * the process_row_cond. This is done via this flag, in conjunction with continue_{cond,mut}. 
     *
     * Workers should lock continue_mut, and if this flag is zero, wait on continue_cond (releasing the mutex). If the
     * flag is already nonzero, the worker threads do not need to cond_wait and can just continue.
     *
     * On startup, the manager thread will lock continue_mut, set this to nonzero, signal continue_cond, and unlock
     * continue_mut as normal.
     */
    int manager_running;
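
    /*
     * In outline (the actual code is in _render_threads_slice_func and
     * _render_threads_manager_func below):
     *
     *      worker                                  manager
     *      ------                                  -------
     *      lock(continue_mut)                      lock(continue_mut)
     *      while (!manager_running)                manager_running = 1
     *          cond_wait(continue_cond)            cond_broadcast(continue_cond)
     *      unlock(continue_mut)                    unlock(continue_mut)
     */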
    
    /*
     * Counters the workers use to signal the manager; both are protected by process_row_mut.
     */
    int slices_running;
    int waiting_rows;

    /* the inter-thread communication stuff */

    // the worker threads signal this when their segment_done call indicates that the manager thread should call
    // process_row now.
    pthread_cond_t process_row_cond;
    pthread_mutex_t process_row_mut;

    // the worker threads and the manager thread signal this when segment_done or process_row indicate that the worker
    // threads should call segment_done again. Worker threads whose segment_done status does not include SLICE_CONTINUE
    // will wait on this cond.
    pthread_cond_t continue_cond;
    pthread_mutex_t continue_mut;
    
    // used to protect slices_info
    pthread_mutex_t slices_mut;

    /*
     * Return value from the manager thread
     */
    enum render_threads_retval {
        RETVAL_SUCCESS,
        RETVAL_FAILURE,
    } manager_retval;
};


void render_threads_deinit (struct render_threads *ctx) {
    render_slices_deinit(&ctx->slices_info);
}

void render_threads_free (struct render_threads *ctx) {
    render_threads_deinit(ctx);

    if (ctx->owned_by_me)
        free(ctx);
}

int _render_threads_slice_row (void *arg, unsigned char *rowbuf) {
    struct render_thread *subctx = arg;
    int status, i;

    // test for cancelation, we don't hold any locks right now
    pthread_testcancel();
    
    // we need to handle the render_slices state atomically, so lock it
    pthread_mutex_lock(&subctx->self->slices_mut);
    pthread_mutex_lock(&subctx->self->continue_mut);
    
    // make sure we don't carry any stale can_continue state into this segment
    subctx->can_continue = 0;

    // process the segment
    status = render_slices_segment_done(&subctx->self->slices_info, subctx->slice_info->index);
    
    // debugging; increment the counter outside the DEBUG() call so it isn't compiled away
    subctx->segments_done++;

    DEBUG("slice %zu: segment_done, status=(%s %s), row=%d", subctx->slice_info->index,
        status & SLICE_PROCESS_ROW ? "SLICE_PROCESS_ROW" : "", 
        status & SLICE_CONTINUE ? "SLICE_CONTINUE" : "",
        subctx->segments_done
    );
    
    // do we need to notify the other worker threads?
    if (status & SLICE_CONTINUE) {
        DEBUG("slice %zu: signal continue", subctx->slice_info->index);

        // mark them as continuable
        for (i = 0; i < subctx->self->slice_count; i++)
            subctx->self->threads[i].can_continue = 1;

        // signal them to continue
        pthread_cond_broadcast(&subctx->self->continue_cond);
    }

    // done with the slices stuff
    pthread_mutex_unlock(&subctx->self->continue_mut);
    pthread_mutex_unlock(&subctx->self->slices_mut);
   
    // tell the manager thread that it can process a row
    if (status & SLICE_PROCESS_ROW) {
        // grab the process_row lock so that we can do stuff with it
        pthread_mutex_lock(&subctx->self->process_row_mut);
        
        // tell it that it has a row waiting
        subctx->self->waiting_rows++;

        // signal it in case it was waiting
        pthread_cond_signal(&subctx->self->process_row_cond);

        // release the process_row mutex
        pthread_mutex_unlock(&subctx->self->process_row_mut);
    }

#define BEGIN_MUTEX(mut) pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, &(mut)); pthread_mutex_lock(&(mut));
#define END_MUTEX pthread_cleanup_pop(1);
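
// Note: pthread_cleanup_push() and pthread_cleanup_pop() must appear as a
// lexically matched pair in the same block (POSIX allows them to expand to an
// opening and a closing brace), which is why these macros are used as a
// bracketing BEGIN/END pair around the critical section below.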

    // handle our can-continue status
    // we need to release this lock if we get canceled in pthread_cond_wait
    BEGIN_MUTEX(subctx->self->continue_mut)
        if (status & SLICE_CONTINUE) {
            // don't wait for anything, we can just continue right away
            DEBUG("slice %zu: direct continue", subctx->slice_info->index);

        } else {
            if (!subctx->can_continue) {
                // we need to wait until someone signals it; loop to guard
                // against spurious wakeups from pthread_cond_wait
                DEBUG("slice %zu: waiting...", subctx->slice_info->index);

                while (!subctx->can_continue)
                    pthread_cond_wait(&subctx->self->continue_cond, &subctx->self->continue_mut);

                DEBUG("slice %zu: continue after wait", subctx->slice_info->index);

            } else {
                // no need to wait, because someone else took care of that before we got a chance to wait
                DEBUG("slice %zu: indirect continue", subctx->slice_info->index);
             
            }
        }
        
        // we should now be marked as continuable
        assert(subctx->can_continue);

        // proceed to continue, so clear this
        subctx->can_continue = 0;
        
        // END_MUTEX's pthread_cleanup_pop(1) releases continue_mut for us;
        // unlocking it here as well would be an invalid double unlock

    END_MUTEX

    // row handled successfully
    return 0;
}

void *_render_threads_slice_func (void *arg) {
    struct render_thread *subctx = arg;

    // set up the render_info to render into local memory using the address provided via slice_info
    // use _render_threads_slice_row to handle the completed rows
    if (render_local_mem(subctx->slice_info->render_info, 
        &subctx->slice_info->render_buf, 
        &_render_threads_slice_row, subctx)
    )
        goto error;
   
    // wait for the manager to start up; loop to guard against spurious wakeups
    pthread_mutex_lock(&subctx->self->continue_mut);
    while (!subctx->self->manager_running)
        pthread_cond_wait(&subctx->self->continue_cond, &subctx->self->continue_mut);
    pthread_mutex_unlock(&subctx->self->continue_mut);

    // start rendering...
    DEBUG("slice %zu: start", subctx->slice_info->index);

    if (render_mandelbrot(subctx->slice_info->render_info))
        goto error;

    // success, slice render complete
    DEBUG("slice %zu: done", subctx->slice_info->index);
    
    pthread_mutex_lock(&subctx->self->process_row_mut);

    // sanity check, now that we hold the mutex protecting this counter
    assert(subctx->self->slices_running > 0);

    if (--subctx->self->slices_running == 0) {
        // this was the last slice to finish, so send the manager a final signal in case it's waiting
        DEBUG("slice %zu: signal", subctx->slice_info->index);
        pthread_cond_signal(&subctx->self->process_row_cond);
    }

    pthread_mutex_unlock(&subctx->self->process_row_mut);

    // successful exit
    pthread_exit(NULL);

error:
    /* XXX: signal an error condition? Mind, with the current code, this should never happen... */
    FATAL("a render thread failed somehow, taking down the rest of the daemon application as well");
}

void *_render_threads_manager_func (void *arg) {
    struct render_threads *ctx = arg;
    
    // the process_row return status
    int status, i;

    // first, mark us as running and signal any waiting workers
    pthread_mutex_lock(&ctx->continue_mut);
    ctx->manager_running = 1;
    pthread_cond_broadcast(&ctx->continue_cond);
    pthread_mutex_unlock(&ctx->continue_mut);
    
    // process_row_mut must be held entering the loop (and again at the top of each iteration)
    pthread_mutex_lock(&ctx->process_row_mut);

    // then we wait around and call process_row when told to
    do {
        // process_row_mut is held at this point
        DEBUG("manager: to wait");
        
        // wait until there is a row to process or all slices have finished;
        // loop to guard against spurious wakeups from pthread_cond_wait
        while (ctx->waiting_rows == 0 && ctx->slices_running > 0) {
            // no work to do right now, so wait for some
            DEBUG("manager: waiting");
            
            // wait until a worker thread signals us
            pthread_cond_wait(&ctx->process_row_cond, &ctx->process_row_mut);
        }

        // make sure we actually have something to do now...
        assert(ctx->waiting_rows > 0 || ctx->slices_running == 0);

        // keep the process_row mutex locked until we've inspected/updated both vars
        
        // do we need to process a row?
        if (ctx->waiting_rows > 0) {
            // claim the row for ourself
            ctx->waiting_rows--;

            // unlock the process_row mut while we handle this row
            pthread_mutex_unlock(&ctx->process_row_mut);


            // keep calls to render_slices synchronized
            pthread_mutex_lock(&ctx->slices_mut);
            
            // call into render_slices
            status = render_slices_process_row(&ctx->slices_info);
            
            DEBUG("manager: did process_row, status=(%s %s)", 
                status & SLICE_PROCESS_ROW ? "SLICE_PROCESS_ROW" : "", 
                status & SLICE_CONTINUE ? "SLICE_CONTINUE" : ""
            );
           
            // any errors?
            if (status == -1) {
                pthread_mutex_unlock(&ctx->slices_mut);
                goto error;
            }

            // can any waiting slices now continue?
            if (status & SLICE_CONTINUE) {
                // grab the continue_mut lock while we update this state
                pthread_mutex_lock(&ctx->continue_mut);

                // set can_continue for all slices
                for (i = 0; i < ctx->slice_count; i++)
                    ctx->threads[i].can_continue = 1;

                DEBUG("manager: signal continue");

                // signal any waiting worker threads to continue, giving them a chance to call cond_wait first.
                pthread_cond_broadcast(&ctx->continue_cond);
                
                // done with that
                pthread_mutex_unlock(&ctx->continue_mut);
            }
            
            // done with render_slices
            pthread_mutex_unlock(&ctx->slices_mut);

            // XXX: ignore SLICE_PROCESS_ROW
    
            // grab our mutex again
            pthread_mutex_lock(&ctx->process_row_mut);
        }

        // are all the rows and slices done now?
        if (ctx->slices_running == 0 && ctx->waiting_rows == 0) {
            // unlock the mutex so the worker threads don't get stuck on it
            pthread_mutex_unlock(&ctx->process_row_mut);

            break;
        }
        
        // keep the process_row mutex locked at the top of the loop 
    } while (1);

    // all slices are done now
    if (render_slices_done(&ctx->slices_info))
        goto error;

    DEBUG("manager: done");
    
    void *retval;

    // reap all the workers
    for (i = 0; i < ctx->slice_count; i++) {
        if (pthread_join(ctx->threads[i].thread, &retval))
            PERROR("pthread_join");
        
        // XXX: assume they don't get canceled
        assert(retval == NULL);
    }
    
    // successful return
    ctx->manager_retval = RETVAL_SUCCESS;
    
    goto exit;

error:
    /* cancel and join all worker threads */
    WARNING("canceling and reaping all threads");

    for (i = 0; i < ctx->slice_count; i++) {
        if (pthread_cancel(ctx->threads[i].thread))
            PWARNING("pthread_cancel(worker:%d)", i);

        // join it in any case
        if (pthread_join(ctx->threads[i].thread, &retval)) {
            PWARNING("pthread_join(worker:%d)", i);
        } else {
            if (retval != PTHREAD_CANCELED)
                PWARNING("worker:%d retval = %p", i, retval);
        }
    }
    
    // failure
    ctx->manager_retval = RETVAL_FAILURE;

exit:    
    // ok, exit ourself
    pthread_exit(ctx);
}

int render_threads_init (struct render_threads *ctx, struct render *render_info) {
    // break the render operation down into individual slices
    if (render_slices_init(&ctx->slices_info, render_info))
        goto error;
    
    // init the mutexes and conds (these should never fail); keep the init calls
    // outside of assert() so they aren't compiled away under NDEBUG
    int init_err = 0;
    init_err |= pthread_mutex_init(&ctx->process_row_mut, NULL);
    init_err |= pthread_mutex_init(&ctx->continue_mut, NULL);
    init_err |= pthread_mutex_init(&ctx->slices_mut, NULL);
    init_err |= pthread_cond_init(&ctx->process_row_cond, NULL);
    init_err |= pthread_cond_init(&ctx->continue_cond, NULL);

    assert(!init_err);
    (void) init_err;    // silence set-but-unused warnings under NDEBUG
    
    // how many slices?
    ctx->slices_running = ctx->slice_count = render_slices_get_count(&ctx->slices_info);

    // spawn a thread for each slice
    int index;
    for (index = 0; index < ctx->slice_count; index++) {
        // the self ptr...
        ctx->threads[index].self = ctx;

        // fetch the slice info
        ctx->threads[index].slice_info = render_slices_get_slice_info(&ctx->slices_info, index);

        // spawn the thread
        if (pthread_create(&ctx->threads[index].thread, NULL, &_render_threads_slice_func, &ctx->threads[index]))
            PERROR("pthread_create(slice_func:%d)", index);
    }

    // init the attrs for the manager thread
    pthread_attr_t manager_attrs;
    
    if (
            pthread_attr_init(&manager_attrs)
//         || pthread_attr_setdetachstate(&manager_attrs, PTHREAD_CREATE_DETACHED)
    )
        PERROR("pthread_attr_init/pthread_attr_setdetachstate");

    // spawn the manager thread
    if (pthread_create(&ctx->manager_thread, &manager_attrs, &_render_threads_manager_func, ctx))
        PERROR("pthread_create(manager_func)");

    // success
    return 0;

error:
    render_threads_deinit(ctx);
    return -1;
}

struct render_threads *render_threads_alloc (struct render *render_info) {
    struct render_threads *ctx = NULL;

    if (!(ctx = calloc(1, sizeof(*ctx))))
        ERROR("calloc");
    
    // flag it for free()ing
    ctx->owned_by_me = 1;
    
    // init; on failure, fall through to the error path below
    if (render_threads_init(ctx, render_info))
        goto error;
    
    // success
    return ctx;

error:
    // render_threads_init has already cleaned up after itself, so just release the ctx we allocated
    free(ctx);
    return NULL;
}

int render_threads_wait (struct render_threads *ctx) {
    void *retval;

    if (pthread_join(ctx->manager_thread, &retval))
        PERROR("pthread_join");
    
    if (retval == PTHREAD_CANCELED)
        ERROR("manager thread was canceled");
    else if (retval != ctx)
        ERROR("manager thread exited with unknown return value");
    else {
        if (ctx->manager_retval != RETVAL_SUCCESS) {
            ERROR("manger thread exited with an error");
        }
    }
    
    // ok, success
    return 0;

error:
    // caller needs to free ctx
    return -1;
}