src/lib/ctx.c
author Tero Marttila <terom@fixme.fi>
Tue, 26 Jan 2010 21:22:43 +0200
changeset 123 81d1cad8b588
parent 33 0ed40e11b0e8
permissions -rw-r--r--
docfix README for new Makefile
#include "ctx.h"
#include "error.h"
#include "shared/log.h"

#include <stdlib.h>
#include <signal.h>
#include <assert.h>
#include <stdio.h> // for perror

/**
 * Wrapper around pthread_mutex_unlock for use with pthread_cleanup_push
 */
static void pt_mutex_unlock (void *arg)
{
    pthread_mutex_t *mutex = arg;
    
    log_debug("cleanup");

    assert(!pthread_mutex_unlock(mutex));
}

/**
 * Enqueue the given piece of work
 *
 * This function always succeeds.
 */
static void pt_work_enqueue (struct pt_ctx *ctx, struct pt_work *work)
{
    // acquire
    assert(!pthread_mutex_lock(&ctx->work_mutex));

    // enqueue work
    TAILQ_INSERT_TAIL(&ctx->work, work, ctx_work);

    // if there's a thread waiting, wake one up. Otherwise, some thread will find it once it finishes its current work
    assert(!pthread_cond_signal(&ctx->work_cond));

    // release 
    assert(!pthread_mutex_unlock(&ctx->work_mutex));
}

/**
 * Dequeue a piece of work, returning true while there was still work to do
 */
static bool pt_work_dequeue (struct pt_ctx *ctx, struct pt_work **work_ptr)
{
    bool ret;

    // acquire, cancel-safe
    pthread_cleanup_push(pt_mutex_unlock, &ctx->work_mutex);
    assert(!pthread_mutex_lock(&ctx->work_mutex));

    // wait for work
    while (ctx->running && TAILQ_EMPTY(&ctx->work))
        // we can expect to get pthread_cancel'd here
        assert(!pthread_cond_wait(&ctx->work_cond, &ctx->work_mutex));
    
    // still got work?
    if (!TAILQ_EMPTY(&ctx->work)) {
        // pop work
        *work_ptr = TAILQ_FIRST(&ctx->work);
        TAILQ_REMOVE(&ctx->work, *work_ptr, ctx_work);

        log_debug("got work");

        ret = true;

    } else {
        assert(!ctx->running);

        // work empty, idle
        assert(!pthread_cond_signal(&ctx->idle_cond));
        
        log_debug("idle/dead");

        // no more work
        ret = false;
    }

    // release
    pthread_cleanup_pop(true);

    return ret;
}

/**
 * Wait for work queue to empty and workers to finish
 */
static void pt_work_shutdown (struct pt_ctx *ctx)
{
    // acquire
    assert(!pthread_mutex_lock(&ctx->work_mutex));
    
    // indicate to terminate
    ctx->running = false;

    // wake up any idle workers
    assert(!pthread_cond_broadcast(&ctx->work_cond));
    
    // wait for it to drain...
    while (!TAILQ_EMPTY(&ctx->work))
        assert(!pthread_cond_wait(&ctx->idle_cond, &ctx->work_mutex));

    // release
    assert(!pthread_mutex_unlock(&ctx->work_mutex));
}

/**
 * Execute a piece of work
 */
static void pt_work_execute (struct pt_work *work)
{
    work->func(work->arg);
}

/**
 * Release work state once done
 */
static void pt_work_release (struct pt_work *work)
{
    free(work);
}

/**
 * Worker thread entry point
 */
static void* pt_thread_main (void *arg)
{
    struct pt_thread *thread = arg;
    struct pt_work *work;
    
    log_debug("start");

    // if only life were so simple...
    while (pt_work_dequeue(thread->ctx, &work)) {
        log_debug("work_execute");
        pt_work_execute(work);
        pt_work_release(work);
        log_debug("work_done");
    }

    log_debug("exit");

    return NULL;
}

/**
 * Shut down a pt_thread, waiting for it to finish.
 *
 * Does not remove the thread from the pool or release the pt_thread.
 */
static void pt_thread_shutdown (struct pt_thread *thread)
{
    // wait for it to finish
    if (pthread_join(thread->tid, NULL))
        perror("pthread_join");

    // mark
    thread->tid = 0;
}

/**
 * Release pt_thread state, aborting thread if running.
 *
 * This is guaranteed to remove the thread from the ctx_threads if it was added.
 */
static void pt_thread_destroy (struct pt_thread *thread)
{
    if (thread->tid) {
        // detach
        if (pthread_detach(thread->tid))
            perror("pthread_detach");

        // abort thread
        if (pthread_kill(thread->tid, SIGTERM))
            perror("pthread_detach");

    }
        
    // remove from pool
    TAILQ_REMOVE(&thread->ctx->threads, thread, ctx_threads);
    
    free(thread);
}


/**
 * Start a new thread and add it to the thread pool
 */
static int pt_ctx_add_thread (struct pt_ctx *ctx)
{
    struct pt_thread *thread;
    int err;

    // alloc
    if ((thread = calloc(1, sizeof(*thread))) == NULL)
        return -PT_ERR_MEM;

    // init
    thread->ctx = ctx;

    // start thread, default attrs
    if (pthread_create(&thread->tid, NULL, pt_thread_main, thread))
        JUMP_SET_ERROR(err, PT_ERR_PTHREAD_CREATE);

    // add to pool
    TAILQ_INSERT_TAIL(&ctx->threads, thread, ctx_threads);

    // ok
    return 0;

error:
    // drop, don't try and remove from tailq...
    free(thread);

    return err;
}

int pt_ctx_new (struct pt_ctx **ctx_ptr, int threads)
{
    struct pt_ctx *ctx;
    int err;

    // alloc
    if ((ctx = calloc(1, sizeof(*ctx))) == NULL)
        return -PT_ERR_MEM;

    // init
    TAILQ_INIT(&ctx->threads);
    TAILQ_INIT(&ctx->work);
    pthread_mutex_init(&ctx->work_mutex, NULL);
    pthread_cond_init(&ctx->work_cond, NULL);
    pthread_cond_init(&ctx->idle_cond, NULL);
    ctx->running = true;

    // start threadpool
    for (int i = 0; i < threads; i++) {
        if ((err = pt_ctx_add_thread(ctx)))
            JUMP_ERROR(err);
    }

    // ok
    *ctx_ptr = ctx;

    return 0;
        
error:
    // cleanup
    pt_ctx_destroy(ctx);

    return err;
}

int pt_ctx_work (struct pt_ctx *ctx, pt_work_func func, void *arg)
{
    struct pt_work *work;

    // check state
    // XXX: this is kind of retarded, because pt_ctx_shutdown/work should only be called from the same thread...
    if (!ctx->running)
        RETURN_ERROR(PT_ERR_CTX_SHUTDOWN);

    // alloc
    if ((work = calloc(1, sizeof(*work))) == NULL)
        RETURN_ERROR(PT_ERR_MEM);

    // init
    work->func = func;
    work->arg = arg;

    // enqueue
    pt_work_enqueue(ctx, work);

    // ok
    return 0;
}

int pt_ctx_shutdown (struct pt_ctx *ctx)
{
    struct pt_thread *thread;

    // finish work
    pt_work_shutdown(ctx);

    // shut down each thread
    TAILQ_FOREACH(thread, &ctx->threads, ctx_threads)
        // XXX: handle errors of some kind?
        pt_thread_shutdown(thread);

    // then drop
    pt_ctx_destroy(ctx);

    return 0;
}

void pt_ctx_destroy (struct pt_ctx *ctx)
{
    struct pt_thread *thread;

    // kill threads
    while ((thread = TAILQ_FIRST(&ctx->threads)))
        pt_thread_destroy(thread);

    // destroy mutex/conds
    if (pthread_cond_destroy(&ctx->idle_cond))
        perror("pthread_cond_destroy(idle_cond)");
    
    if (pthread_cond_destroy(&ctx->work_cond))
        perror("pthread_cond_destroy(work_cond)");

    if (pthread_mutex_destroy(&ctx->work_mutex))
        perror("pthread_mutex_destroy(work_mutex)");
   

    free(ctx);
}