terom@19: #include "ctx.h" terom@19: #include "error.h" terom@33: #include "shared/log.h" terom@19: terom@19: #include terom@19: #include terom@19: #include terom@19: #include // for perror terom@19: terom@21: /** terom@21: * Wrapper around pthread_mutex_unlock for use with pthread_cleanup_push terom@21: */ terom@20: static void pt_mutex_unlock (void *arg) terom@20: { terom@20: pthread_mutex_t *mutex = arg; terom@33: terom@33: log_debug("cleanup"); terom@20: terom@20: assert(!pthread_mutex_unlock(mutex)); terom@20: } terom@20: terom@19: /** terom@19: * Enqueue the given piece of work terom@19: * terom@19: * This function always succeeds. terom@19: */ terom@19: static void pt_work_enqueue (struct pt_ctx *ctx, struct pt_work *work) terom@19: { terom@19: // acquire terom@19: assert(!pthread_mutex_lock(&ctx->work_mutex)); terom@19: terom@19: // enqueue work terom@19: TAILQ_INSERT_TAIL(&ctx->work, work, ctx_work); terom@19: terom@19: // if there's a thread waiting, wake one up. Otherwise, some thread will find it once it finishes its current work terom@19: assert(!pthread_cond_signal(&ctx->work_cond)); terom@19: terom@19: // release terom@19: assert(!pthread_mutex_unlock(&ctx->work_mutex)); terom@19: } terom@19: terom@19: /** terom@33: * Dequeue a piece of work, returning true while there was still work to do terom@19: */ terom@33: static bool pt_work_dequeue (struct pt_ctx *ctx, struct pt_work **work_ptr) terom@19: { terom@33: bool ret; terom@33: terom@20: // acquire, cancel-safe terom@20: pthread_cleanup_push(pt_mutex_unlock, &ctx->work_mutex); terom@19: assert(!pthread_mutex_lock(&ctx->work_mutex)); terom@19: terom@19: // wait for work terom@33: while (ctx->running && TAILQ_EMPTY(&ctx->work)) terom@20: // we can expect to get pthread_cancel'd here terom@19: assert(!pthread_cond_wait(&ctx->work_cond, &ctx->work_mutex)); terom@33: terom@33: // still got work? terom@33: if (!TAILQ_EMPTY(&ctx->work)) { terom@33: // pop work terom@33: *work_ptr = TAILQ_FIRST(&ctx->work); terom@33: TAILQ_REMOVE(&ctx->work, *work_ptr, ctx_work); terom@19: terom@33: log_debug("got work"); terom@33: terom@33: ret = true; terom@33: terom@33: } else { terom@33: assert(!ctx->running); terom@33: terom@33: // work empty, idle terom@33: assert(!pthread_cond_signal(&ctx->idle_cond)); terom@33: terom@33: log_debug("idle/dead"); terom@33: terom@33: // no more work terom@33: ret = false; terom@33: } terom@19: terom@19: // release terom@20: pthread_cleanup_pop(true); terom@33: terom@33: return ret; terom@19: } terom@19: terom@19: /** terom@33: * Wait for work queue to empty and workers to finish terom@19: */ terom@33: static void pt_work_shutdown (struct pt_ctx *ctx) terom@19: { terom@19: // acquire terom@19: assert(!pthread_mutex_lock(&ctx->work_mutex)); terom@19: terom@33: // indicate to terminate terom@33: ctx->running = false; terom@33: terom@33: // wake up any idle workers terom@33: assert(!pthread_cond_broadcast(&ctx->work_cond)); terom@33: terom@19: // wait for it to drain... terom@19: while (!TAILQ_EMPTY(&ctx->work)) terom@19: assert(!pthread_cond_wait(&ctx->idle_cond, &ctx->work_mutex)); terom@19: terom@19: // release terom@19: assert(!pthread_mutex_unlock(&ctx->work_mutex)); terom@19: } terom@19: terom@19: /** terom@19: * Execute a piece of work terom@19: */ terom@19: static void pt_work_execute (struct pt_work *work) terom@19: { terom@19: work->func(work->arg); terom@19: } terom@19: terom@19: /** terom@19: * Release work state once done terom@19: */ terom@19: static void pt_work_release (struct pt_work *work) terom@19: { terom@19: free(work); terom@19: } terom@19: terom@19: /** terom@19: * Worker thread entry point terom@19: */ terom@19: static void* pt_thread_main (void *arg) terom@19: { terom@19: struct pt_thread *thread = arg; terom@19: struct pt_work *work; terom@33: terom@33: log_debug("start"); terom@19: terom@19: // if only life were so simple... terom@33: while (pt_work_dequeue(thread->ctx, &work)) { terom@33: log_debug("work_execute"); terom@19: pt_work_execute(work); terom@19: pt_work_release(work); terom@33: log_debug("work_done"); terom@19: } terom@19: terom@33: log_debug("exit"); terom@33: terom@19: return NULL; terom@19: } terom@19: terom@19: /** terom@19: * Shut down a pt_thread, waiting for it to finish. terom@19: * terom@19: * Does not remove the thread from the pool or release the pt_thread. terom@19: */ terom@19: static void pt_thread_shutdown (struct pt_thread *thread) terom@19: { terom@33: // wait for it to finish terom@19: if (pthread_join(thread->tid, NULL)) terom@19: perror("pthread_join"); terom@19: terom@19: // mark terom@19: thread->tid = 0; terom@19: } terom@19: terom@19: /** terom@19: * Release pt_thread state, aborting thread if running. terom@19: * terom@19: * This is guaranteed to remove the thread from the ctx_threads if it was added. terom@19: */ terom@19: static void pt_thread_destroy (struct pt_thread *thread) terom@19: { terom@19: if (thread->tid) { terom@19: // detach terom@19: if (pthread_detach(thread->tid)) terom@19: perror("pthread_detach"); terom@19: terom@19: // abort thread terom@19: if (pthread_kill(thread->tid, SIGTERM)) terom@19: perror("pthread_detach"); terom@19: terom@19: } terom@19: terom@19: // remove from pool terom@19: TAILQ_REMOVE(&thread->ctx->threads, thread, ctx_threads); terom@19: terom@19: free(thread); terom@19: } terom@19: terom@19: terom@19: /** terom@19: * Start a new thread and add it to the thread pool terom@19: */ terom@19: static int pt_ctx_add_thread (struct pt_ctx *ctx) terom@19: { terom@19: struct pt_thread *thread; terom@19: int err; terom@19: terom@19: // alloc terom@19: if ((thread = calloc(1, sizeof(*thread))) == NULL) terom@19: return -PT_ERR_MEM; terom@19: terom@19: // init terom@19: thread->ctx = ctx; terom@19: terom@19: // start thread, default attrs terom@19: if (pthread_create(&thread->tid, NULL, pt_thread_main, thread)) terom@19: JUMP_SET_ERROR(err, PT_ERR_PTHREAD_CREATE); terom@19: terom@19: // add to pool terom@19: TAILQ_INSERT_TAIL(&ctx->threads, thread, ctx_threads); terom@19: terom@19: // ok terom@19: return 0; terom@19: terom@19: error: terom@19: // drop, don't try and remove from tailq... terom@19: free(thread); terom@19: terom@19: return err; terom@19: } terom@19: terom@19: int pt_ctx_new (struct pt_ctx **ctx_ptr, int threads) terom@19: { terom@19: struct pt_ctx *ctx; terom@19: int err; terom@19: terom@19: // alloc terom@19: if ((ctx = calloc(1, sizeof(*ctx))) == NULL) terom@19: return -PT_ERR_MEM; terom@19: terom@19: // init terom@19: TAILQ_INIT(&ctx->threads); terom@19: TAILQ_INIT(&ctx->work); terom@19: pthread_mutex_init(&ctx->work_mutex, NULL); terom@19: pthread_cond_init(&ctx->work_cond, NULL); terom@19: pthread_cond_init(&ctx->idle_cond, NULL); terom@33: ctx->running = true; terom@19: terom@19: // start threadpool terom@19: for (int i = 0; i < threads; i++) { terom@19: if ((err = pt_ctx_add_thread(ctx))) terom@19: JUMP_ERROR(err); terom@19: } terom@19: terom@19: // ok terom@19: *ctx_ptr = ctx; terom@19: terom@19: return 0; terom@19: terom@19: error: terom@19: // cleanup terom@19: pt_ctx_destroy(ctx); terom@19: terom@19: return err; terom@19: } terom@19: terom@19: int pt_ctx_work (struct pt_ctx *ctx, pt_work_func func, void *arg) terom@19: { terom@19: struct pt_work *work; terom@19: terom@19: // check state terom@21: // XXX: this is kind of retarded, because pt_ctx_shutdown/work should only be called from the same thread... terom@19: if (!ctx->running) terom@19: RETURN_ERROR(PT_ERR_CTX_SHUTDOWN); terom@19: terom@21: // alloc terom@19: if ((work = calloc(1, sizeof(*work))) == NULL) terom@19: RETURN_ERROR(PT_ERR_MEM); terom@19: terom@19: // init terom@19: work->func = func; terom@19: work->arg = arg; terom@19: terom@19: // enqueue terom@19: pt_work_enqueue(ctx, work); terom@19: terom@19: // ok terom@19: return 0; terom@19: } terom@19: terom@19: int pt_ctx_shutdown (struct pt_ctx *ctx) terom@19: { terom@19: struct pt_thread *thread; terom@19: terom@33: // finish work terom@33: pt_work_shutdown(ctx); terom@19: terom@19: // shut down each thread terom@19: TAILQ_FOREACH(thread, &ctx->threads, ctx_threads) terom@19: // XXX: handle errors of some kind? terom@19: pt_thread_shutdown(thread); terom@19: terom@19: // then drop terom@19: pt_ctx_destroy(ctx); terom@19: terom@19: return 0; terom@19: } terom@19: terom@19: void pt_ctx_destroy (struct pt_ctx *ctx) terom@19: { terom@19: struct pt_thread *thread; terom@19: terom@19: // kill threads terom@19: while ((thread = TAILQ_FIRST(&ctx->threads))) terom@19: pt_thread_destroy(thread); terom@19: terom@19: // destroy mutex/conds terom@19: if (pthread_cond_destroy(&ctx->idle_cond)) terom@19: perror("pthread_cond_destroy(idle_cond)"); terom@19: terom@19: if (pthread_cond_destroy(&ctx->work_cond)) terom@19: perror("pthread_cond_destroy(work_cond)"); terom@19: terom@19: if (pthread_mutex_destroy(&ctx->work_mutex)) terom@19: perror("pthread_mutex_destroy(work_mutex)"); terom@19: terom@19: terom@19: free(ctx); terom@19: } terom@19: