implement pt_ctx threadpool and pt_image_tile_async
authorTero Marttila <terom@fixme.fi>
Thu, 31 Dec 2009 16:40:13 +0200
changeset 19 ebcc49de97d0
parent 18 f92a24ab046e
child 20 f0d1011e8874
implement pt_ctx threadpool and pt_image_tile_async
Makefile
include/pngtile.h
src/lib/ctx.c
src/lib/ctx.h
src/lib/error.c
src/lib/image.c
src/lib/tile.c
src/lib/tile.h
src/util/main.c
--- a/Makefile	Thu Dec 31 14:01:37 2009 +0200
+++ b/Makefile	Thu Dec 31 16:40:13 2009 +0200
@@ -7,7 +7,7 @@
 CPPFLAGS = -Iinclude -Isrc/
 
 # libraries to use
-LOADLIBES = -lpng
+LOADLIBES = -lpng -lpthread
 
 # output name
 DIST_NAME = 78949E-as2
@@ -16,7 +16,7 @@
 all: depend lib/libpngtile.so bin/util
 
 lib/libpngtile.so : \
-	build/obj/lib/image.o build/obj/lib/cache.o build/obj/lib/tile.o build/obj/lib/error.o \
+	build/obj/lib/ctx.o build/obj/lib/image.o build/obj/lib/cache.o build/obj/lib/tile.o build/obj/lib/error.o \
 	build/obj/shared/util.o build/obj/shared/log.o
 
 lib/pypngtile.so : \
@@ -58,8 +58,9 @@
 build/obj/shared/%.o : src/shared/%.c
 	$(CC) -c -fPIC $(CPPFLAGS) $(CFLAGS) $< -o $@
 
+# XXX: hax in -pthread
 build/obj/lib/%.o : src/lib/%.c
-	$(CC) -c -fPIC $(CPPFLAGS) $(CFLAGS) $< -o $@
+	$(CC) -c -fPIC -pthread $(CPPFLAGS) $(CFLAGS) $< -o $@
 
 # general binary objects
 build/obj/%.o : src/%.c
--- a/include/pngtile.h	Thu Dec 31 14:01:37 2009 +0200
+++ b/include/pngtile.h	Thu Dec 31 16:40:13 2009 +0200
@@ -68,9 +68,22 @@
 };
 
 /**
- * TODO: impl
+ * Construct a new pt_ctx for use with further pt_image's.
+ *
+ * @param ctx_ptr returned pt_ctx handle
+ * @param threads number of worker threads to use for parralel operations, or zero to disable
  */
-int pt_ctx_new (struct pt_ctx **ctx_ptr);
+int pt_ctx_new (struct pt_ctx **ctx_ptr, int threads);
+
+/**
+ * Shut down the given pt_ctx, waiting for any ongoing/pending operations to finish.
+ */
+int pt_ctx_shutdown (struct pt_ctx *ctx);
+
+/**
+ * Release the given pt_ctx without waiting for any ongoing operations to finish.
+ */
+void pt_ctx_destroy (struct pt_ctx *ctx);
 
 /**
  * Open a new pt_image for use.
@@ -100,6 +113,12 @@
 int pt_image_update (struct pt_image *image);
 
 /**
+ * Load the image's cache in read-only mode without trying to update it.
+ */
+// XXX: rename to pt_image_open?
+// TODO: int pt_image_load (struct pt_image *image);
+
+/**
  * Render a PNG tile to a FILE*.
  *
  * The PNG data will be written to the given stream, which will be flushed, but not closed.
@@ -119,6 +138,19 @@
 int pt_image_tile_mem (struct pt_image *image, const struct pt_tile_info *info, char **buf_ptr, size_t *len_ptr);
 
 /**
+ * Render a PNG tile to FILE* in a parralel manner.
+ *
+ * The PNG data will be written to \a out, which will be fclose()'d once done.
+ *
+ * This function may return before the PNG has been rendered.
+ *
+ * @param image render from image's cache. The cache must have been opened previously!
+ * @param info tile parameters
+ * @param out IO stream to write PNG data to, and close once done
+ */
+int pt_image_tile_async (struct pt_image *image, const struct pt_tile_info *info, FILE *out);
+
+/**
  * Release the given pt_image without any clean shutdown
  */
 void pt_image_destroy (struct pt_image *image);
@@ -151,6 +183,9 @@
 
     PT_ERR_TILE_CLIP,
 
+    PT_ERR_PTHREAD_CREATE,
+    PT_ERR_CTX_SHUTDOWN,
+
     PT_ERR_MAX,
 };
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/ctx.c	Thu Dec 31 16:40:13 2009 +0200
@@ -0,0 +1,280 @@
+#include "ctx.h"
+#include "error.h"
+
+#include <stdlib.h>
+#include <signal.h>
+#include <assert.h>
+#include <stdio.h> // for perror
+
+/**
+ * Enqueue the given piece of work
+ *
+ * This function always succeeds.
+ */
+static void pt_work_enqueue (struct pt_ctx *ctx, struct pt_work *work)
+{
+    // acquire
+    assert(!pthread_mutex_lock(&ctx->work_mutex));
+
+    // enqueue work
+    TAILQ_INSERT_TAIL(&ctx->work, work, ctx_work);
+
+    // if there's a thread waiting, wake one up. Otherwise, some thread will find it once it finishes its current work
+    assert(!pthread_cond_signal(&ctx->work_cond));
+
+    // release 
+    assert(!pthread_mutex_unlock(&ctx->work_mutex));
+}
+
+/**
+ * Dequeue a piece of work
+ */
+static void pt_work_dequeue (struct pt_ctx *ctx, struct pt_work **work_ptr)
+{
+    // acquire
+    assert(!pthread_mutex_lock(&ctx->work_mutex));
+
+    // idle?
+    if (TAILQ_EMPTY(&ctx->work))
+        assert(!pthread_cond_signal(&ctx->idle_cond));
+
+    // wait for work
+    while (TAILQ_EMPTY(&ctx->work))
+        assert(!pthread_cond_wait(&ctx->work_cond, &ctx->work_mutex));
+
+    // pop work
+    *work_ptr = TAILQ_FIRST(&ctx->work);
+    TAILQ_REMOVE(&ctx->work, *work_ptr, ctx_work);
+
+    // release
+    assert(!pthread_mutex_unlock(&ctx->work_mutex));
+}
+
+/**
+ * Wait for work queue to become empty
+ */
+static void pt_work_wait_idle (struct pt_ctx *ctx)
+{
+    // acquire
+    assert(!pthread_mutex_lock(&ctx->work_mutex));
+    
+    // wait for it to drain...
+    while (!TAILQ_EMPTY(&ctx->work))
+        assert(!pthread_cond_wait(&ctx->idle_cond, &ctx->work_mutex));
+
+    // release
+    assert(!pthread_mutex_unlock(&ctx->work_mutex));
+}
+
+/**
+ * Execute a piece of work
+ */
+static void pt_work_execute (struct pt_work *work)
+{
+    work->func(work->arg);
+}
+
+/**
+ * Release work state once done
+ */
+static void pt_work_release (struct pt_work *work)
+{
+    free(work);
+}
+
+/**
+ * Worker thread entry point
+ */
+static void* pt_thread_main (void *arg)
+{
+    struct pt_thread *thread = arg;
+    struct pt_work *work;
+
+    // if only life were so simple...
+    while (true) {
+        pt_work_dequeue(thread->ctx, &work);
+        pt_work_execute(work);
+        pt_work_release(work);
+    }
+
+    return NULL;
+}
+
+/**
+ * Shut down a pt_thread, waiting for it to finish.
+ *
+ * Does not remove the thread from the pool or release the pt_thread.
+ */
+static void pt_thread_shutdown (struct pt_thread *thread)
+{
+    // signal it to stop at next cancel point (i.e. when waiting for work)
+    if (pthread_cancel(thread->tid))
+        perror("pthread_cancel");
+
+    // reap
+    if (pthread_join(thread->tid, NULL))
+        perror("pthread_join");
+
+    // mark
+    thread->tid = 0;
+}
+
+/**
+ * Release pt_thread state, aborting thread if running.
+ *
+ * This is guaranteed to remove the thread from the ctx_threads if it was added.
+ */
+static void pt_thread_destroy (struct pt_thread *thread)
+{
+    if (thread->tid) {
+        // detach
+        if (pthread_detach(thread->tid))
+            perror("pthread_detach");
+
+        // abort thread
+        if (pthread_kill(thread->tid, SIGTERM))
+            perror("pthread_detach");
+
+    }
+        
+    // remove from pool
+    TAILQ_REMOVE(&thread->ctx->threads, thread, ctx_threads);
+    
+    free(thread);
+}
+
+
+/**
+ * Start a new thread and add it to the thread pool
+ */
+static int pt_ctx_add_thread (struct pt_ctx *ctx)
+{
+    struct pt_thread *thread;
+    int err;
+
+    // alloc
+    if ((thread = calloc(1, sizeof(*thread))) == NULL)
+        return -PT_ERR_MEM;
+
+    // init
+    thread->ctx = ctx;
+
+    // start thread, default attrs
+    if (pthread_create(&thread->tid, NULL, pt_thread_main, thread))
+        JUMP_SET_ERROR(err, PT_ERR_PTHREAD_CREATE);
+
+    // add to pool
+    TAILQ_INSERT_TAIL(&ctx->threads, thread, ctx_threads);
+
+    // ok
+    return 0;
+
+error:
+    // drop, don't try and remove from tailq...
+    free(thread);
+
+    return err;
+}
+
+int pt_ctx_new (struct pt_ctx **ctx_ptr, int threads)
+{
+    struct pt_ctx *ctx;
+    int err;
+
+    // alloc
+    if ((ctx = calloc(1, sizeof(*ctx))) == NULL)
+        return -PT_ERR_MEM;
+
+    // init
+    TAILQ_INIT(&ctx->threads);
+    TAILQ_INIT(&ctx->work);
+    pthread_mutex_init(&ctx->work_mutex, NULL);
+    pthread_cond_init(&ctx->work_cond, NULL);
+    pthread_cond_init(&ctx->idle_cond, NULL);
+
+    // start threadpool
+    for (int i = 0; i < threads; i++) {
+        if ((err = pt_ctx_add_thread(ctx)))
+            JUMP_ERROR(err);
+    }
+    
+    ctx->running = true;
+
+    // ok
+    *ctx_ptr = ctx;
+
+    return 0;
+        
+error:
+    // cleanup
+    pt_ctx_destroy(ctx);
+
+    return err;
+}
+
+int pt_ctx_work (struct pt_ctx *ctx, pt_work_func func, void *arg)
+{
+    struct pt_work *work;
+
+    // check state
+    if (!ctx->running)
+        RETURN_ERROR(PT_ERR_CTX_SHUTDOWN);
+
+    // construct
+    if ((work = calloc(1, sizeof(*work))) == NULL)
+        RETURN_ERROR(PT_ERR_MEM);
+
+    // init
+    work->func = func;
+    work->arg = arg;
+
+    // enqueue
+    pt_work_enqueue(ctx, work);
+
+    // ok
+    return 0;
+}
+
+int pt_ctx_shutdown (struct pt_ctx *ctx)
+{
+    struct pt_thread *thread;
+
+    // stop accepting new work
+    ctx->running = false;
+
+    // wait for work queue to empty
+    pt_work_wait_idle(ctx);
+
+    // shut down each thread
+    TAILQ_FOREACH(thread, &ctx->threads, ctx_threads)
+        // XXX: handle errors of some kind?
+        pt_thread_shutdown(thread);
+
+    // then drop
+    pt_ctx_destroy(ctx);
+
+    return 0;
+}
+
+void pt_ctx_destroy (struct pt_ctx *ctx)
+{
+    struct pt_thread *thread;
+
+    // kill threads
+    while ((thread = TAILQ_FIRST(&ctx->threads)))
+        pt_thread_destroy(thread);
+
+    // destroy mutex/conds
+    if (pthread_cond_destroy(&ctx->idle_cond))
+        perror("pthread_cond_destroy(idle_cond)");
+    
+    if (pthread_cond_destroy(&ctx->work_cond))
+        perror("pthread_cond_destroy(work_cond)");
+
+    if (pthread_mutex_destroy(&ctx->work_mutex))
+        perror("pthread_mutex_destroy(work_mutex)");
+   
+
+    free(ctx);
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/ctx.h	Thu Dec 31 16:40:13 2009 +0200
@@ -0,0 +1,75 @@
+#ifndef PNGTILE_CTX_H
+#define PNGTILE_CTX_H
+
+/**
+ * Shared context between images, used to provide a threadpool for parralelizing tile operations
+ */
+#include "pngtile.h"
+
+#include <pthread.h>
+#include <sys/queue.h>
+#include <stdbool.h>
+
+/**
+ * Worker thread
+ */
+struct pt_thread {
+    /** Shared context */
+    struct pt_ctx *ctx;
+
+    /** Thread handle */
+    pthread_t tid;
+
+    /** @see pt_ctx::threads */
+    TAILQ_ENTRY(pt_thread) ctx_threads;
+};
+
+/**
+ * Work function
+ */
+typedef void (*pt_work_func) (void *arg);
+
+/**
+ * Work that needs to be done
+ */
+struct pt_work {
+    /** Work func */
+    pt_work_func func;
+
+    /** Work info */
+    void *arg;
+    
+    /** @see pt_ctx::work */
+    TAILQ_ENTRY(pt_work) ctx_work;
+};
+
+/**
+ * Shared context
+ */
+struct pt_ctx {
+    /** Accepting new work */
+    bool running;
+
+    /** Threadpool */
+    TAILQ_HEAD(pt_ctx_threads, pt_thread) threads;
+
+    /** Pending work */
+    TAILQ_HEAD(pt_ctx_work, pt_work) work;
+
+    /** Control access to ::work */
+    pthread_mutex_t work_mutex;
+
+    /** Wait for work to become available */
+    pthread_cond_t work_cond;
+
+    /** Thread is idle */
+    pthread_cond_t idle_cond;
+};
+
+/**
+ * Enqueue a work unit
+ */
+int pt_ctx_work (struct pt_ctx *ctx, pt_work_func func, void *arg);
+
+
+#endif
--- a/src/lib/error.c	Thu Dec 31 14:01:37 2009 +0200
+++ b/src/lib/error.c	Thu Dec 31 16:40:13 2009 +0200
@@ -26,6 +26,9 @@
     [PT_ERR_CACHE_MMAP]         = "mmap(.cache)",
     [PT_ERR_CACHE_RENAME_TMP]   = "rename(.tmp, .cache)",
 
+    [PT_ERR_PTHREAD_CREATE]     = "pthread_create",
+    [PT_ERR_CTX_SHUTDOWN]       = "pt_ctx is shutting down",
+
     [PT_ERR_TILE_CLIP]          = "Tile outside of image",
 };
 
--- a/src/lib/image.c	Thu Dec 31 14:01:37 2009 +0200
+++ b/src/lib/image.c	Thu Dec 31 16:40:13 2009 +0200
@@ -1,8 +1,10 @@
 #include "image.h"
+#include "ctx.h"
 #include "cache.h"
 #include "tile.h"
 #include "error.h"
 #include "shared/util.h"
+#include "shared/log.h"
 
 #include <stdlib.h>
 #include <errno.h>
@@ -230,11 +232,11 @@
     int err;
 
     // init
-    if ((err = pt_tile_init_file(&tile, info, out)))
+    if ((err = pt_tile_init_file(&tile, image->cache, info, out)))
         return err;
 
     // render
-    if ((err = pt_tile_render(&tile, image->cache)))
+    if ((err = pt_tile_render(&tile)))
         JUMP_ERROR(err);
 
     // ok
@@ -252,11 +254,11 @@
     int err;
 
     // init
-    if ((err = pt_tile_init_mem(&tile, info)))
+    if ((err = pt_tile_init_mem(&tile, image->cache, info)))
         return err;
 
     // render
-    if ((err = pt_tile_render(&tile, image->cache)))
+    if ((err = pt_tile_render(&tile)))
         JUMP_ERROR(err);
 
     // ok
@@ -271,6 +273,46 @@
     return err;
 }
 
+static void _pt_image_tile_async (void *arg)
+{
+    struct pt_tile *tile = arg;
+    int err;
+
+    // do render op
+    if ((err = pt_tile_render(tile)))
+        log_warn_errno("pt_tile_render: %s", pt_strerror(err));
+
+    // signal done
+    if (fclose(tile->out.file))
+        log_warn_errno("fclose");
+}
+
+int pt_image_tile_async (struct pt_image *image, const struct pt_tile_info *info, FILE *out)
+{
+    struct pt_tile *tile;
+    int err;
+
+    // alloc
+    if ((err = pt_tile_new(&tile)))
+        return err;
+
+    // init
+    if ((err = pt_tile_init_file(tile, image->cache, info, out)))
+        JUMP_ERROR(err);
+    
+    // enqueue work
+    if ((err = pt_ctx_work(image->ctx, _pt_image_tile_async, tile)))
+        JUMP_ERROR(err);
+
+    // ok, running
+    return 0;
+
+error:
+    pt_tile_destroy(tile);
+
+    return err;
+}
+
 void pt_image_destroy (struct pt_image *image)
 {
     free(image->path);
@@ -280,3 +322,4 @@
 
     free(image);
 }
+
--- a/src/lib/tile.c	Thu Dec 31 14:01:37 2009 +0200
+++ b/src/lib/tile.c	Thu Dec 31 16:40:13 2009 +0200
@@ -4,28 +4,38 @@
 
 #include <stdlib.h>
 
-static void pt_tile_init (struct pt_tile *tile, const struct pt_tile_info *info, enum pt_tile_output out_type)
+int pt_tile_new (struct pt_tile **tile_ptr)
 {
-    memset(tile, 0, sizeof(*tile));
-    
+    struct pt_tile *tile;
+
+    if ((tile = calloc(1, sizeof(*tile))) == NULL)
+        return -PT_ERR_MEM;
+
+    *tile_ptr = tile;
+
+    return 0;
+}
+
+static void pt_tile_init (struct pt_tile *tile, struct pt_cache *cache, const struct pt_tile_info *info, enum pt_tile_output out_type)
+{
     // init
+    tile->cache = cache;
     tile->info = *info;
     tile->out_type = out_type;
 }
 
-
-int pt_tile_init_file (struct pt_tile *tile, const struct pt_tile_info *info, FILE *out)
+int pt_tile_init_file (struct pt_tile *tile, struct pt_cache *cache, const struct pt_tile_info *info, FILE *out)
 {
-    pt_tile_init(tile, info, PT_TILE_OUT_FILE);
+    pt_tile_init(tile, cache, info, PT_TILE_OUT_FILE);
 
     tile->out.file = out;
 
     return 0;
 }
 
-int pt_tile_init_mem (struct pt_tile *tile, const struct pt_tile_info *info)
+int pt_tile_init_mem (struct pt_tile *tile, struct pt_cache *cache, const struct pt_tile_info *info)
 {
-    pt_tile_init(tile, info, PT_TILE_OUT_MEM);
+    pt_tile_init(tile, cache, info, PT_TILE_OUT_MEM);
     
     // init buffer
     if ((tile->out.mem.base = malloc(PT_TILE_BUF_SIZE)) == NULL)
@@ -68,7 +78,7 @@
 }
 
 
-int pt_tile_render (struct pt_tile *tile, struct pt_cache *cache)
+int pt_tile_render (struct pt_tile *tile)
 {
     png_structp png = NULL;
     png_infop info = NULL;
@@ -104,7 +114,7 @@
     }
 
     // render tile
-    if ((err = pt_cache_tile_png(cache, png, info, &tile->info)))
+    if ((err = pt_cache_tile_png(tile->cache, png, info, &tile->info)))
         JUMP_ERROR(err);
 
     // done
@@ -133,3 +143,9 @@
     }
 }
 
+void pt_tile_destroy (struct pt_tile *tile)
+{
+    pt_tile_abort(tile);
+
+    free(tile);
+}
--- a/src/lib/tile.h	Thu Dec 31 14:01:37 2009 +0200
+++ b/src/lib/tile.h	Thu Dec 31 16:40:13 2009 +0200
@@ -20,6 +20,9 @@
 
 /** Per-tile-render state */
 struct pt_tile {
+    /** Cache to render from */
+    struct pt_cache *cache;
+
     /** Render spec */
     struct pt_tile_info info;
 
@@ -39,23 +42,33 @@
 };
 
 /**
+ * Alloc a new pt_tile, which must be initialized using pt_tile_init_*
+ */
+int pt_tile_new (struct pt_tile **tile_ptr);
+
+/**
  * Initialize to render with given params, writing output to given FILE*
  */
-int pt_tile_init_file (struct pt_tile *tile, const struct pt_tile_info *info, FILE *out);
+int pt_tile_init_file (struct pt_tile *tile, struct pt_cache *cache, const struct pt_tile_info *info, FILE *out);
 
 /**
  * Initialize to render with given params, writing output to a memory buffer
  */
-int pt_tile_init_mem (struct pt_tile *tile, const struct pt_tile_info *info);
+int pt_tile_init_mem (struct pt_tile *tile, struct pt_cache *cache, const struct pt_tile_info *info);
 
 /**
  * Render PNG data from given cache according to parameters given to pt_tile_init_*
  */
-int pt_tile_render (struct pt_tile *tile, struct pt_cache *cache);
+int pt_tile_render (struct pt_tile *tile);
 
 /**
  * Abort any failed render process, cleaning up.
  */
 void pt_tile_abort (struct pt_tile *tile);
 
+/**
+ * Destroy given pt_tile, aborting it and freeing it
+ */
+void pt_tile_destroy (struct pt_tile *tile);
+
 #endif
--- a/src/util/main.c	Thu Dec 31 14:01:37 2009 +0200
+++ b/src/util/main.c	Thu Dec 31 16:40:13 2009 +0200
@@ -19,6 +19,7 @@
     { "height",         true,   NULL,   'H' },
     { "x",              true,   NULL,   'x' },
     { "y",              true,   NULL,   'y' },
+    { "threads",        true,   NULL,   'j' },
     { 0,                0,      0,      0   }
 };
 
@@ -40,6 +41,7 @@
         "\t-H, --height         set tile height\n"
         "\t-x, --x              set tile x offset\n"
         "\t-y, --y              set tile z offset\n"
+        "\t-j, --threads        number of threads\n"
     );
 }
 
@@ -48,7 +50,8 @@
     int opt;
     bool force_update = false;
     struct pt_tile_info ti = {0, 0, 0, 0};
-    int err;
+    int threads = 2;
+    int tmp, err;
     
     // parse arguments
     while ((opt = getopt_long(argc, argv, "hqvDUW:H:x:y:", options, NULL)) != -1) {
@@ -90,6 +93,12 @@
             case 'y':
                 ti.y = strtol(optarg, NULL, 0); break;
 
+            case 'j':
+                if ((tmp = strtol(optarg, NULL, 0)) < 1)
+                    FATAL("Invalid value for -j/--threads");
+
+                threads = tmp; break;
+
             case '?':
                 // useage error
                 help(argv[0]);
@@ -112,6 +121,14 @@
     struct pt_image *image = NULL;
     enum pt_cache_status status;
 
+    // build ctx
+    log_debug("Construct pt_ctx with %d threads", threads);
+
+    if ((err = pt_ctx_new(&ctx, threads)))
+        EXIT_ERROR(EXIT_FAILURE, "pt_ctx_new: threads=%d", threads);
+
+    
+    // process each image in turn
     log_debug("Processing %d images...", argc - optind);
 
     for (int i = optind; i < argc; i++) {
@@ -167,18 +184,25 @@
 
         // render tile?
         if (ti.width && ti.height) {
-            log_debug("Render tile %zux%zu@(%zu,%zu) -> stdout", ti.width, ti.height, ti.x, ti.y);
+            log_debug("Async render tile %zux%zu@(%zu,%zu) -> stdout", ti.width, ti.height, ti.x, ti.y);
 
-            if ((err = pt_image_tile_file(image, &ti, stdout)))
+            if ((err = pt_image_tile_async(image, &ti, stdout)))
                 log_errno("pt_image_tile: %s: %s", img_path, pt_strerror(err));
         }
 
 error:
         // cleanup
-        pt_image_destroy(image);
+        // XXX: leak because of async: pt_image_destroy(image);
+        ;
     }
 
-    // XXX: done
+    log_info("Waiting for images to finish...");
+
+    // wait for tile operations to finish...
+    pt_ctx_shutdown(ctx);
+
+    log_info("Done");
+
     return 0;
 }