don't use pthread_cancel to shutdown threads, it doesn't work...
--- a/src/lib/ctx.c Wed Jan 06 14:45:55 2010 +0200
+++ b/src/lib/ctx.c Wed Jan 06 16:05:02 2010 +0200
@@ -1,5 +1,6 @@
#include "ctx.h"
#include "error.h"
+#include "shared/log.h"
#include <stdlib.h>
#include <signal.h>
@@ -12,6 +13,8 @@
static void pt_mutex_unlock (void *arg)
{
pthread_mutex_t *mutex = arg;
+
+ log_debug("cleanup");
assert(!pthread_mutex_unlock(mutex));
}
@@ -37,39 +40,63 @@
}
/**
- * Dequeue a piece of work
+ * Dequeue a piece of work, returning true while there was still work to do
*/
-static void pt_work_dequeue (struct pt_ctx *ctx, struct pt_work **work_ptr)
+static bool pt_work_dequeue (struct pt_ctx *ctx, struct pt_work **work_ptr)
{
+ bool ret;
+
// acquire, cancel-safe
pthread_cleanup_push(pt_mutex_unlock, &ctx->work_mutex);
assert(!pthread_mutex_lock(&ctx->work_mutex));
- // idle?
- if (TAILQ_EMPTY(&ctx->work))
- assert(!pthread_cond_signal(&ctx->idle_cond));
-
// wait for work
- while (TAILQ_EMPTY(&ctx->work))
+ while (ctx->running && TAILQ_EMPTY(&ctx->work))
// we can expect to get pthread_cancel'd here
assert(!pthread_cond_wait(&ctx->work_cond, &ctx->work_mutex));
+
+ // still got work?
+ if (!TAILQ_EMPTY(&ctx->work)) {
+ // pop work
+ *work_ptr = TAILQ_FIRST(&ctx->work);
+ TAILQ_REMOVE(&ctx->work, *work_ptr, ctx_work);
- // pop work
- *work_ptr = TAILQ_FIRST(&ctx->work);
- TAILQ_REMOVE(&ctx->work, *work_ptr, ctx_work);
+ log_debug("got work");
+
+ ret = true;
+
+ } else {
+ assert(!ctx->running);
+
+ // work empty, idle
+ assert(!pthread_cond_signal(&ctx->idle_cond));
+
+ log_debug("idle/dead");
+
+ // no more work
+ ret = false;
+ }
// release
pthread_cleanup_pop(true);
+
+ return ret;
}
/**
- * Wait for work queue to become empty
+ * Wait for work queue to empty and workers to finish
*/
-static void pt_work_wait_idle (struct pt_ctx *ctx)
+static void pt_work_shutdown (struct pt_ctx *ctx)
{
// acquire
assert(!pthread_mutex_lock(&ctx->work_mutex));
+ // indicate to terminate
+ ctx->running = false;
+
+ // wake up any idle workers
+ assert(!pthread_cond_broadcast(&ctx->work_cond));
+
// wait for it to drain...
while (!TAILQ_EMPTY(&ctx->work))
assert(!pthread_cond_wait(&ctx->idle_cond, &ctx->work_mutex));
@@ -101,14 +128,19 @@
{
struct pt_thread *thread = arg;
struct pt_work *work;
+
+ log_debug("start");
// if only life were so simple...
- while (true) {
- pt_work_dequeue(thread->ctx, &work);
+ while (pt_work_dequeue(thread->ctx, &work)) {
+ log_debug("work_execute");
pt_work_execute(work);
pt_work_release(work);
+ log_debug("work_done");
}
+ log_debug("exit");
+
return NULL;
}
@@ -119,11 +151,7 @@
*/
static void pt_thread_shutdown (struct pt_thread *thread)
{
- // signal it to stop at next cancel point (i.e. when waiting for work)
- if (pthread_cancel(thread->tid))
- perror("pthread_cancel");
-
- // reap
+ // wait for it to finish
if (pthread_join(thread->tid, NULL))
perror("pthread_join");
@@ -203,14 +231,13 @@
pthread_mutex_init(&ctx->work_mutex, NULL);
pthread_cond_init(&ctx->work_cond, NULL);
pthread_cond_init(&ctx->idle_cond, NULL);
+ ctx->running = true;
// start threadpool
for (int i = 0; i < threads; i++) {
if ((err = pt_ctx_add_thread(ctx)))
JUMP_ERROR(err);
}
-
- ctx->running = true;
// ok
*ctx_ptr = ctx;
@@ -252,11 +279,8 @@
{
struct pt_thread *thread;
- // stop accepting new work
- ctx->running = false;
-
- // wait for work queue to empty
- pt_work_wait_idle(ctx);
+ // finish work
+ pt_work_shutdown(ctx);
// shut down each thread
TAILQ_FOREACH(thread, &ctx->threads, ctx_threads)