--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/degal/concurrent.py Sun Jun 14 20:05:11 2009 +0300
@@ -0,0 +1,290 @@
+"""
+ Concurrent execution
+"""
+
+from __future__ import with_statement
+
+import sys, threading, collections, traceback
+
class Task (object) :
    """
    Something to execute: a function plus the arguments to call it with.
    """

    def __init__ (self, func, *args, **kwargs) :
        """
        Initialize to run the given function with the given positional and
        keyword arguments.
        """

        self.func = func
        self.args = args
        self.kwargs = kwargs

    def execute (self) :
        """
        Execute this task normally, returning the function's return value or
        raising whatever it raises.
        """

        return self.func(*self.args, **self.kwargs)

    def execute_result (self) :
        """
        Run this task, returning a Result on success or a Failure wrapping the
        error. This does not raise errors from the task itself.
        """

        try :
            return Result(self.execute())

        except Exception :
            # only catch real errors: the bare `except:` this replaces would
            # also swallow KeyboardInterrupt/SystemExit, hiding shutdown
            # requests from the worker threads
            return Failure()
+
class Result (object) :
    """
    The successful outcome of executing a task; wraps the return value.
    """

    def __init__ (self, res) :
        """
        Wrap the given return value for later retrieval.
        """

        self.res = res

    def evaluate (self) :
        """
        Return the wrapped value, as-is.
        """

        return self.res
+
class Failure (object) :
    """
    A failed result, causing an exception when evaluated.
    """

    def __init__ (self, exc_info=None) :
        """
        Store the given exception info, or capture the current exception if not given.

            exc_info    - optional (type, value, traceback) triple as returned
                          by sys.exc_info(); when omitted, the exception
                          currently being handled is captured, so Failure()
                          must then be constructed inside an except block
        """

        if exc_info :
            self.exc_info = exc_info

        else :
            # NOTE(review): outside an active except block this stores
            # (None, None, None), and evaluate() would then fail to re-raise
            # anything meaningful
            self.exc_info = sys.exc_info()

    def evaluate (self) :
        """ re-Raises the exception info """

        # unpack
        # XXX: `type` shadows the builtin of the same name inside this method
        type, value, tb = self.exc_info

        # re-raise
        # Python 2 three-argument raise: re-raises the original exception with
        # its original traceback preserved
        raise type, value, tb
+
class Queue (object) :
    """
    A thread-safe Queue of waiting tasks, and their results.

    A single Lock protects all state; two Condition variables built on that
    same lock signal "task available" (to workers) and "result available"
    (to the drain() consumer).
    """

    def __init__ (self) :
        """
        Setup our queues and locks.
        """

        # global lock, shared by both condition variables below
        self.lock = threading.Lock()

        # queue of waiting tasks with wait lock
        self.task_queue = collections.deque()
        self.task_cond = threading.Condition(self.lock)

        # count of executing tasks (dequeued by a worker but not yet resulted)
        self.exec_count = 0

        # queue of results
        self.result_queue = collections.deque()
        self.result_cond = threading.Condition(self.lock)

    def put_task (self, task) :
        """ Enqueue a task for async execution """

        with self.lock :
            # push it
            self.task_queue.append(task)

            # notify waiting
            # one new task -> wake at most one worker
            self.task_cond.notify()

    def put_tasks (self, tasks) :
        """ Enqueue a number of tasks for async execution """

        with self.lock :
            # extend them
            # NOTE(review): `tasks` is consumed while holding the lock; a slow
            # or blocking iterable would stall every other queue operation
            self.task_queue.extend(tasks)

            # notify multiple waiting
            self.task_cond.notifyAll()

    def _get_task (self) :
        """ Dequeue a task, incrementing exec_count """

        with self.lock :
            # wait for a task to become available
            # (no timeout: a worker blocks here forever until a task arrives)
            while not self.task_queue :
                self.task_cond.wait()

            # count
            # incremented under the same lock as the dequeue, so drain() sees
            # a consistent "work still pending" picture
            self.exec_count += 1

            # get
            task = self.task_queue.popleft()

        return task

    def _put_result (self, result) :
        """ Enqueue a return value, decrementing exec_count """

        with self.lock :
            # push it
            self.result_queue.append(result)

            # count
            self.exec_count -= 1

            # notify waiting
            self.result_cond.notify()

    def exec_task (self, task_func) :
        """
        Get a task, and execute it using the given Executor, and put a result, handling errors.

        Maintains the task_exec_count
        """

        # get it
        task = self._get_task()

        try :
            # execute it
            result = task_func(task)

        except :
            # internal error
            # XXX: something better to return? Maybe just dec exec_count, but don't return anything?
            # NOTE(review): the None pushed below will break drain() consumers
            # that call .evaluate() on every result
            result = None

            # kill off the thread
            raise

        finally :
            # put the result
            # runs on both the success and error path, so exec_count stays
            # balanced with _get_task's increment
            self._put_result(result)

    def drain (self) :
        """ Return Results from this queue until all tasks have been executed and all results returned """

        # yield/wait until empty
        while True :
            with self.lock :
                # inspect current state
                if self.result_queue :
                    # we have a result we can return
                    # NOTE(review): the lock stays held while this generator is
                    # suspended at the yield, blocking producers until the
                    # consumer resumes us; consider popping under the lock and
                    # yielding outside it
                    yield self.result_queue.popleft()

                elif self.task_queue or self.exec_count :
                    # tasks are still pending
                    self.result_cond.wait()

                else :
                    # no more tasks left
                    return
+
class Thread (threading.Thread) :
    """
    A concurrent-execution thread, which dequeues tasks, executes them, and returns the results.
    """

    def __init__ (self, queue) :
        """
        Create the thread and start running.

            queue       - the shared Queue to pull tasks from and push results to
        """

        # init as a daemon thread, so workers never keep the process alive
        super(Thread, self).__init__()
        self.daemon = True

        # store
        self.queue = queue

        # run
        # NOTE(review): starting from __init__ means the thread is live as soon
        # as it is constructed; run() may execute before the constructor returns
        self.start()

    def run_task (self) :
        """ Get and handle a single task """

        # let the queue handle safe-execution on the task using the Task.execute_result method directly
        # (the queue dequeues a Task and applies this unbound method to it, so
        # task errors come back as Failure results rather than escaping here)
        self.queue.exec_task(Task.execute_result)

    def run (self) :
        """ Handle tasks from our queue """

        try :
            # XXX: something to quit for?
            # loop forever; being a daemon thread, we are simply discarded at
            # interpreter exit
            while True :
                self.run_task()

        except :
            # internal (non-task) error: report it and let the thread die,
            # since the queue's state cannot be recovered from here
            from sys import stderr

            # Python 2 print-to-file syntax
            print >>stderr, "%s:" % self

            # can't do much more...
            traceback.print_exc(file=stderr)
+
class Manager (object) :
    """
    Manages execution of tasks across an optional worker-thread pool.
    """

    def __init__ (self, thread_count=0) :
        """
        Set up the shared queue and spawn `thread_count` workers; zero workers
        means execute() falls back to serial execution.
        """

        # the queue shared between this manager and its workers
        self.queue = Queue()

        # worker pool; each Thread starts itself on construction
        self.threads = [Thread(self.queue) for _idx in xrange(thread_count)]

    def execute_threaded (self, tasks) :
        """
        Execute the given tasks using our thread pool, yielding each outcome
        as it becomes available.
        """

        # hand the whole batch to the workers...
        self.queue.put_tasks(tasks)

        # ...then stream back their outcomes, unwrapping each one
        for outcome in self.queue.drain() :
            # a Result yields its value; a Failure re-raises its error
            yield outcome.evaluate()

    def execute_directly (self, tasks) :
        """
        Fallback to execute the given tasks serially, with no concurrency.
        """

        for task in tasks :
            yield task.execute()

    def execute (self, tasks) :
        """
        Execute the given tasks concurrently.

        This will yield the result of each task, and return once all tasks have completed.
        """

        # use the pool when we have one, otherwise run inline
        return self.execute_threaded(tasks) if self.threads else self.execute_directly(tasks)
# aliases
# lowercase alias, so callers can use concurrent.task(...) like a factory function
task = Task