terom@117: """ terom@117: Concurrent execution terom@117: """ terom@117: terom@117: from __future__ import with_statement terom@117: terom@117: import sys, threading, collections, traceback terom@117: terom@117: class Task (object) : terom@117: """ terom@117: Something to execute terom@117: """ terom@117: terom@117: def __init__ (self, func, *args, **kwargs) : terom@117: """ terom@117: Initialize to run the given function with arguments. terom@117: """ terom@117: terom@117: self.func = func terom@117: self.args = args terom@117: self.kwargs = kwargs terom@117: terom@117: def execute (self) : terom@117: """ terom@117: Execute this task normally, returning something or raising an error. terom@117: """ terom@117: terom@117: return self.func(*self.args, **self.kwargs) terom@117: terom@117: def execute_result (self) : terom@117: """ terom@117: Run this task, returning a Result. This should not raise any errors terom@117: """ terom@117: terom@117: try : terom@117: return Result(self.execute()) terom@117: terom@117: except : terom@117: return Failure() terom@117: terom@117: class Result (object) : terom@117: """ terom@117: The result from executing a task terom@117: """ terom@117: terom@117: def __init__ (self, res) : terom@117: """ Store the result """ terom@117: terom@117: self.res = res terom@117: terom@117: def evaluate (self) : terom@117: """ Returns the result value """ terom@117: terom@117: return self.res terom@117: terom@117: class Failure (object) : terom@117: """ terom@117: A failed result, causing an exception terom@117: """ terom@117: terom@117: def __init__ (self, exc_info=None) : terom@117: """ Store the given execption info, or use current exception if not given """ terom@117: terom@117: if exc_info : terom@117: self.exc_info = exc_info terom@117: terom@117: else : terom@117: self.exc_info = sys.exc_info() terom@117: terom@117: def evaluate (self) : terom@117: """ re-Raises the exception info """ terom@117: terom@117: # unpack terom@117: type, value, tb = self.exc_info terom@117: terom@117: # re-raise terom@117: raise type, value, tb terom@117: terom@117: class Queue (object) : terom@117: """ terom@117: A thread-safe Queue of waiting tasks, and their results. terom@117: """ terom@117: terom@117: def __init__ (self) : terom@117: """ terom@117: Setup our queues and locks. terom@117: """ terom@117: terom@117: # global lock terom@117: self.lock = threading.Lock() terom@117: terom@117: # queue of waiting tasks with wait lock terom@117: self.task_queue = collections.deque() terom@117: self.task_cond = threading.Condition(self.lock) terom@117: terom@117: # count of executing tasks terom@117: self.exec_count = 0 terom@117: terom@117: # queue of results terom@117: self.result_queue = collections.deque() terom@117: self.result_cond = threading.Condition(self.lock) terom@117: terom@117: def put_task (self, task) : terom@117: """ Enqueue a task for async execution """ terom@117: terom@117: with self.lock : terom@117: # push it terom@117: self.task_queue.append(task) terom@117: terom@117: # notify waiting terom@117: self.task_cond.notify() terom@117: terom@117: def put_tasks (self, tasks) : terom@117: """ Enqueue a number of tasks for async execution """ terom@117: terom@117: with self.lock : terom@117: # extend them terom@117: self.task_queue.extend(tasks) terom@117: terom@117: # notify multiple waiting terom@117: self.task_cond.notifyAll() terom@117: terom@117: def _get_task (self) : terom@117: """ Dequeue a task, incrementing exec_count """ terom@117: terom@117: with self.lock : terom@117: # wait for a task to become available terom@117: while not self.task_queue : terom@117: self.task_cond.wait() terom@117: terom@117: # count terom@117: self.exec_count += 1 terom@117: terom@117: # get terom@117: task = self.task_queue.popleft() terom@117: terom@117: return task terom@117: terom@117: def _put_result (self, result) : terom@117: """ Enqueue a return value, decrementing exec_count """ terom@117: terom@117: with self.lock : terom@117: # push it terom@117: self.result_queue.append(result) terom@117: terom@117: # count terom@117: self.exec_count -= 1 terom@117: terom@117: # notify waiting terom@117: self.result_cond.notify() terom@117: terom@117: def exec_task (self, task_func) : terom@117: """ terom@117: Get a task, and execute it using the given Executor, and put a result, handling errors. terom@117: terom@117: Maintains the task_exec_count terom@117: """ terom@117: terom@117: # get it terom@117: task = self._get_task() terom@117: terom@117: try : terom@117: # execute it terom@117: result = task_func(task) terom@117: terom@117: except : terom@117: # internal error terom@117: # XXX: something better to return? Maybe just dec exec_count, but don't return anything? terom@117: result = None terom@117: terom@117: # kill off the thread terom@117: raise terom@117: terom@117: finally : terom@117: # put the result terom@117: self._put_result(result) terom@117: terom@117: def drain (self) : terom@117: """ Return Results from this queue until all tasks have been executed and all results returned """ terom@117: terom@117: # yield/wait until empty terom@117: while True : terom@117: with self.lock : terom@117: # inspect current state terom@117: if self.result_queue : terom@117: # we have a result we can return terom@117: yield self.result_queue.popleft() terom@117: terom@117: elif self.task_queue or self.exec_count : terom@117: # tasks are still pending terom@117: self.result_cond.wait() terom@117: terom@117: else : terom@117: # no more tasks left terom@117: return terom@117: terom@117: class Thread (threading.Thread) : terom@117: """ terom@117: A concurrent-execution thread, which dequeues tasks, executes them, and returns the results. terom@117: """ terom@117: terom@117: def __init__ (self, queue) : terom@117: """ Create the thread and start running """ terom@117: terom@117: # init as a daemon thread terom@117: super(Thread, self).__init__() terom@117: self.daemon = True terom@117: terom@117: # store terom@117: self.queue = queue terom@117: terom@117: # run terom@117: self.start() terom@117: terom@117: def run_task (self) : terom@117: """ Get and handle a single task """ terom@117: terom@117: # let the queue handle safe-execution on the task using the Task.execute_result method directly terom@117: self.queue.exec_task(Task.execute_result) terom@117: terom@117: def run (self) : terom@117: """ Handle tasks from our queue """ terom@117: terom@117: try : terom@117: # XXX: something to quit for? terom@117: while True : terom@117: self.run_task() terom@117: terom@117: except : terom@117: from sys import stderr terom@117: terom@117: print >>stderr, "%s:" % self terom@117: terom@117: # can't do much more... terom@117: traceback.print_exc(file=stderr) terom@117: terom@117: class Manager (object) : terom@117: """ terom@117: Manages execution of tasks terom@117: """ terom@117: terom@117: def __init__ (self, thread_count=0) : terom@117: """ terom@117: Initialize our pool of executors terom@117: """ terom@117: terom@117: # our queue terom@117: self.queue = Queue() terom@117: terom@117: # thread pool terom@117: self.threads = [Thread(self.queue) for count in xrange(thread_count)] terom@117: terom@117: def execute_threaded (self, tasks) : terom@117: """ terom@117: Execute the given tasks using our thread pool terom@117: """ terom@117: terom@117: # schedule them all terom@117: self.queue.put_tasks(tasks) terom@117: terom@117: # then drain results terom@117: for result in self.queue.drain() : terom@117: # this will either return the value or raise the error terom@117: yield result.evaluate() terom@117: terom@117: def execute_directly (self, tasks) : terom@117: """ terom@117: Fallback to execute the given tasks without any concurrency. terom@117: """ terom@117: terom@117: for task in tasks : terom@117: yield task.execute() terom@117: terom@117: def execute (self, tasks) : terom@117: """ terom@117: Execute the given tasks concurrently. terom@117: terom@117: This will yield the result of each task, and return once all tasks have completed. terom@117: """ terom@117: terom@117: if self.threads : terom@117: # yay terom@117: return self.execute_threaded(tasks) terom@117: terom@117: else : terom@117: # fallback terom@117: return self.execute_directly(tasks) terom@117: # aliases terom@117: task = Task terom@117: