initial implementation of threaded rendering of a folder's images threaded-tasks
author Tero Marttila <terom@fixme.fi>
Thu, 11 Jun 2009 20:39:59 +0300
branch threaded-tasks
changeset 88 b1b0939517e7
parent 87 a7a18893730d
child 89 4b254a90d6d0
initial implementation of threaded rendering of a folder's images
degal/command.py
degal/commands/main.py
degal/config.py
degal/task.py
--- a/degal/command.py	Thu Jun 11 00:37:01 2009 +0300
+++ b/degal/command.py	Thu Jun 11 20:39:59 2009 +0300
@@ -2,6 +2,8 @@
     Command implementations
 """
 
+import task
+
 import inspect, logging, traceback
 
 class CommandList (object) :
@@ -67,6 +69,9 @@
         self.config = config
         self.gallery = gallery
 
+        # set up a task executor for this
+        self.tasks = task.TaskManager(config)
+
     def __call__ (self, *args, **kwargs) :
         """
             Run the command in this context
--- a/degal/commands/main.py	Thu Jun 11 00:37:01 2009 +0300
+++ b/degal/commands/main.py	Thu Jun 11 20:39:59 2009 +0300
@@ -1,14 +1,12 @@
 from degal.command import command
 from degal import templates
+from degal.task import threadable_task
 
-def render_image (ctx, image) :
+def render_image_html (ctx, image) :
     """
         Render the thumbnails and .html for one image
     """
 
-    # log image path
-    ctx.log_info("%s", image)
-
     # render output
     tpl = templates.master(ctx.gallery, image.title, image.html, 
         templates.image_page(image)
@@ -17,6 +15,31 @@
     # write output
     tpl.render_file(image.html)
 
+def render_image_task (image) :
+    """
+        Render the image output for the given image
+    """
+    
+    # XXX: is this really threadsafe?
+    image.update()
+    
+    # return value is the image itself
+    return image
+
+def render_folder_images (ctx, images) :
+    """
+        Render the given series of images
+    """
+    
+    # render them in parallel as required
+    for image in ctx.tasks.execute([
+        threadable_task(render_image_task, image) for image in images
+    ]) :
+        # log image path
+        ctx.log_info("%s", image)
+
+        render_image_html(ctx, image)
+
 def render_folder_page (ctx, folder) :
     """
         Render the .html output for one folder
@@ -34,9 +57,12 @@
 
         # write output
         tpl.render_file(html_file)
-    
 
 def render_folder (ctx, folder) :
+    """
+        Render the HTML/images for this folder if needed, and recurse into subfolders.
+    """
+
     # index images that require updating
     image_count = len(folder.images)
     new_images = list(folder.index_images(for_update=(not ctx.config.force_update)))
@@ -48,12 +74,7 @@
         ctx.log_info("%s - render %d/%d images", folder, len(new_images), image_count)
 
         # update images
-        for image in new_images :
-            # update thumbs
-            image.update()
-            
-            # render output
-            render_image(ctx, image)
+        render_folder_images(ctx, new_images)
     
     else :
         ctx.log_info("%s - up to date", folder)
--- a/degal/config.py	Thu Jun 11 00:37:01 2009 +0300
+++ b/degal/config.py	Thu Jun 11 20:39:59 2009 +0300
@@ -19,6 +19,9 @@
     # force-update items
     force_update        = False
 
+    # number of task threads to use, may be logical False
+    task_threads        = 4
+
     # minimum logging level
     log_level           = logging.INFO
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/degal/task.py	Thu Jun 11 20:39:59 2009 +0300
@@ -0,0 +1,229 @@
+"""
+    Parallel execution of work, using threads.
+
+    The work to be done is submitted in the form of Tasks (which may or may not be `threadable`) to a TaskManager,
+    which then takes care of actually executing the tasks, either directly, or by queueing them up in a TaskQueue for 
+    execution by a TaskThread.
+
+    Example:
+
+    for retval in tasks.execute([threadable_task(foo_func, item) for item in items]) :
+        print retval
+"""
+
+from __future__ import with_statement
+
+import threading, Queue
+import contextlib
+
+class Task (object) :
+    """
+        A task describes some work to do
+    """
+    
+    # this task can be executed in a Python thread of its own
+    threadable = False
+    
+    def __init__ (self, func, *args, **kwargs) :
+        """
+            Set this Task up to execute the given function/arguments
+        """
+
+        self.func = func
+        self.args = args
+        self.kwargs = kwargs
+        self.retval = self.exc_obj = None
+    
+    def _execute (self) :
+        """
+            Low-level task execution function. Calls `execute` and stores the result/error
+        """
+
+        try :
+            # attempt to store normal retval
+            self.retval = self.execute()
+
+        except Exception, exc_obj :
+            # trap all errors
+            self.exc_obj = exc_obj
+
+    def execute (self) :
+        """
+            Execute the task and return any results.
+
+            If the task is threadable, then this method must not access any external state!
+
+            By default, this will execute and return the function given to __init__
+        """
+
+        return self.func(*self.args, **self.kwargs)
+
+class ThreadableTask (Task) :
+    """
+        A task that is marked as threadable by default
+    """
+
+    threadable = True
+
+class TaskQueue (object) :
+    """
+        A threadsafe queue of waiting tasks, keeping track of executing tasks.
+    """
+
+    def __init__ (self) :
+        """
+            Setup
+        """
+        
+        # queue of incoming, to-be-executed tasks
+        self.in_queue = Queue.Queue()
+
+        # queue of outgoing, was-executed tasks
+        self.out_queue = Queue.Queue();
+
+
+    @contextlib.contextmanager
+    def get_task (self) :
+        """
+            Intended to be used as the target of a 'with' statement to get a task from the queue and execute it.
+        """
+        
+        # XXX: shutdown signal?
+        task = self.in_queue.get()
+        
+        try :
+            # process it
+            yield task
+        
+        finally :
+            # mark it as complete
+            self.in_queue.task_done()
+    
+    def execute_task (self, task) :
+        """
+            Execute the given task, and put the result in our output queue
+        """
+        
+        # internal execute method
+        task._execute()
+        
+        # return value
+        self.out_queue.put(task)
+
+    def put_task (self, task) :
+        """
+            Add a task on to the queue.
+        """
+
+        self.in_queue.put(task)
+
+    def get_result (self) :
+        """
+            Wait for a single result from this queue.
+
+            XXX: nothing to guarantee that this won't block
+        """
+
+        return self.out_queue.get()
+
+class TaskThread (threading.Thread) :
+    """
+        A Thread that executes Tasks
+    """
+
+    def __init__ (self, queue) :
+        """
+            Setup thread and start waiting for requests
+        """
+        
+        super(TaskThread, self).__init__()
+
+        self.queue = queue
+
+        # run
+        self.start()
+
+    def run (self) :
+        """
+            Thread main
+        """
+        
+        # XXX: provide shutdown method
+        while True :
+            # get a task to execute
+            with self.queue.get_task() as task :
+                # execute it
+                self.queue.execute_task(task)
+
+class TaskManager (object) :
+    """
+        Provides the machinery to take tasks and execute them
+    """
+
+    def __init__ (self, config) :
+        """
+            Initialize with given config
+        """
+        
+        # setup queue
+        self.queue = TaskQueue()
+
+        # setup task threads
+        self.threads = [TaskThread(self.queue) for count in xrange(config.task_threads)]
+    
+    def submit (self, task) :
+        """
+            Submit given task for execution.
+
+            This guarantees that the task will eventually be returned via our queue.
+        """
+        
+        if self.threads and task.threadable :
+            # schedule for later execution by a thread
+            self.queue.put_task(task)
+        
+        else :
+            # execute directly
+            self.queue.execute_task(task)
+    
+    def consume (self) :
+        """
+            Consume a single result from the task queue, either returning the return value, or raising the task's error.
+        """
+        
+        # get one task
+        task = self.queue.get_result()
+
+        if task.exc_obj :
+            # XXX: is this threadsafe?
+            raise task.exc_obj
+
+        else :
+            # successful execution
+            return task.retval
+
+
+    def execute (self, tasks) :
+        """
+            Schedule a series of tasks for execution, and yield the return values as they become available.
+
+            If a task raises an error, it will be re-raised.
+
+            Waits for all the tasks to execute.
+        """
+
+        # number of tasks submitted; how many results we expect
+        task_count = len(tasks)
+
+        # submit them
+        for task in tasks :
+            self.submit(task)
+
+        # yield results
+        for count in xrange(task_count) :
+            yield self.consume()
+
+# pretty aliases
+task = Task
+threadable_task = ThreadableTask
+