--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/degal/task.py Thu Jun 11 20:39:59 2009 +0300
@@ -0,0 +1,229 @@
+"""
+ Parallel execution of work, using threads.
+
+ The work to be done is submit'd in the form of Tasks (which may or may not be `threadable`) to a TaskManager,
+ which then takes care of actually executing the tasks, either directly, or by queueing them up in a TaskQueue for
+ execution by a TaskThread.
+
+ Example:
+
+ for retval in tasks.execute(threadable_task(foo_func, item) for item in items) :
+ print retval
+"""
+
+from __future__ import with_statement
+
+import threading, Queue
+import contextlib
+
+class Task (object) :
+ """
+ A task describes some work to do
+ """
+
+ # this task can be executed in a Python thread of its own
+ threadable = False
+
+ def __init__ (self, func, *args, **kwargs) :
+ """
+ Set this Task up to execute the given function/arguments
+ """
+
+ self.func = func
+ self.args = args
+ self.kwargs = kwargs
+ self.retval = self.exc_obj = None
+
+ def _execute (self) :
+ """
+ Low-level task execution function. Calls `execute` and stores the result/error
+ """
+
+ try :
+ # attempt to store normal retval
+ self.retval = self.execute()
+
+ except Exception, exc_obj :
+ # trap all errors
+ self.exc_obj = exc_obj
+
+ def execute (self) :
+ """
+ Execute the task and return any results.
+
+ If the task is threadable, then this method must not access any external state!
+
+ By default, this will execute and return the function given to __init__
+ """
+
+ return self.func(*self.args, **self.kwargs)
+
+class ThreadableTask (Task) :
+ """
+ A task that is marked as threadable by default
+ """
+
+ threadable = True
+
+class TaskQueue (object) :
+ """
+ A threadsafe queue of waiting tasks, keeping track of executing tasks.
+ """
+
+ def __init__ (self) :
+ """
+ Setup
+ """
+
+ # queue of incoming, to-be-executed tasks
+ self.in_queue = Queue.Queue()
+
+ # queue of outgoing, was-executed tasks
+ self.out_queue = Queue.Queue();
+
+
+ @contextlib.contextmanager
+ def get_task (self) :
+ """
+ Intended to be used as the target of a 'with' statement to get a task from the queue and execute it.
+ """
+
+ # XXX: shutdown signal?
+ task = self.in_queue.get()
+
+ try :
+ # process it
+ yield task
+
+ finally :
+ # mark it as complete
+ self.in_queue.task_done()
+
+ def execute_task (self, task) :
+ """
+ Execute the given task, and put the result in our output queue
+ """
+
+ # internal execute method
+ task._execute()
+
+ # return value
+ self.out_queue.put(task)
+
+ def put_task (self, task) :
+ """
+ Add a task on to the queue.
+ """
+
+ self.in_queue.put(task)
+
+ def get_result (self) :
+ """
+ Wait for a single result from this queue.
+
+ XXX: nothing to garuntee that this won't block
+ """
+
+ return self.out_queue.get()
+
+class TaskThread (threading.Thread) :
+ """
+ A Thread that executes Tasks
+ """
+
+ def __init__ (self, queue) :
+ """
+ Setup thread and start waiting for requests
+ """
+
+ super(TaskThread, self).__init__()
+
+ self.queue = queue
+
+ # run
+ self.start()
+
+ def run (self) :
+ """
+ Thread main
+ """
+
+ # XXX: provide shutdown method
+ while True :
+ # get a task to execute
+ with self.queue.get_task() as task :
+ # execute it
+ self.queue.execute_task(task)
+
+class TaskManager (object) :
+ """
+ Provides the machinery to take tasks and execute them
+ """
+
+ def __init__ (self, config) :
+ """
+ Initialize with given config
+ """
+
+ # setup queue
+ self.queue = TaskQueue()
+
+ # setup task threads
+ self.threads = [TaskThread(self.queue) for count in xrange(config.task_threads)]
+
+ def submit (self, task) :
+ """
+ Submit given task for execution.
+
+ This garuntees that the task will eventually be returned via our queue.
+ """
+
+ if self.threads and task.threadable :
+ # schedule for later execution by a thread
+ self.queue.put_task(task)
+
+ else :
+ # execute directly
+ self.queue.execute_task(task)
+
+ def consume (self) :
+ """
+ Consume a single result from the task queue, either returning the return value, or raising the task's error.
+ """
+
+ # get one task
+ task = self.queue.get_result()
+
+ if task.exc_obj :
+ # XXX: is this threadsafe?
+ raise task.exc_obj
+
+ else :
+ # successful execution
+ return task.retval
+
+
+ def execute (self, tasks) :
+ """
+ Schedule a series of tasks for execution, and yield the return values as they become available.
+
+ If a task raises an error, it will be re-raised.
+
+ Waits for all the tasks to execute.
+ """
+
+ # number of tasks submitted; how many results we expect
+ task_count = len(tasks)
+
+ # submit them
+ for task in tasks :
+ self.submit(task)
+
+ # yield results
+ for count in xrange(task_count) :
+ yield self.consume()
+
+# pretty aliases
+task = Task
+threadable_task = ThreadableTask
+