--- a/degal/command.py Thu Jun 11 00:37:01 2009 +0300
+++ b/degal/command.py Thu Jun 11 20:39:59 2009 +0300
@@ -2,6 +2,8 @@
Command implementations
"""
+import task
+
import inspect, logging, traceback
class CommandList (object) :
@@ -67,6 +69,9 @@
self.config = config
self.gallery = gallery
+ # set up a task executor for this
+ self.tasks = task.TaskManager(config)
+
def __call__ (self, *args, **kwargs) :
"""
Run the command in this context
--- a/degal/commands/main.py Thu Jun 11 00:37:01 2009 +0300
+++ b/degal/commands/main.py Thu Jun 11 20:39:59 2009 +0300
@@ -1,14 +1,12 @@
from degal.command import command
from degal import templates
+from degal.task import threadable_task
-def render_image (ctx, image) :
+def render_image_html (ctx, image) :
"""
Render the thumbnails and .html for one image
"""
- # log image path
- ctx.log_info("%s", image)
-
# render output
tpl = templates.master(ctx.gallery, image.title, image.html,
templates.image_page(image)
@@ -17,6 +15,31 @@
# write output
tpl.render_file(image.html)
+def render_image_task (image) :
+ """
+ Render the image output for the given image
+ """
+
+ # XXX: is this really threadsafe?
+ image.update()
+
+ # return value is the image itself
+ return image
+
+def render_folder_images (ctx, images) :
+ """
+ Render the given series of images
+ """
+
+ # render them in parallel as required
+ for image in ctx.tasks.execute([
+ threadable_task(render_image_task, image) for image in images
+ ]) :
+ # log image path
+ ctx.log_info("%s", image)
+
+ render_image_html(ctx, image)
+
def render_folder_page (ctx, folder) :
"""
Render the .html output for one folder
@@ -34,9 +57,12 @@
# write output
tpl.render_file(html_file)
-
def render_folder (ctx, folder) :
+ """
+ Render the HTML/images for this folder if needed, and recrurse into subfolders.
+ """
+
# index images that require updating
image_count = len(folder.images)
new_images = list(folder.index_images(for_update=(not ctx.config.force_update)))
@@ -48,12 +74,7 @@
ctx.log_info("%s - render %d/%d images", folder, len(new_images), image_count)
# update images
- for image in new_images :
- # update thumbs
- image.update()
-
- # render output
- render_image(ctx, image)
+ render_folder_images(ctx, new_images)
else :
ctx.log_info("%s - up to date", folder)
--- a/degal/config.py Thu Jun 11 00:37:01 2009 +0300
+++ b/degal/config.py Thu Jun 11 20:39:59 2009 +0300
@@ -19,6 +19,9 @@
# force-update items
force_update = False
+ # number of task threads to use, may be logical False
+ task_threads = 4
+
# minimum logging level
log_level = logging.INFO
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/degal/task.py Thu Jun 11 20:39:59 2009 +0300
@@ -0,0 +1,229 @@
+"""
+ Parallel execution of work, using threads.
+
+ The work to be done is submit'd in the form of Tasks (which may or may not be `threadable`) to a TaskManager,
+ which then takes care of actually executing the tasks, either directly, or by queueing them up in a TaskQueue for
+ execution by a TaskThread.
+
+ Example:
+
+ for retval in tasks.execute(threadable_task(foo_func, item) for item in items) :
+ print retval
+"""
+
+from __future__ import with_statement
+
+import threading, Queue
+import contextlib
+
+class Task (object) :
+ """
+ A task describes some work to do
+ """
+
+ # this task can be executed in a Python thread of its own
+ threadable = False
+
+ def __init__ (self, func, *args, **kwargs) :
+ """
+ Set this Task up to execute the given function/arguments
+ """
+
+ self.func = func
+ self.args = args
+ self.kwargs = kwargs
+ self.retval = self.exc_obj = None
+
+ def _execute (self) :
+ """
+ Low-level task execution function. Calls `execute` and stores the result/error
+ """
+
+ try :
+ # attempt to store normal retval
+ self.retval = self.execute()
+
+ except Exception, exc_obj :
+ # trap all errors
+ self.exc_obj = exc_obj
+
+ def execute (self) :
+ """
+ Execute the task and return any results.
+
+ If the task is threadable, then this method must not access any external state!
+
+ By default, this will execute and return the function given to __init__
+ """
+
+ return self.func(*self.args, **self.kwargs)
+
+class ThreadableTask (Task) :
+ """
+ A task that is marked as threadable by default
+ """
+
+ threadable = True
+
+class TaskQueue (object) :
+ """
+ A threadsafe queue of waiting tasks, keeping track of executing tasks.
+ """
+
+ def __init__ (self) :
+ """
+ Setup
+ """
+
+ # queue of incoming, to-be-executed tasks
+ self.in_queue = Queue.Queue()
+
+ # queue of outgoing, was-executed tasks
+ self.out_queue = Queue.Queue();
+
+
+ @contextlib.contextmanager
+ def get_task (self) :
+ """
+ Intended to be used as the target of a 'with' statement to get a task from the queue and execute it.
+ """
+
+ # XXX: shutdown signal?
+ task = self.in_queue.get()
+
+ try :
+ # process it
+ yield task
+
+ finally :
+ # mark it as complete
+ self.in_queue.task_done()
+
+ def execute_task (self, task) :
+ """
+ Execute the given task, and put the result in our output queue
+ """
+
+ # internal execute method
+ task._execute()
+
+ # return value
+ self.out_queue.put(task)
+
+ def put_task (self, task) :
+ """
+ Add a task on to the queue.
+ """
+
+ self.in_queue.put(task)
+
+ def get_result (self) :
+ """
+ Wait for a single result from this queue.
+
+ XXX: nothing to garuntee that this won't block
+ """
+
+ return self.out_queue.get()
+
+class TaskThread (threading.Thread) :
+ """
+ A Thread that executes Tasks
+ """
+
+ def __init__ (self, queue) :
+ """
+ Setup thread and start waiting for requests
+ """
+
+ super(TaskThread, self).__init__()
+
+ self.queue = queue
+
+ # run
+ self.start()
+
+ def run (self) :
+ """
+ Thread main
+ """
+
+ # XXX: provide shutdown method
+ while True :
+ # get a task to execute
+ with self.queue.get_task() as task :
+ # execute it
+ self.queue.execute_task(task)
+
+class TaskManager (object) :
+ """
+ Provides the machinery to take tasks and execute them
+ """
+
+ def __init__ (self, config) :
+ """
+ Initialize with given config
+ """
+
+ # setup queue
+ self.queue = TaskQueue()
+
+ # setup task threads
+ self.threads = [TaskThread(self.queue) for count in xrange(config.task_threads)]
+
+ def submit (self, task) :
+ """
+ Submit given task for execution.
+
+ This garuntees that the task will eventually be returned via our queue.
+ """
+
+ if self.threads and task.threadable :
+ # schedule for later execution by a thread
+ self.queue.put_task(task)
+
+ else :
+ # execute directly
+ self.queue.execute_task(task)
+
+ def consume (self) :
+ """
+ Consume a single result from the task queue, either returning the return value, or raising the task's error.
+ """
+
+ # get one task
+ task = self.queue.get_result()
+
+ if task.exc_obj :
+ # XXX: is this threadsafe?
+ raise task.exc_obj
+
+ else :
+ # successful execution
+ return task.retval
+
+
+ def execute (self, tasks) :
+ """
+ Schedule a series of tasks for execution, and yield the return values as they become available.
+
+ If a task raises an error, it will be re-raised.
+
+ Waits for all the tasks to execute.
+ """
+
+ # number of tasks submitted; how many results we expect
+ task_count = len(tasks)
+
+ # submit them
+ for task in tasks :
+ self.submit(task)
+
+ # yield results
+ for count in xrange(task_count) :
+ yield self.consume()
+
+# pretty aliases
+task = Task
+threadable_task = ThreadableTask
+