implement concurrency... :)
authorTero Marttila <terom@fixme.fi>
Sun, 14 Jun 2009 20:05:11 +0300
changeset 117 a2e4562deaab
parent 116 2d3721b9ffd0
child 118 60b126ff0b74
implement concurrency... :)
degal/command.py
degal/commands/main.py
degal/concurrent.py
degal/config.py
degal/filesystem.py
degal/main.py
--- a/degal/command.py	Sun Jun 14 18:24:14 2009 +0300
+++ b/degal/command.py	Sun Jun 14 20:05:11 2009 +0300
@@ -2,7 +2,7 @@
     Command implementations
 """
 
-import inspect, logging, traceback
+import inspect, logging, traceback, concurrent
 
 class CommandList (object) :
     """
@@ -67,6 +67,9 @@
         self.config = config
         self.gallery = gallery
 
+        # concurrency
+        self.concurrent = concurrent.Manager(thread_count=config.thread_count)
+
     def execute (self, *args, **kwargs) :
         """
             Run the command in this context
--- a/degal/commands/main.py	Sun Jun 14 18:24:14 2009 +0300
+++ b/degal/commands/main.py	Sun Jun 14 20:05:11 2009 +0300
@@ -1,4 +1,5 @@
 from degal.command import command
+from degal.concurrent import task
 from degal import templates
 
 def render_image_html (ctx, image) :
@@ -16,28 +17,38 @@
     # write output
     tpl.render_file(image.html)
 
-def render_image_thumbs (image) :
+def update_image_thumbs (image) :
     """
-        Render the thubmnails for the given image
+        Render the thumbnails for the given image, returning the image.
+
+        This /should/ be threadsafe.
     """
+    
+    # this will unconditionally update the image
+    image.update()
 
-    image.update()
+    return image
 
 def render_folder_images (ctx, images, for_update=True) :
     """
         Render the given series of images
     """
     
-    # render them in parallel as required
-    for image in images :
+    # XXX: this breaks force_html
+
+    # render the thumbnails concurrently
+    for image in ctx.concurrent.execute(
+        task(update_image_thumbs, image) 
+
+            for image in images 
+
+            # only test if not already filtered for update
+            # XXX: externalize logic
+            if for_update or ctx.config.force_thumb or image.stale()
+    ) :
         # log image path
         ctx.log_info("%s", image)
         
-        # only test if not already filtered for update
-        if for_update or ctx.config.force_thumb or image.stale() :
-            # render output thumbs
-            render_image_thumbs(image)
-        
         # render HTML
         render_image_html(ctx, image)
 
@@ -45,7 +56,7 @@
         # XXX: verify that this works
         del image.img
 
-def render_folder_page (ctx, folder) :
+def render_folder_html (ctx, folder) :
     """
         Render the .html output for one folder
     """
@@ -86,7 +97,7 @@
 
     if new_images or ctx.config.force_html :
         # update folder index
-        render_folder_page(ctx, folder)
+        render_folder_html(ctx, folder)
         
         ctx.log_info("%s - render %d/%d images", folder, len(new_images), image_count)
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/degal/concurrent.py	Sun Jun 14 20:05:11 2009 +0300
@@ -0,0 +1,290 @@
+"""
+    Concurrent execution
+"""
+
+from __future__ import with_statement
+
+import sys, threading, collections, traceback
+
+class Task (object) :
+    """
+        Something to execute
+    """
+
+    def __init__ (self, func, *args, **kwargs) :
+        """
+            Initialize to run the given function with arguments.
+        """
+
+        self.func = func
+        self.args = args
+        self.kwargs = kwargs
+    
+    def execute (self) :
+        """
+            Execute this task normally, returning something or raising an error.
+        """
+
+        return self.func(*self.args, **self.kwargs)
+
+    def execute_result (self) :
+        """
+            Run this task, returning a Result. This should not raise any errors
+        """
+        
+        try :
+            return Result(self.execute())
+
+        except :
+            return Failure()
+
+class Result (object) :
+    """
+        The result from executing a task
+    """
+
+    def __init__ (self, res) :
+        """ Store the result """
+
+        self.res = res
+
+    def evaluate (self) :
+        """ Returns the result value """
+
+        return self.res
+
+class Failure (object) :
+    """
+        A failed result, causing an exception
+    """
+
+    def __init__ (self, exc_info=None) :
+        """ Store the given exception info, or use current exception if not given """
+
+        if exc_info :
+            self.exc_info = exc_info
+
+        else :
+            self.exc_info = sys.exc_info()
+    
+    def evaluate (self) :
+        """ re-Raises the exception info """
+        
+        # unpack
+        type, value, tb = self.exc_info
+        
+        # re-raise
+        raise type, value, tb
+
+class Queue (object) :
+    """
+        A thread-safe Queue of waiting tasks, and their results.
+    """
+    
+    def __init__ (self) :
+        """
+            Setup our queues and locks.
+        """
+
+        # global lock
+        self.lock = threading.Lock()
+
+        # queue of waiting tasks with wait lock
+        self.task_queue = collections.deque()
+        self.task_cond = threading.Condition(self.lock)
+
+        # count of executing tasks
+        self.exec_count = 0
+        
+        # queue of results
+        self.result_queue = collections.deque()
+        self.result_cond = threading.Condition(self.lock)
+
+    def put_task (self, task) :
+        """ Enqueue a task for async execution """
+
+        with self.lock :
+            # push it
+            self.task_queue.append(task)
+
+            # notify waiting
+            self.task_cond.notify()
+    
+    def put_tasks (self, tasks) :
+        """ Enqueue a number of tasks for async execution """
+
+        with self.lock :
+            # extend them
+            self.task_queue.extend(tasks)
+
+            # notify multiple waiting
+            self.task_cond.notifyAll()
+
+    def _get_task (self) :
+        """ Dequeue a task, incrementing exec_count """
+        
+        with self.lock :
+            # wait for a task to become available
+            while not self.task_queue :
+                self.task_cond.wait()
+
+            # count
+            self.exec_count += 1
+
+            # get
+            task = self.task_queue.popleft()
+
+            return task
+    
+    def _put_result (self, result) :
+        """ Enqueue a return value, decrementing exec_count """
+
+        with self.lock :
+            # push it
+            self.result_queue.append(result)
+
+            # count
+            self.exec_count -= 1
+ 
+            # notify waiting
+            self.result_cond.notify()
+   
+    def exec_task (self, task_func) :
+        """
+            Get a task, and execute it using the given Executor, and put a result, handling errors.
+
+            Maintains the task_exec_count
+        """
+        
+        # get it
+        task = self._get_task()
+
+        try :
+            # execute it
+            result = task_func(task)
+        
+        except :
+            # internal error
+            # XXX: something better to return? Maybe just dec exec_count, but don't return anything?
+            result = None
+            
+            # kill off the thread
+            raise
+
+        finally :
+            # put the result
+            self._put_result(result)
+    
+    def drain (self) :
+        """ Return Results from this queue until all tasks have been executed and all results returned """
+        
+        # yield/wait until empty
+        while True :
+            with self.lock :
+                # inspect current state
+                if self.result_queue :
+                    # we have a result we can return
+                    yield self.result_queue.popleft()
+
+                elif self.task_queue or self.exec_count :
+                    # tasks are still pending
+                    self.result_cond.wait()
+
+                else :
+                    # no more tasks left
+                    return
+
+class Thread (threading.Thread) :
+    """
+        A concurrent-execution thread, which dequeues tasks, executes them, and returns the results.
+    """
+
+    def __init__ (self, queue) :
+        """ Create the thread and start running """
+        
+        # init as a daemon thread
+        super(Thread, self).__init__()
+        self.daemon = True
+
+        # store
+        self.queue = queue
+
+        # run
+        self.start()
+    
+    def run_task (self) :
+        """ Get and handle a single task """
+
+        # let the queue handle safe-execution on the task using the Task.execute_result method directly
+        self.queue.exec_task(Task.execute_result)
+
+    def run (self) :
+        """ Handle tasks from our queue """
+        
+        try :
+            # XXX: something to quit for?
+            while True :
+                self.run_task()
+        
+        except :
+            from sys import stderr
+            
+            print >>stderr, "%s:" % self
+
+            # can't do much more...
+            traceback.print_exc(file=stderr)
+
+class Manager (object) :
+    """
+        Manages execution of tasks
+    """
+
+    def __init__ (self, thread_count=0) :
+        """
+            Initialize our pool of executors
+        """
+        
+        # our queue
+        self.queue = Queue()
+        
+        # thread pool
+        self.threads = [Thread(self.queue) for count in xrange(thread_count)]
+
+    def execute_threaded (self, tasks) :
+        """
+            Execute the given tasks using our thread pool
+        """
+
+        # schedule them all
+        self.queue.put_tasks(tasks)
+
+        # then drain results
+        for result in self.queue.drain() :
+            # this will either return the value or raise the error
+            yield result.evaluate()
+    
+    def execute_directly (self, tasks) :
+        """
+            Fallback to execute the given tasks without any concurrency.
+        """
+
+        for task in tasks :
+            yield task.execute()
+
+    def execute (self, tasks) :
+        """
+            Execute the given tasks concurrently.
+            
+            This will yield the result of each task, and return once all tasks have completed.
+        """
+        
+        if self.threads :
+            # yay
+            return self.execute_threaded(tasks)
+        
+        else :
+            # fallback
+            return self.execute_directly(tasks)
+# aliases   
+task = Task
+
--- a/degal/config.py	Sun Jun 14 18:24:14 2009 +0300
+++ b/degal/config.py	Sun Jun 14 20:05:11 2009 +0300
@@ -20,6 +20,9 @@
     # minimum logging level
     log_level           = logging.INFO
 
+    # number of threads to use for concurrency
+    thread_count        = 2
+
     ## detailed configuration
     # the name of the gallery
     gallery_title       = "Image Gallery"
--- a/degal/filesystem.py	Sun Jun 14 18:24:14 2009 +0300
+++ b/degal/filesystem.py	Sun Jun 14 20:05:11 2009 +0300
@@ -146,8 +146,11 @@
     def nodepath (self) :
         """
             Returns the path of nodes from this node to the root node, inclusive
+
             >>> list(Node(Root('/'), 'foo').subnode('bar').nodepath())
             [Root('/'), Node('/', 'foo'), Node('/foo', 'bar')]
+
+            XXX: circular reference hell?
         """
         
         # recursive generator
--- a/degal/main.py	Sun Jun 14 18:24:14 2009 +0300
+++ b/degal/main.py	Sun Jun 14 20:05:11 2009 +0300
@@ -30,6 +30,9 @@
     parser.add_option("--with-exif",            dest='exif_enabled', action="store_true", default=None,
             help="Include Exif metadata in updated .html files")
 
+    parser.add_option('-c', "--thread-count",   dest='thread_count', type="int", default=None,
+            help="Size of thread pool")
+
     parser.add_option('-d', "--debug",          dest='debug', action="store_true", default=False,
             help="Show debug output")
 
@@ -63,6 +66,9 @@
     if options.exif_enabled is not None :
         config.exif_enabled = options.exif_enabled
 
+    if options.thread_count is not None :
+        config.thread_count = options.thread_count
+
     if options.debug :
         config.log_level = config_module.logging.DEBUG