# HG changeset patch # User Tero Marttila # Date 1244741999 -10800 # Node ID b1b0939517e751ef31835242d899bd219ecd1ffb # Parent a7a18893730d27e1a2db9dc27d4c9e738b8d55bc initial implementation of threaded rendering of a folder's images diff -r a7a18893730d -r b1b0939517e7 degal/command.py --- a/degal/command.py Thu Jun 11 00:37:01 2009 +0300 +++ b/degal/command.py Thu Jun 11 20:39:59 2009 +0300 @@ -2,6 +2,8 @@ Command implementations """ +import task + import inspect, logging, traceback class CommandList (object) : @@ -67,6 +69,9 @@ self.config = config self.gallery = gallery + # set up a task executor for this + self.tasks = task.TaskManager(config) + def __call__ (self, *args, **kwargs) : """ Run the command in this context diff -r a7a18893730d -r b1b0939517e7 degal/commands/main.py --- a/degal/commands/main.py Thu Jun 11 00:37:01 2009 +0300 +++ b/degal/commands/main.py Thu Jun 11 20:39:59 2009 +0300 @@ -1,14 +1,12 @@ from degal.command import command from degal import templates +from degal.task import threadable_task -def render_image (ctx, image) : +def render_image_html (ctx, image) : """ Render the thumbnails and .html for one image """ - # log image path - ctx.log_info("%s", image) - # render output tpl = templates.master(ctx.gallery, image.title, image.html, templates.image_page(image) @@ -17,6 +15,31 @@ # write output tpl.render_file(image.html) +def render_image_task (image) : + """ + Render the image output for the given image + """ + + # XXX: is this really threadsafe? + image.update() + + # return value is the image itself + return image + +def render_folder_images (ctx, images) : + """ + Render the given series of images + """ + + # render them in parallel as required + for image in ctx.tasks.execute([ + threadable_task(render_image_task, image) for image in images + ]) : + # log image path + ctx.log_info("%s", image) + + render_image_html(ctx, image) + def render_folder_page (ctx, folder) : """ Render the .html output for one folder @@ -34,9 +57,12 @@ # write output tpl.render_file(html_file) - def render_folder (ctx, folder) : + """ + Render the HTML/images for this folder if needed, and recrurse into subfolders. + """ + # index images that require updating image_count = len(folder.images) new_images = list(folder.index_images(for_update=(not ctx.config.force_update))) @@ -48,12 +74,7 @@ ctx.log_info("%s - render %d/%d images", folder, len(new_images), image_count) # update images - for image in new_images : - # update thumbs - image.update() - - # render output - render_image(ctx, image) + render_folder_images(ctx, new_images) else : ctx.log_info("%s - up to date", folder) diff -r a7a18893730d -r b1b0939517e7 degal/config.py --- a/degal/config.py Thu Jun 11 00:37:01 2009 +0300 +++ b/degal/config.py Thu Jun 11 20:39:59 2009 +0300 @@ -19,6 +19,9 @@ # force-update items force_update = False + # number of task threads to use, may be logical False + task_threads = 4 + # minimum logging level log_level = logging.INFO diff -r a7a18893730d -r b1b0939517e7 degal/task.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/degal/task.py Thu Jun 11 20:39:59 2009 +0300 @@ -0,0 +1,229 @@ +""" + Parallel execution of work, using threads. + + The work to be done is submit'd in the form of Tasks (which may or may not be `threadable`) to a TaskManager, + which then takes care of actually executing the tasks, either directly, or by queueing them up in a TaskQueue for + execution by a TaskThread. + + Example: + + for retval in tasks.execute(threadable_task(foo_func, item) for item in items) : + print retval +""" + +from __future__ import with_statement + +import threading, Queue +import contextlib + +class Task (object) : + """ + A task describes some work to do + """ + + # this task can be executed in a Python thread of its own + threadable = False + + def __init__ (self, func, *args, **kwargs) : + """ + Set this Task up to execute the given function/arguments + """ + + self.func = func + self.args = args + self.kwargs = kwargs + self.retval = self.exc_obj = None + + def _execute (self) : + """ + Low-level task execution function. Calls `execute` and stores the result/error + """ + + try : + # attempt to store normal retval + self.retval = self.execute() + + except Exception, exc_obj : + # trap all errors + self.exc_obj = exc_obj + + def execute (self) : + """ + Execute the task and return any results. + + If the task is threadable, then this method must not access any external state! + + By default, this will execute and return the function given to __init__ + """ + + return self.func(*self.args, **self.kwargs) + +class ThreadableTask (Task) : + """ + A task that is marked as threadable by default + """ + + threadable = True + +class TaskQueue (object) : + """ + A threadsafe queue of waiting tasks, keeping track of executing tasks. + """ + + def __init__ (self) : + """ + Setup + """ + + # queue of incoming, to-be-executed tasks + self.in_queue = Queue.Queue() + + # queue of outgoing, was-executed tasks + self.out_queue = Queue.Queue(); + + + @contextlib.contextmanager + def get_task (self) : + """ + Intended to be used as the target of a 'with' statement to get a task from the queue and execute it. + """ + + # XXX: shutdown signal? + task = self.in_queue.get() + + try : + # process it + yield task + + finally : + # mark it as complete + self.in_queue.task_done() + + def execute_task (self, task) : + """ + Execute the given task, and put the result in our output queue + """ + + # internal execute method + task._execute() + + # return value + self.out_queue.put(task) + + def put_task (self, task) : + """ + Add a task on to the queue. + """ + + self.in_queue.put(task) + + def get_result (self) : + """ + Wait for a single result from this queue. + + XXX: nothing to garuntee that this won't block + """ + + return self.out_queue.get() + +class TaskThread (threading.Thread) : + """ + A Thread that executes Tasks + """ + + def __init__ (self, queue) : + """ + Setup thread and start waiting for requests + """ + + super(TaskThread, self).__init__() + + self.queue = queue + + # run + self.start() + + def run (self) : + """ + Thread main + """ + + # XXX: provide shutdown method + while True : + # get a task to execute + with self.queue.get_task() as task : + # execute it + self.queue.execute_task(task) + +class TaskManager (object) : + """ + Provides the machinery to take tasks and execute them + """ + + def __init__ (self, config) : + """ + Initialize with given config + """ + + # setup queue + self.queue = TaskQueue() + + # setup task threads + self.threads = [TaskThread(self.queue) for count in xrange(config.task_threads)] + + def submit (self, task) : + """ + Submit given task for execution. + + This garuntees that the task will eventually be returned via our queue. + """ + + if self.threads and task.threadable : + # schedule for later execution by a thread + self.queue.put_task(task) + + else : + # execute directly + self.queue.execute_task(task) + + def consume (self) : + """ + Consume a single result from the task queue, either returning the return value, or raising the task's error. + """ + + # get one task + task = self.queue.get_result() + + if task.exc_obj : + # XXX: is this threadsafe? + raise task.exc_obj + + else : + # successful execution + return task.retval + + + def execute (self, tasks) : + """ + Schedule a series of tasks for execution, and yield the return values as they become available. + + If a task raises an error, it will be re-raised. + + Waits for all the tasks to execute. + """ + + # number of tasks submitted; how many results we expect + task_count = len(tasks) + + # submit them + for task in tasks : + self.submit(task) + + # yield results + for count in xrange(task_count) : + yield self.consume() + +# pretty aliases +task = Task +threadable_task = ThreadableTask +