implement concurrency... :)
--- a/degal/command.py Sun Jun 14 18:24:14 2009 +0300
+++ b/degal/command.py Sun Jun 14 20:05:11 2009 +0300
@@ -2,7 +2,7 @@
Command implementations
"""
-import inspect, logging, traceback
+import inspect, logging, traceback, concurrent
class CommandList (object) :
"""
@@ -67,6 +67,9 @@
self.config = config
self.gallery = gallery
+ # concurrency
+ self.concurrent = concurrent.Manager(thread_count=config.thread_count)
+
def execute (self, *args, **kwargs) :
"""
Run the command in this context
--- a/degal/commands/main.py Sun Jun 14 18:24:14 2009 +0300
+++ b/degal/commands/main.py Sun Jun 14 20:05:11 2009 +0300
@@ -1,4 +1,5 @@
from degal.command import command
+from degal.concurrent import task
from degal import templates
def render_image_html (ctx, image) :
@@ -16,28 +17,38 @@
# write output
tpl.render_file(image.html)
-def render_image_thumbs (image) :
+def update_image_thumbs (image) :
"""
- Render the thubmnails for the given image
+ Render the thumbnails for the given image, returning the image.
+
+ This /should/ be threadsafe.
"""
+
+ # this will unconditionally update the image
+ image.update()
- image.update()
+ return image
def render_folder_images (ctx, images, for_update=True) :
"""
Render the given series of images
"""
- # render them in parallel as required
- for image in images :
+ # XXX: this breaks force_html
+
+ # render the thumbnails concurrently
+ for image in ctx.concurrent.execute(
+ task(update_image_thumbs, image)
+
+ for image in images
+
+ # only test if not already filtered for update
+ # XXX: externalize logic
+ if for_update or ctx.config.force_thumb or image.stale()
+ ) :
# log image path
ctx.log_info("%s", image)
- # only test if not already filtered for update
- if for_update or ctx.config.force_thumb or image.stale() :
- # render output thumbs
- render_image_thumbs(image)
-
# render HTML
render_image_html(ctx, image)
@@ -45,7 +56,7 @@
# XXX: verify that this works
del image.img
-def render_folder_page (ctx, folder) :
+def render_folder_html (ctx, folder) :
"""
Render the .html output for one folder
"""
@@ -86,7 +97,7 @@
if new_images or ctx.config.force_html :
# update folder index
- render_folder_page(ctx, folder)
+ render_folder_html(ctx, folder)
ctx.log_info("%s - render %d/%d images", folder, len(new_images), image_count)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/degal/concurrent.py Sun Jun 14 20:05:11 2009 +0300
@@ -0,0 +1,290 @@
+"""
+ Concurrent execution
+"""
+
+from __future__ import with_statement
+
+import sys, threading, collections, traceback
+
+class Task (object) :
+ """
+ Something to execute
+ """
+
+ def __init__ (self, func, *args, **kwargs) :
+ """
+ Initialize to run the given function with arguments.
+ """
+
+ self.func = func
+ self.args = args
+ self.kwargs = kwargs
+
+ def execute (self) :
+ """
+ Execute this task normally, returning something or raising an error.
+ """
+
+ return self.func(*self.args, **self.kwargs)
+
+ def execute_result (self) :
+ """
+ Run this task, returning a Result. This should not raise any errors
+ """
+
+ try :
+ return Result(self.execute())
+
+ except :
+ return Failure()
+
+class Result (object) :
+ """
+ The result from executing a task
+ """
+
+ def __init__ (self, res) :
+ """ Store the result """
+
+ self.res = res
+
+ def evaluate (self) :
+ """ Returns the result value """
+
+ return self.res
+
+class Failure (object) :
+ """
+ A failed result, causing an exception
+ """
+
+ def __init__ (self, exc_info=None) :
+ """ Store the given exception info, or use current exception if not given """
+
+ if exc_info :
+ self.exc_info = exc_info
+
+ else :
+ self.exc_info = sys.exc_info()
+
+ def evaluate (self) :
+ """ re-Raises the exception info """
+
+ # unpack
+ type, value, tb = self.exc_info
+
+ # re-raise
+ raise type, value, tb
+
+class Queue (object) :
+ """
+ A thread-safe Queue of waiting tasks, and their results.
+ """
+
+ def __init__ (self) :
+ """
+ Setup our queues and locks.
+ """
+
+ # global lock
+ self.lock = threading.Lock()
+
+ # queue of waiting tasks with wait lock
+ self.task_queue = collections.deque()
+ self.task_cond = threading.Condition(self.lock)
+
+ # count of executing tasks
+ self.exec_count = 0
+
+ # queue of results
+ self.result_queue = collections.deque()
+ self.result_cond = threading.Condition(self.lock)
+
+ def put_task (self, task) :
+ """ Enqueue a task for async execution """
+
+ with self.lock :
+ # push it
+ self.task_queue.append(task)
+
+ # notify waiting
+ self.task_cond.notify()
+
+ def put_tasks (self, tasks) :
+ """ Enqueue a number of tasks for async execution """
+
+ with self.lock :
+ # extend them
+ self.task_queue.extend(tasks)
+
+ # notify multiple waiting
+ self.task_cond.notifyAll()
+
+ def _get_task (self) :
+ """ Dequeue a task, incrementing exec_count """
+
+ with self.lock :
+ # wait for a task to become available
+ while not self.task_queue :
+ self.task_cond.wait()
+
+ # count
+ self.exec_count += 1
+
+ # get
+ task = self.task_queue.popleft()
+
+ return task
+
+ def _put_result (self, result) :
+ """ Enqueue a return value, decrementing exec_count """
+
+ with self.lock :
+ # push it
+ self.result_queue.append(result)
+
+ # count
+ self.exec_count -= 1
+
+ # notify waiting
+ self.result_cond.notify()
+
+    def exec_task (self, task_func) :
+        """
+            Dequeue one task, run task_func(task) on it, and always enqueue a result.
+
+            If task_func itself raises, a Failure holding that exception is enqueued before it propagates.
+        """
+
+        # blocks until a task is available, and increments exec_count
+        task = self._get_task()
+
+        try :
+            # execute it; the result is later evaluate()'d by the consumer
+            result = task_func(task)
+
+        except :
+            # internal error: capture it as a Failure rather than None, as the consumer
+            # calls .evaluate() on every result and would otherwise hide the real error
+            result = Failure()
+
+            # kill off the thread
+            raise
+
+        finally :
+            # always put a result, which also decrements exec_count again
+            self._put_result(result)
+
+    def drain (self) :
+        """ Yield results until no tasks are queued or executing; the lock is never held across a yield """
+
+        while True :
+            with self.lock :
+                # wait for a result while there are still tasks queued or executing
+                while not self.result_queue and (self.task_queue or self.exec_count) :
+                    self.result_cond.wait()
+
+                if not self.result_queue :
+                    # no results, and nothing queued or executing: all done
+                    return
+
+                result = self.result_queue.popleft()
+
+            # yield outside of the lock: the generator stays suspended here while the
+            # consumer handles the result, and the workers need the lock meanwhile
+            yield result
+
+class Thread (threading.Thread) :
+ """
+ A concurrent-execution thread, which dequeues tasks, executes them, and returns the results.
+ """
+
+ def __init__ (self, queue) :
+ """ Create the thread and start running """
+
+ # init as a daemon thread
+ super(Thread, self).__init__()
+ self.daemon = True
+
+ # store
+ self.queue = queue
+
+ # run
+ self.start()
+
+ def run_task (self) :
+ """ Get and handle a single task """
+
+ # let the queue handle safe-execution on the task using the Task.execute_result method directly
+ self.queue.exec_task(Task.execute_result)
+
+ def run (self) :
+ """ Handle tasks from our queue """
+
+ try :
+ # XXX: something to quit for?
+ while True :
+ self.run_task()
+
+ except :
+ from sys import stderr
+
+ print >>stderr, "%s:" % self
+
+ # can't do much more...
+ traceback.print_exc(file=stderr)
+
+class Manager (object) :
+ """
+ Manages execution of tasks
+ """
+
+ def __init__ (self, thread_count=0) :
+ """
+ Initialize our pool of executors
+ """
+
+ # our queue
+ self.queue = Queue()
+
+ # thread pool
+ self.threads = [Thread(self.queue) for count in xrange(thread_count)]
+
+ def execute_threaded (self, tasks) :
+ """
+ Execute the given tasks using our thread pool
+ """
+
+ # schedule them all
+ self.queue.put_tasks(tasks)
+
+ # then drain results
+ for result in self.queue.drain() :
+ # this will either return the value or raise the error
+ yield result.evaluate()
+
+ def execute_directly (self, tasks) :
+ """
+ Fallback to execute the given tasks without any concurrency.
+ """
+
+ for task in tasks :
+ yield task.execute()
+
+ def execute (self, tasks) :
+ """
+ Execute the given tasks concurrently.
+
+ This will yield the result of each task, and return once all tasks have completed.
+ """
+
+ if self.threads :
+ # yay
+ return self.execute_threaded(tasks)
+
+ else :
+ # fallback
+ return self.execute_directly(tasks)
+# aliases
+task = Task
+
--- a/degal/config.py Sun Jun 14 18:24:14 2009 +0300
+++ b/degal/config.py Sun Jun 14 20:05:11 2009 +0300
@@ -20,6 +20,9 @@
# minimum logging level
log_level = logging.INFO
+ # number of threads to use for concurrency
+ thread_count = 2
+
## detailed configuration
# the name of the gallery
gallery_title = "Image Gallery"
--- a/degal/filesystem.py Sun Jun 14 18:24:14 2009 +0300
+++ b/degal/filesystem.py Sun Jun 14 20:05:11 2009 +0300
@@ -146,8 +146,11 @@
def nodepath (self) :
"""
Returns the path of nodes from this node to the root node, inclusive
+
>>> list(Node(Root('/'), 'foo').subnode('bar').nodepath())
[Root('/'), Node('/', 'foo'), Node('/foo', 'bar')]
+
+ XXX: circular reference hell?
"""
# recursive generator
--- a/degal/main.py Sun Jun 14 18:24:14 2009 +0300
+++ b/degal/main.py Sun Jun 14 20:05:11 2009 +0300
@@ -30,6 +30,9 @@
parser.add_option("--with-exif", dest='exif_enabled', action="store_true", default=None,
help="Include Exif metadata in updated .html files")
+ parser.add_option('-c', "--thread-count", dest='thread_count', type="int", default=None,
+ help="Size of thread pool")
+
parser.add_option('-d', "--debug", dest='debug', action="store_true", default=False,
help="Show debug output")
@@ -63,6 +66,9 @@
if options.exif_enabled is not None :
config.exif_enabled = options.exif_enabled
+ if options.thread_count is not None :
+ config.thread_count = options.thread_count
+
if options.debug :
config.log_level = config_module.logging.DEBUG