--- a/Makefile Thu Dec 31 14:01:37 2009 +0200
+++ b/Makefile Thu Dec 31 16:40:13 2009 +0200
@@ -7,7 +7,7 @@
CPPFLAGS = -Iinclude -Isrc/
# libraries to use
-LOADLIBES = -lpng
+LOADLIBES = -lpng -lpthread
# output name
DIST_NAME = 78949E-as2
@@ -16,7 +16,7 @@
all: depend lib/libpngtile.so bin/util
lib/libpngtile.so : \
- build/obj/lib/image.o build/obj/lib/cache.o build/obj/lib/tile.o build/obj/lib/error.o \
+ build/obj/lib/ctx.o build/obj/lib/image.o build/obj/lib/cache.o build/obj/lib/tile.o build/obj/lib/error.o \
build/obj/shared/util.o build/obj/shared/log.o
lib/pypngtile.so : \
@@ -58,8 +58,9 @@
build/obj/shared/%.o : src/shared/%.c
$(CC) -c -fPIC $(CPPFLAGS) $(CFLAGS) $< -o $@
+# XXX: hax in -pthread
build/obj/lib/%.o : src/lib/%.c
- $(CC) -c -fPIC $(CPPFLAGS) $(CFLAGS) $< -o $@
+ $(CC) -c -fPIC -pthread $(CPPFLAGS) $(CFLAGS) $< -o $@
# general binary objects
build/obj/%.o : src/%.c
--- a/include/pngtile.h Thu Dec 31 14:01:37 2009 +0200
+++ b/include/pngtile.h Thu Dec 31 16:40:13 2009 +0200
@@ -68,9 +68,22 @@
};
/**
- * TODO: impl
+ * Construct a new pt_ctx for use with further pt_image's.
+ *
+ * @param ctx_ptr returned pt_ctx handle
+ * @param threads number of worker threads to use for parralel operations, or zero to disable
*/
-int pt_ctx_new (struct pt_ctx **ctx_ptr);
+int pt_ctx_new (struct pt_ctx **ctx_ptr, int threads);
+
+/**
+ * Shut down the given pt_ctx, waiting for any ongoing/pending operations to finish.
+ */
+int pt_ctx_shutdown (struct pt_ctx *ctx);
+
+/**
+ * Release the given pt_ctx without waiting for any ongoing operations to finish.
+ */
+void pt_ctx_destroy (struct pt_ctx *ctx);
/**
* Open a new pt_image for use.
@@ -100,6 +113,12 @@
int pt_image_update (struct pt_image *image);
/**
+ * Load the image's cache in read-only mode without trying to update it.
+ */
+// XXX: rename to pt_image_open?
+// TODO: int pt_image_load (struct pt_image *image);
+
+/**
* Render a PNG tile to a FILE*.
*
* The PNG data will be written to the given stream, which will be flushed, but not closed.
@@ -119,6 +138,19 @@
int pt_image_tile_mem (struct pt_image *image, const struct pt_tile_info *info, char **buf_ptr, size_t *len_ptr);
/**
+ * Render a PNG tile to FILE* in a parralel manner.
+ *
+ * The PNG data will be written to \a out, which will be fclose()'d once done.
+ *
+ * This function may return before the PNG has been rendered.
+ *
+ * @param image render from image's cache. The cache must have been opened previously!
+ * @param info tile parameters
+ * @param out IO stream to write PNG data to, and close once done
+ */
+int pt_image_tile_async (struct pt_image *image, const struct pt_tile_info *info, FILE *out);
+
+/**
* Release the given pt_image without any clean shutdown
*/
void pt_image_destroy (struct pt_image *image);
@@ -151,6 +183,9 @@
PT_ERR_TILE_CLIP,
+ PT_ERR_PTHREAD_CREATE,
+ PT_ERR_CTX_SHUTDOWN,
+
PT_ERR_MAX,
};
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/ctx.c Thu Dec 31 16:40:13 2009 +0200
@@ -0,0 +1,280 @@
+#include "ctx.h"
+#include "error.h"
+
+#include <stdlib.h>
+#include <signal.h>
+#include <assert.h>
+#include <stdio.h> // for perror
+
+/**
+ * Enqueue the given piece of work
+ *
+ * This function always succeeds.
+ */
+static void pt_work_enqueue (struct pt_ctx *ctx, struct pt_work *work)
+{
+ // acquire
+ assert(!pthread_mutex_lock(&ctx->work_mutex));
+
+ // enqueue work
+ TAILQ_INSERT_TAIL(&ctx->work, work, ctx_work);
+
+ // if there's a thread waiting, wake one up. Otherwise, some thread will find it once it finishes its current work
+ assert(!pthread_cond_signal(&ctx->work_cond));
+
+ // release
+ assert(!pthread_mutex_unlock(&ctx->work_mutex));
+}
+
+/**
+ * Dequeue a piece of work
+ */
+static void pt_work_dequeue (struct pt_ctx *ctx, struct pt_work **work_ptr)
+{
+ // acquire
+ assert(!pthread_mutex_lock(&ctx->work_mutex));
+
+ // idle?
+ if (TAILQ_EMPTY(&ctx->work))
+ assert(!pthread_cond_signal(&ctx->idle_cond));
+
+ // wait for work
+ while (TAILQ_EMPTY(&ctx->work))
+ assert(!pthread_cond_wait(&ctx->work_cond, &ctx->work_mutex));
+
+ // pop work
+ *work_ptr = TAILQ_FIRST(&ctx->work);
+ TAILQ_REMOVE(&ctx->work, *work_ptr, ctx_work);
+
+ // release
+ assert(!pthread_mutex_unlock(&ctx->work_mutex));
+}
+
+/**
+ * Wait for work queue to become empty
+ */
+static void pt_work_wait_idle (struct pt_ctx *ctx)
+{
+ // acquire
+ assert(!pthread_mutex_lock(&ctx->work_mutex));
+
+ // wait for it to drain...
+ while (!TAILQ_EMPTY(&ctx->work))
+ assert(!pthread_cond_wait(&ctx->idle_cond, &ctx->work_mutex));
+
+ // release
+ assert(!pthread_mutex_unlock(&ctx->work_mutex));
+}
+
+/**
+ * Execute a piece of work
+ */
+static void pt_work_execute (struct pt_work *work)
+{
+ work->func(work->arg);
+}
+
+/**
+ * Release work state once done
+ */
+static void pt_work_release (struct pt_work *work)
+{
+ free(work);
+}
+
+/**
+ * Worker thread entry point
+ */
+static void* pt_thread_main (void *arg)
+{
+ struct pt_thread *thread = arg;
+ struct pt_work *work;
+
+ // if only life were so simple...
+ while (true) {
+ pt_work_dequeue(thread->ctx, &work);
+ pt_work_execute(work);
+ pt_work_release(work);
+ }
+
+ return NULL;
+}
+
+/**
+ * Shut down a pt_thread, waiting for it to finish.
+ *
+ * Does not remove the thread from the pool or release the pt_thread.
+ */
+static void pt_thread_shutdown (struct pt_thread *thread)
+{
+ // signal it to stop at next cancel point (i.e. when waiting for work)
+ if (pthread_cancel(thread->tid))
+ perror("pthread_cancel");
+
+ // reap
+ if (pthread_join(thread->tid, NULL))
+ perror("pthread_join");
+
+ // mark
+ thread->tid = 0;
+}
+
+/**
+ * Release pt_thread state, aborting thread if running.
+ *
+ * This is guaranteed to remove the thread from the ctx_threads if it was added.
+ */
+static void pt_thread_destroy (struct pt_thread *thread)
+{
+ if (thread->tid) {
+ // detach
+ if (pthread_detach(thread->tid))
+ perror("pthread_detach");
+
+ // abort thread
+ if (pthread_kill(thread->tid, SIGTERM))
+ perror("pthread_detach");
+
+ }
+
+ // remove from pool
+ TAILQ_REMOVE(&thread->ctx->threads, thread, ctx_threads);
+
+ free(thread);
+}
+
+
+/**
+ * Start a new thread and add it to the thread pool
+ */
+static int pt_ctx_add_thread (struct pt_ctx *ctx)
+{
+ struct pt_thread *thread;
+ int err;
+
+ // alloc
+ if ((thread = calloc(1, sizeof(*thread))) == NULL)
+ return -PT_ERR_MEM;
+
+ // init
+ thread->ctx = ctx;
+
+ // start thread, default attrs
+ if (pthread_create(&thread->tid, NULL, pt_thread_main, thread))
+ JUMP_SET_ERROR(err, PT_ERR_PTHREAD_CREATE);
+
+ // add to pool
+ TAILQ_INSERT_TAIL(&ctx->threads, thread, ctx_threads);
+
+ // ok
+ return 0;
+
+error:
+ // drop, don't try and remove from tailq...
+ free(thread);
+
+ return err;
+}
+
+int pt_ctx_new (struct pt_ctx **ctx_ptr, int threads)
+{
+ struct pt_ctx *ctx;
+ int err;
+
+ // alloc
+ if ((ctx = calloc(1, sizeof(*ctx))) == NULL)
+ return -PT_ERR_MEM;
+
+ // init
+ TAILQ_INIT(&ctx->threads);
+ TAILQ_INIT(&ctx->work);
+ pthread_mutex_init(&ctx->work_mutex, NULL);
+ pthread_cond_init(&ctx->work_cond, NULL);
+ pthread_cond_init(&ctx->idle_cond, NULL);
+
+ // start threadpool
+ for (int i = 0; i < threads; i++) {
+ if ((err = pt_ctx_add_thread(ctx)))
+ JUMP_ERROR(err);
+ }
+
+ ctx->running = true;
+
+ // ok
+ *ctx_ptr = ctx;
+
+ return 0;
+
+error:
+ // cleanup
+ pt_ctx_destroy(ctx);
+
+ return err;
+}
+
+int pt_ctx_work (struct pt_ctx *ctx, pt_work_func func, void *arg)
+{
+ struct pt_work *work;
+
+ // check state
+ if (!ctx->running)
+ RETURN_ERROR(PT_ERR_CTX_SHUTDOWN);
+
+ // construct
+ if ((work = calloc(1, sizeof(*work))) == NULL)
+ RETURN_ERROR(PT_ERR_MEM);
+
+ // init
+ work->func = func;
+ work->arg = arg;
+
+ // enqueue
+ pt_work_enqueue(ctx, work);
+
+ // ok
+ return 0;
+}
+
+int pt_ctx_shutdown (struct pt_ctx *ctx)
+{
+ struct pt_thread *thread;
+
+ // stop accepting new work
+ ctx->running = false;
+
+ // wait for work queue to empty
+ pt_work_wait_idle(ctx);
+
+ // shut down each thread
+ TAILQ_FOREACH(thread, &ctx->threads, ctx_threads)
+ // XXX: handle errors of some kind?
+ pt_thread_shutdown(thread);
+
+ // then drop
+ pt_ctx_destroy(ctx);
+
+ return 0;
+}
+
+void pt_ctx_destroy (struct pt_ctx *ctx)
+{
+ struct pt_thread *thread;
+
+ // kill threads
+ while ((thread = TAILQ_FIRST(&ctx->threads)))
+ pt_thread_destroy(thread);
+
+ // destroy mutex/conds
+ if (pthread_cond_destroy(&ctx->idle_cond))
+ perror("pthread_cond_destroy(idle_cond)");
+
+ if (pthread_cond_destroy(&ctx->work_cond))
+ perror("pthread_cond_destroy(work_cond)");
+
+ if (pthread_mutex_destroy(&ctx->work_mutex))
+ perror("pthread_mutex_destroy(work_mutex)");
+
+
+ free(ctx);
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/ctx.h Thu Dec 31 16:40:13 2009 +0200
@@ -0,0 +1,75 @@
+#ifndef PNGTILE_CTX_H
+#define PNGTILE_CTX_H
+
+/**
+ * Shared context between images, used to provide a threadpool for parralelizing tile operations
+ */
+#include "pngtile.h"
+
+#include <pthread.h>
+#include <sys/queue.h>
+#include <stdbool.h>
+
+/**
+ * Worker thread
+ */
+struct pt_thread {
+ /** Shared context */
+ struct pt_ctx *ctx;
+
+ /** Thread handle */
+ pthread_t tid;
+
+ /** @see pt_ctx::threads */
+ TAILQ_ENTRY(pt_thread) ctx_threads;
+};
+
+/**
+ * Work function
+ */
+typedef void (*pt_work_func) (void *arg);
+
+/**
+ * Work that needs to be done
+ */
+struct pt_work {
+ /** Work func */
+ pt_work_func func;
+
+ /** Work info */
+ void *arg;
+
+ /** @see pt_ctx::work */
+ TAILQ_ENTRY(pt_work) ctx_work;
+};
+
+/**
+ * Shared context
+ */
+struct pt_ctx {
+ /** Accepting new work */
+ bool running;
+
+ /** Threadpool */
+ TAILQ_HEAD(pt_ctx_threads, pt_thread) threads;
+
+ /** Pending work */
+ TAILQ_HEAD(pt_ctx_work, pt_work) work;
+
+ /** Control access to ::work */
+ pthread_mutex_t work_mutex;
+
+ /** Wait for work to become available */
+ pthread_cond_t work_cond;
+
+ /** Thread is idle */
+ pthread_cond_t idle_cond;
+};
+
+/**
+ * Enqueue a work unit
+ */
+int pt_ctx_work (struct pt_ctx *ctx, pt_work_func func, void *arg);
+
+
+#endif
--- a/src/lib/error.c Thu Dec 31 14:01:37 2009 +0200
+++ b/src/lib/error.c Thu Dec 31 16:40:13 2009 +0200
@@ -26,6 +26,9 @@
[PT_ERR_CACHE_MMAP] = "mmap(.cache)",
[PT_ERR_CACHE_RENAME_TMP] = "rename(.tmp, .cache)",
+ [PT_ERR_PTHREAD_CREATE] = "pthread_create",
+ [PT_ERR_CTX_SHUTDOWN] = "pt_ctx is shutting down",
+
[PT_ERR_TILE_CLIP] = "Tile outside of image",
};
--- a/src/lib/image.c Thu Dec 31 14:01:37 2009 +0200
+++ b/src/lib/image.c Thu Dec 31 16:40:13 2009 +0200
@@ -1,8 +1,10 @@
#include "image.h"
+#include "ctx.h"
#include "cache.h"
#include "tile.h"
#include "error.h"
#include "shared/util.h"
+#include "shared/log.h"
#include <stdlib.h>
#include <errno.h>
@@ -230,11 +232,11 @@
int err;
// init
- if ((err = pt_tile_init_file(&tile, info, out)))
+ if ((err = pt_tile_init_file(&tile, image->cache, info, out)))
return err;
// render
- if ((err = pt_tile_render(&tile, image->cache)))
+ if ((err = pt_tile_render(&tile)))
JUMP_ERROR(err);
// ok
@@ -252,11 +254,11 @@
int err;
// init
- if ((err = pt_tile_init_mem(&tile, info)))
+ if ((err = pt_tile_init_mem(&tile, image->cache, info)))
return err;
// render
- if ((err = pt_tile_render(&tile, image->cache)))
+ if ((err = pt_tile_render(&tile)))
JUMP_ERROR(err);
// ok
@@ -271,6 +273,46 @@
return err;
}
+static void _pt_image_tile_async (void *arg)
+{
+ struct pt_tile *tile = arg;
+ int err;
+
+ // do render op
+ if ((err = pt_tile_render(tile)))
+ log_warn_errno("pt_tile_render: %s", pt_strerror(err));
+
+ // signal done
+ if (fclose(tile->out.file))
+ log_warn_errno("fclose");
+}
+
+int pt_image_tile_async (struct pt_image *image, const struct pt_tile_info *info, FILE *out)
+{
+ struct pt_tile *tile;
+ int err;
+
+ // alloc
+ if ((err = pt_tile_new(&tile)))
+ return err;
+
+ // init
+ if ((err = pt_tile_init_file(tile, image->cache, info, out)))
+ JUMP_ERROR(err);
+
+ // enqueue work
+ if ((err = pt_ctx_work(image->ctx, _pt_image_tile_async, tile)))
+ JUMP_ERROR(err);
+
+ // ok, running
+ return 0;
+
+error:
+ pt_tile_destroy(tile);
+
+ return err;
+}
+
void pt_image_destroy (struct pt_image *image)
{
free(image->path);
@@ -280,3 +322,4 @@
free(image);
}
+
--- a/src/lib/tile.c Thu Dec 31 14:01:37 2009 +0200
+++ b/src/lib/tile.c Thu Dec 31 16:40:13 2009 +0200
@@ -4,28 +4,38 @@
#include <stdlib.h>
-static void pt_tile_init (struct pt_tile *tile, const struct pt_tile_info *info, enum pt_tile_output out_type)
+int pt_tile_new (struct pt_tile **tile_ptr)
{
- memset(tile, 0, sizeof(*tile));
-
+ struct pt_tile *tile;
+
+ if ((tile = calloc(1, sizeof(*tile))) == NULL)
+ return -PT_ERR_MEM;
+
+ *tile_ptr = tile;
+
+ return 0;
+}
+
+static void pt_tile_init (struct pt_tile *tile, struct pt_cache *cache, const struct pt_tile_info *info, enum pt_tile_output out_type)
+{
// init
+ tile->cache = cache;
tile->info = *info;
tile->out_type = out_type;
}
-
-int pt_tile_init_file (struct pt_tile *tile, const struct pt_tile_info *info, FILE *out)
+int pt_tile_init_file (struct pt_tile *tile, struct pt_cache *cache, const struct pt_tile_info *info, FILE *out)
{
- pt_tile_init(tile, info, PT_TILE_OUT_FILE);
+ pt_tile_init(tile, cache, info, PT_TILE_OUT_FILE);
tile->out.file = out;
return 0;
}
-int pt_tile_init_mem (struct pt_tile *tile, const struct pt_tile_info *info)
+int pt_tile_init_mem (struct pt_tile *tile, struct pt_cache *cache, const struct pt_tile_info *info)
{
- pt_tile_init(tile, info, PT_TILE_OUT_MEM);
+ pt_tile_init(tile, cache, info, PT_TILE_OUT_MEM);
// init buffer
if ((tile->out.mem.base = malloc(PT_TILE_BUF_SIZE)) == NULL)
@@ -68,7 +78,7 @@
}
-int pt_tile_render (struct pt_tile *tile, struct pt_cache *cache)
+int pt_tile_render (struct pt_tile *tile)
{
png_structp png = NULL;
png_infop info = NULL;
@@ -104,7 +114,7 @@
}
// render tile
- if ((err = pt_cache_tile_png(cache, png, info, &tile->info)))
+ if ((err = pt_cache_tile_png(tile->cache, png, info, &tile->info)))
JUMP_ERROR(err);
// done
@@ -133,3 +143,9 @@
}
}
+void pt_tile_destroy (struct pt_tile *tile)
+{
+ pt_tile_abort(tile);
+
+ free(tile);
+}
--- a/src/lib/tile.h Thu Dec 31 14:01:37 2009 +0200
+++ b/src/lib/tile.h Thu Dec 31 16:40:13 2009 +0200
@@ -20,6 +20,9 @@
/** Per-tile-render state */
struct pt_tile {
+ /** Cache to render from */
+ struct pt_cache *cache;
+
/** Render spec */
struct pt_tile_info info;
@@ -39,23 +42,33 @@
};
/**
+ * Alloc a new pt_tile, which must be initialized using pt_tile_init_*
+ */
+int pt_tile_new (struct pt_tile **tile_ptr);
+
+/**
* Initialize to render with given params, writing output to given FILE*
*/
-int pt_tile_init_file (struct pt_tile *tile, const struct pt_tile_info *info, FILE *out);
+int pt_tile_init_file (struct pt_tile *tile, struct pt_cache *cache, const struct pt_tile_info *info, FILE *out);
/**
* Initialize to render with given params, writing output to a memory buffer
*/
-int pt_tile_init_mem (struct pt_tile *tile, const struct pt_tile_info *info);
+int pt_tile_init_mem (struct pt_tile *tile, struct pt_cache *cache, const struct pt_tile_info *info);
/**
* Render PNG data from given cache according to parameters given to pt_tile_init_*
*/
-int pt_tile_render (struct pt_tile *tile, struct pt_cache *cache);
+int pt_tile_render (struct pt_tile *tile);
/**
* Abort any failed render process, cleaning up.
*/
void pt_tile_abort (struct pt_tile *tile);
+/**
+ * Destroy given pt_tile, aborting it and freeing it
+ */
+void pt_tile_destroy (struct pt_tile *tile);
+
#endif
--- a/src/util/main.c Thu Dec 31 14:01:37 2009 +0200
+++ b/src/util/main.c Thu Dec 31 16:40:13 2009 +0200
@@ -19,6 +19,7 @@
{ "height", true, NULL, 'H' },
{ "x", true, NULL, 'x' },
{ "y", true, NULL, 'y' },
+ { "threads", true, NULL, 'j' },
{ 0, 0, 0, 0 }
};
@@ -40,6 +41,7 @@
"\t-H, --height set tile height\n"
"\t-x, --x set tile x offset\n"
"\t-y, --y set tile z offset\n"
+ "\t-j, --threads number of threads\n"
);
}
@@ -48,7 +50,8 @@
int opt;
bool force_update = false;
struct pt_tile_info ti = {0, 0, 0, 0};
- int err;
+ int threads = 2;
+ int tmp, err;
// parse arguments
while ((opt = getopt_long(argc, argv, "hqvDUW:H:x:y:", options, NULL)) != -1) {
@@ -90,6 +93,12 @@
case 'y':
ti.y = strtol(optarg, NULL, 0); break;
+ case 'j':
+ if ((tmp = strtol(optarg, NULL, 0)) < 1)
+ FATAL("Invalid value for -j/--threads");
+
+ threads = tmp; break;
+
case '?':
// useage error
help(argv[0]);
@@ -112,6 +121,14 @@
struct pt_image *image = NULL;
enum pt_cache_status status;
+ // build ctx
+ log_debug("Construct pt_ctx with %d threads", threads);
+
+ if ((err = pt_ctx_new(&ctx, threads)))
+ EXIT_ERROR(EXIT_FAILURE, "pt_ctx_new: threads=%d", threads);
+
+
+ // process each image in turn
log_debug("Processing %d images...", argc - optind);
for (int i = optind; i < argc; i++) {
@@ -167,18 +184,25 @@
// render tile?
if (ti.width && ti.height) {
- log_debug("Render tile %zux%zu@(%zu,%zu) -> stdout", ti.width, ti.height, ti.x, ti.y);
+ log_debug("Async render tile %zux%zu@(%zu,%zu) -> stdout", ti.width, ti.height, ti.x, ti.y);
- if ((err = pt_image_tile_file(image, &ti, stdout)))
+ if ((err = pt_image_tile_async(image, &ti, stdout)))
log_errno("pt_image_tile: %s: %s", img_path, pt_strerror(err));
}
error:
// cleanup
- pt_image_destroy(image);
+ // XXX: leak because of async: pt_image_destroy(image);
+ ;
}
- // XXX: done
+ log_info("Waiting for images to finish...");
+
+ // wait for tile operations to finish...
+ pt_ctx_shutdown(ctx);
+
+ log_info("Done");
+
return 0;
}