don't use pthread_cancel to shutdown threads, it doesn't work...
authorTero Marttila <terom@fixme.fi>
Wed, 06 Jan 2010 16:05:02 +0200
changeset 33 0ed40e11b0e8
parent 32 aa168c7da551
child 34 a387bc77ad52
don't use pthread_cancel to shutdown threads, it doesn't work...
src/lib/ctx.c
--- a/src/lib/ctx.c	Wed Jan 06 14:45:55 2010 +0200
+++ b/src/lib/ctx.c	Wed Jan 06 16:05:02 2010 +0200
@@ -1,5 +1,6 @@
 #include "ctx.h"
 #include "error.h"
+#include "shared/log.h"
 
 #include <stdlib.h>
 #include <signal.h>
@@ -12,6 +13,8 @@
 static void pt_mutex_unlock (void *arg)
 {
     pthread_mutex_t *mutex = arg;
+    
+    log_debug("cleanup");
 
     assert(!pthread_mutex_unlock(mutex));
 }
@@ -37,39 +40,63 @@
 }
 
 /**
- * Dequeue a piece of work
+ * Dequeue a piece of work, returning true while there was still work to do
  */
-static void pt_work_dequeue (struct pt_ctx *ctx, struct pt_work **work_ptr)
+static bool pt_work_dequeue (struct pt_ctx *ctx, struct pt_work **work_ptr)
 {
+    bool ret;
+
     // acquire, cancel-safe
     pthread_cleanup_push(pt_mutex_unlock, &ctx->work_mutex);
     assert(!pthread_mutex_lock(&ctx->work_mutex));
 
-    // idle?
-    if (TAILQ_EMPTY(&ctx->work))
-        assert(!pthread_cond_signal(&ctx->idle_cond));
-
     // wait for work
-    while (TAILQ_EMPTY(&ctx->work))
+    while (ctx->running && TAILQ_EMPTY(&ctx->work))
         // we can expect to get pthread_cancel'd here
         assert(!pthread_cond_wait(&ctx->work_cond, &ctx->work_mutex));
+    
+    // still got work?
+    if (!TAILQ_EMPTY(&ctx->work)) {
+        // pop work
+        *work_ptr = TAILQ_FIRST(&ctx->work);
+        TAILQ_REMOVE(&ctx->work, *work_ptr, ctx_work);
 
-    // pop work
-    *work_ptr = TAILQ_FIRST(&ctx->work);
-    TAILQ_REMOVE(&ctx->work, *work_ptr, ctx_work);
+        log_debug("got work");
+
+        ret = true;
+
+    } else {
+        assert(!ctx->running);
+
+        // work empty, idle
+        assert(!pthread_cond_signal(&ctx->idle_cond));
+        
+        log_debug("idle/dead");
+
+        // no more work
+        ret = false;
+    }
 
     // release
     pthread_cleanup_pop(true);
+
+    return ret;
 }
 
 /**
- * Wait for work queue to become empty
+ * Wait for work queue to empty and workers to finish
  */
-static void pt_work_wait_idle (struct pt_ctx *ctx)
+static void pt_work_shutdown (struct pt_ctx *ctx)
 {
     // acquire
     assert(!pthread_mutex_lock(&ctx->work_mutex));
     
+    // indicate to terminate
+    ctx->running = false;
+
+    // wake up any idle workers
+    assert(!pthread_cond_broadcast(&ctx->work_cond));
+    
     // wait for it to drain...
     while (!TAILQ_EMPTY(&ctx->work))
         assert(!pthread_cond_wait(&ctx->idle_cond, &ctx->work_mutex));
@@ -101,14 +128,19 @@
 {
     struct pt_thread *thread = arg;
     struct pt_work *work;
+    
+    log_debug("start");
 
     // if only life were so simple...
-    while (true) {
-        pt_work_dequeue(thread->ctx, &work);
+    while (pt_work_dequeue(thread->ctx, &work)) {
+        log_debug("work_execute");
         pt_work_execute(work);
         pt_work_release(work);
+        log_debug("work_done");
     }
 
+    log_debug("exit");
+
     return NULL;
 }
 
@@ -119,11 +151,7 @@
  */
 static void pt_thread_shutdown (struct pt_thread *thread)
 {
-    // signal it to stop at next cancel point (i.e. when waiting for work)
-    if (pthread_cancel(thread->tid))
-        perror("pthread_cancel");
-
-    // reap
+    // wait for it to finish
     if (pthread_join(thread->tid, NULL))
         perror("pthread_join");
 
@@ -203,14 +231,13 @@
     pthread_mutex_init(&ctx->work_mutex, NULL);
     pthread_cond_init(&ctx->work_cond, NULL);
     pthread_cond_init(&ctx->idle_cond, NULL);
+    ctx->running = true;
 
     // start threadpool
     for (int i = 0; i < threads; i++) {
         if ((err = pt_ctx_add_thread(ctx)))
             JUMP_ERROR(err);
     }
-    
-    ctx->running = true;
 
     // ok
     *ctx_ptr = ctx;
@@ -252,11 +279,8 @@
 {
     struct pt_thread *thread;
 
-    // stop accepting new work
-    ctx->running = false;
-
-    // wait for work queue to empty
-    pt_work_wait_idle(ctx);
+    // finish work
+    pt_work_shutdown(ctx);
 
     // shut down each thread
     TAILQ_FOREACH(thread, &ctx->threads, ctx_threads)