"""
    Parallel execution of work, using threads.

    The work to be done is submitted in the form of Tasks (which may or may not be `threadable`) to a TaskManager,
    which then takes care of actually executing the tasks, either directly in the calling thread, or by queueing them
    up in a TaskQueue for execution by a TaskThread.

    Example, where `tasks` is a TaskManager; return values arrive in completion order, not submission order:

        for retval in tasks.execute(threadable_task(foo_func, item) for item in items) :
            print retval
"""

from __future__ import with_statement

import threading, Queue
import contextlib

class Task (object) :
    """
        A task describes some work to do
    """

    # this task can be executed in a Python thread of its own
    threadable = False

    def __init__ (self, func, *args, **kwargs) :
        """
            Set this Task up to execute the given function/arguments
        """

        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.retval = self.exc_obj = None

    def _execute (self) :
        """
            Low-level task execution function. Calls `execute` and stores its return value in `retval`, or the
            exception it raised in `exc_obj`.
        """

        try :
            # attempt to store normal retval
            self.retval = self.execute()

        except Exception, exc_obj :
            # trap all errors
            self.exc_obj = exc_obj

    def execute (self) :
        """
            Execute the task and return any results.

            If the task is threadable, then this method must not access any external state!

            By default, this calls the function given to __init__ with the stored arguments, and returns its result.
        """

        return self.func(*self.args, **self.kwargs)

class ThreadableTask (Task) :
    """
        A task that is marked as threadable by default
    """

    threadable = True

class TaskQueue (object) :
    """
        A threadsafe queue of waiting tasks, plus an output queue of executed tasks. Uses Queue.task_done to keep
        track of tasks that are still executing.
    """

    def __init__ (self) :
        """
            Set up the input and output queues
        """

        # queue of incoming, to-be-executed tasks
        self.in_queue = Queue.Queue()

        # queue of outgoing, already-executed tasks
        self.out_queue = Queue.Queue()

    @contextlib.contextmanager
    def get_task (self) :
        """
            Intended to be used as the target of a 'with' statement: blocks until a task is available and yields it
            for execution. The task is marked as done when the 'with' block exits, even if it raises.
        """

        # XXX: shutdown signal?
        task = self.in_queue.get()

        try :
            # process it
            yield task

        finally :
            # mark it as complete
            self.in_queue.task_done()

    def execute_task (self, task) :
        """
            Execute the given task in the calling thread, and put it on our output queue
        """

        # internal execute method; stores retval/exc_obj on the task
        task._execute()

        # hand the completed task, carrying its retval/exc_obj, to the output queue
        self.out_queue.put(task)

    def put_task (self, task) :
        """
            Add a task to the input queue, for execution by a TaskThread.
        """

        self.in_queue.put(task)

    def get_result (self) :
        """
            Wait for a single executed task from the output queue, and return it.

            XXX: nothing guarantees that this won't block forever
        """

        return self.out_queue.get()

class TaskThread (threading.Thread) :
    """
        A Thread that executes Tasks
    """

    def __init__ (self, queue) :
        """
            Set up the thread and start it; it then waits for tasks on the given TaskQueue
        """

        super(TaskThread, self).__init__()

        self.queue = queue

        # XXX: there is no shutdown method yet, so don't let these threads keep the interpreter alive at exit
        self.setDaemon(True)

        # run
        self.start()

    def run (self) :
        """
            Thread main loop: execute tasks from the queue, forever
        """

        # XXX: provide shutdown method
        while True :
            # get a task to execute
            with self.queue.get_task() as task :
                # execute it
                self.queue.execute_task(task)

class TaskManager (object) :
    """
        Provides the machinery to take tasks and execute them
    """

    def __init__ (self, config) :
        """
            Initialize with the given config. `config.task_threads` is the number of TaskThreads to start; with zero
            threads, every task is executed directly by `submit`.
        """

        # setup queue
        self.queue = TaskQueue()

        # setup task threads
        self.threads = [TaskThread(self.queue) for count in xrange(config.task_threads)]

    def submit (self, task) :
        """
            Submit given task for execution.

            Threadable tasks are queued for a TaskThread if there are any; all other tasks are executed immediately
            in the calling thread. Either way, this guarantees that the task will eventually be returned via our queue.
        """

        if self.threads and task.threadable :
            # schedule for later execution by a thread
            self.queue.put_task(task)

        else :
            # execute directly
            self.queue.execute_task(task)

    def consume (self) :
        """
            Consume a single result from the task queue, either returning the return value, or raising the task's error.
        """

        # get one task
        task = self.queue.get_result()

        if task.exc_obj is not None :
            # XXX: is this threadsafe? also, the original traceback from the executing thread is lost here
            raise task.exc_obj

        else :
            # successful execution
            return task.retval

    def execute (self, tasks) :
        """
            Schedule a series of tasks for execution, and yield their return values as they become available.

            `tasks` may be any iterable of Tasks. Return values are yielded in completion order, not submission order.
            Since this is a generator, nothing is submitted until iteration begins.

            If a task raised an error, it is re-raised here, and any results not yet consumed are left in the queue.

            Iterating to the end waits for all of the tasks to execute.
        """

        # accept any iterable (e.g. a generator expression), since we need to count the tasks
        tasks = list(tasks)

        # number of tasks submitted; how many results we expect
        task_count = len(tasks)

        # submit them
        for task in tasks :
            self.submit(task)

        # yield results
        for count in xrange(task_count) :
            yield self.consume()

# pretty aliases
task = Task
threadable_task = ThreadableTask
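
# Sketch (hypothetical; not used by the classes above): the XXX notes in TaskQueue.get_task and TaskThread.run ask
# for a way to shut the worker threads down. One common approach, assumed here rather than taken from this module,
# is a "poison pill": after all real work, put one unique sentinel object per worker on the input queue; a worker
# that pulls the sentinel leaves its loop, so the threads can then be join()ed. Plain (func, arg) pairs stand in for
# Task objects, and the names _SHUTDOWN, _sketch_worker and _sketch_run are illustrative only. Written to run on
# Python 2.6+ as well as Python 3.

try :
    import queue as _sketch_queue       # Python 3
except ImportError :
    import Queue as _sketch_queue       # Python 2

import threading as _sketch_threading

# unique sentinel; compared by identity, so it can never collide with real work
_SHUTDOWN = object()

def _sketch_worker (in_queue, out_queue) :
    """
        Worker loop: execute (func, arg) pairs until the sentinel arrives.
    """

    while True :
        item = in_queue.get()

        try :
            if item is _SHUTDOWN :
                # poison pill: stop this worker
                return

            func, arg = item

            try :
                # like Task._execute, trap errors and hand them back as results
                out_queue.put(func(arg))

            except Exception as exc_obj :
                out_queue.put(exc_obj)

        finally :
            # balance every get(), so that in_queue.join() would also work
            in_queue.task_done()

def _sketch_run (func, args, thread_count=2) :
    """
        Apply func to each arg on thread_count workers, shut the workers down, and return the results
        (or trapped exceptions) in completion order.
    """

    if thread_count < 1 :
        raise ValueError("thread_count must be at least 1")

    args = list(args)
    in_queue = _sketch_queue.Queue()
    out_queue = _sketch_queue.Queue()

    threads = [
        _sketch_threading.Thread(target=_sketch_worker, args=(in_queue, out_queue))
        for count in range(thread_count)
    ]

    for thread in threads :
        thread.start()

    for arg in args :
        in_queue.put((func, arg))

    # one sentinel per worker, queued behind all of the real work (the queue is FIFO)
    for thread in threads :
        in_queue.put(_SHUTDOWN)

    # every worker exits after taking exactly one sentinel
    for thread in threads :
        thread.join()

    # every result is already in out_queue, so these get() calls don't block
    return [out_queue.get() for arg in args]

# Usage: sorted(_sketch_run(lambda x: x * x, [1, 2, 3])) == [1, 4, 9]; sort the results if order matters, since
# they come back in completion order. Unlike TaskThread.run, the loop above has an exit, so join() cannot hang.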