"""
Parallel execution of work, using threads.
The work to be done is submit'd in the form of Tasks (which may or may not be `threadable`) to a TaskManager,
which then takes care of actually executing the tasks, either directly, or by queueing them up in a TaskQueue for
execution by a TaskThread.
Example:
for retval in tasks.execute(threadable_task(foo_func, item) for item in items) :
print retval
"""
from __future__ import with_statement
import threading, Queue
import contextlib
class Task (object) :
    """
    A task describes some work to do: a function, plus the positional and keyword arguments to call it with.

    After `_execute` has run, `retval` holds the return value on success, and `exc_obj` holds the trapped
    exception on failure (it is None otherwise).
    """

    # this task can be executed in a Python thread of its own
    threadable = False

    def __init__ (self, func, *args, **kwargs) :
        """
        Set this Task up to execute the given function/arguments.
        """
        self.func = func
        self.args = args
        self.kwargs = kwargs
        # outcome of the most recent `_execute`
        self.retval = self.exc_obj = None

    def _execute (self) :
        """
        Low-level task execution function. Calls `execute` and stores the result/error.

        Any `Exception` raised by `execute` is trapped and stored in `exc_obj` instead of being propagated.
        Exceptions that do not derive from `Exception` (e.g. KeyboardInterrupt, SystemExit) are not trapped.
        """
        # clear the outcome of any previous run, so a re-executed task can never report a stale error/value
        self.retval = self.exc_obj = None

        try :
            # attempt to store normal retval
            self.retval = self.execute()
        except Exception as exc_obj :
            # trap all errors
            self.exc_obj = exc_obj

    def execute (self) :
        """
        Execute the task and return any results.

        If the task is threadable, then this method must not access any external state!

        By default, this will call the function given to __init__ with the given arguments, and return its result.
        """
        return self.func(*self.args, **self.kwargs)
class ThreadableTask (Task) :
    """
    A Task that is flagged, by default, as safe to run in a worker thread.

    Apart from the `threadable` class attribute, it behaves exactly like Task.
    """

    # overrides Task.threadable, allowing TaskManager.submit to queue this task for a TaskThread
    threadable = True
class TaskQueue (object) :
    """
    A threadsafe queue of waiting tasks, paired with a queue of tasks that have finished executing.
    """

    def __init__ (self) :
        """
        Create the (unbounded) input and output queues.
        """
        # queue of incoming, to-be-executed tasks
        self.in_queue = Queue.Queue()

        # queue of outgoing, was-executed tasks
        self.out_queue = Queue.Queue()

    @contextlib.contextmanager
    def get_task (self) :
        """
        Intended to be used as the target of a 'with' statement to get a task from the queue and execute it.

        Blocks until a task is available. The task is marked as done on `in_queue` when the 'with' body
        exits, whether it exits normally or via an exception.
        """
        # XXX: shutdown signal?
        task = self.in_queue.get()

        try :
            # process it
            yield task
        finally :
            # mark it as complete, even if processing failed
            self.in_queue.task_done()

    def execute_task (self, task) :
        """
        Execute the given task in the calling thread, then put it (now carrying its result/error) on the output queue.
        """
        # internal execute method; traps the task's errors into task.exc_obj
        task._execute()

        # return value
        self.out_queue.put(task)

    def put_task (self, task) :
        """
        Add a task on to the input queue, to be picked up via `get_task`.
        """
        self.in_queue.put(task)

    def get_result (self, block=True, timeout=None) :
        """
        Wait for a single executed task from this queue, and return it.

        By default this blocks until a task is available, exactly like before. Pass `block=False`, or a
        `timeout` in seconds, to bound the wait. If no task becomes available in time, the queue module's
        `Empty` exception is raised (see `Queue.Queue.get`).
        """
        return self.out_queue.get(block, timeout)
class TaskThread (threading.Thread) :
    """
    A daemon Thread that repeatedly takes Tasks from a TaskQueue and executes them.
    """

    def __init__ (self, queue) :
        """
        Setup thread and start waiting for requests.

        The thread is a daemon: `run` loops forever with no way to stop it, so a non-daemon thread
        would prevent the interpreter from ever exiting.
        """
        super(TaskThread, self).__init__()
        self.queue = queue

        # must be set before start()
        self.daemon = True

        # run
        self.start()

    def run (self) :
        """
        Thread main: take a task from the queue, execute it, repeat.
        """
        # XXX: provide shutdown method
        while True :
            # get a task to execute; it is marked done when the 'with' body exits
            with self.queue.get_task() as task :
                # execute it, and hand it back via the queue's output side
                self.queue.execute_task(task)
class TaskManager (object) :
    """
    Provides the machinery to take tasks and execute them.
    """

    def __init__ (self, config) :
        """
        Initialize with given config.

        Starts `config.task_threads` TaskThreads. With zero threads, every task is executed directly by `submit`.
        """
        # setup queue
        self.queue = TaskQueue()

        # setup task threads
        self.threads = [TaskThread(self.queue) for count in range(config.task_threads)]

    def submit (self, task) :
        """
        Submit given task for execution.

        A threadable task is queued for a TaskThread when any threads exist. All other tasks are executed
        immediately in the calling thread. Either way, the executed task is placed on our output queue,
        to be picked up by `consume`.
        """
        if self.threads and task.threadable :
            # schedule for later execution by a thread
            self.queue.put_task(task)
        else :
            # execute directly
            self.queue.execute_task(task)

    def consume (self) :
        """
        Consume a single result from the task queue, either returning the return value, or raising the task's error.

        Blocks until some task has finished. Results come back in completion order, not submission order.
        """
        # get one task
        task = self.queue.get_result()

        # explicit None check: don't rely on the truthiness of the exception object
        if task.exc_obj is not None :
            # XXX: is this threadsafe?
            raise task.exc_obj

        # successful execution
        return task.retval

    def execute (self, tasks) :
        """
        Schedule a series of tasks for execution, and yield the return values as they become available.

        `tasks` may be any iterable of tasks, including a generator. Because this method is itself a
        generator, nothing is submitted until iteration starts. All tasks are submitted before any
        result is consumed.

        If a task raises an error, it will be re-raised.

        Waits for all the tasks to execute. Suppose a task's error is raised, or the caller stops
        iterating early. The results of the remaining submitted tasks are still waited for, and then
        discarded. This keeps them from leaking into a later `execute`/`consume` call.
        """
        # number of submitted tasks whose result has not been taken off the queue yet
        pending = 0

        try :
            # submit them, counting as we go (len() would reject generators)
            for task in tasks :
                self.submit(task)
                pending += 1

            # yield results
            while pending :
                # count this result as taken before consume() can raise the task's error
                pending -= 1
                yield self.consume()

        finally :
            # drain results that were never consumed (task error, or generator closed early)
            for count in range(pending) :
                self.queue.get_result()
# short, lowercase aliases for the two task classes, e.g. `threadable_task(func, arg)`
task, threadable_task = Task, ThreadableTask