degal/task.py
branchthreaded-tasks
changeset 88 b1b0939517e7
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/degal/task.py	Thu Jun 11 20:39:59 2009 +0300
@@ -0,0 +1,229 @@
+"""
+    Parallel execution of work, using threads.
+
+    The work to be done is submit'd in the form of Tasks (which may or may not be `threadable`) to a TaskManager,
+    which then takes care of actually executing the tasks, either directly, or by queueing them up in a TaskQueue for 
+    execution by a TaskThread.
+
+    Example:
+
+    for retval in tasks.execute(threadable_task(foo_func, item) for item in items) :
+        print retval
+"""
+
+from __future__ import with_statement
+
+import threading, Queue
+import contextlib
+
+class Task (object) :
+    """
+        A task describes some work to do
+    """
+    
+    # this task can be executed in a Python thread of its own
+    threadable = False
+    
+    def __init__ (self, func, *args, **kwargs) :
+        """
+            Set this Task up to execute the given function/arguments
+        """
+
+        self.func = func
+        self.args = args
+        self.kwargs = kwargs
+        self.retval = self.exc_obj = None
+    
+    def _execute (self) :
+        """
+            Low-level task execution function. Calls `execute` and stores the result/error
+        """
+
+        try :
+            # attempt to store normal retval
+            self.retval = self.execute()
+
+        except Exception, exc_obj :
+            # trap all errors
+            self.exc_obj = exc_obj
+
+    def execute (self) :
+        """
+            Execute the task and return any results.
+
+            If the task is threadable, then this method must not access any external state!
+
+            By default, this will execute and return the function given to __init__
+        """
+
+        return self.func(*self.args, **self.kwargs)
+
+class ThreadableTask (Task) :
+    """
+        A task that is marked as threadable by default
+    """
+
+    threadable = True
+
+class TaskQueue (object) :
+    """
+        A threadsafe queue of waiting tasks, keeping track of executing tasks.
+    """
+
+    def __init__ (self) :
+        """
+            Setup
+        """
+        
+        # queue of incoming, to-be-executed tasks
+        self.in_queue = Queue.Queue()
+
+        # queue of outgoing, was-executed tasks
+        self.out_queue = Queue.Queue();
+
+
+    @contextlib.contextmanager
+    def get_task (self) :
+        """
+            Intended to be used as the target of a 'with' statement to get a task from the queue and execute it.
+        """
+        
+        # XXX: shutdown signal?
+        task = self.in_queue.get()
+        
+        try :
+            # process it
+            yield task
+        
+        finally :
+            # mark it as complete
+            self.in_queue.task_done()
+    
+    def execute_task (self, task) :
+        """
+            Execute the given task, and put the result in our output queue
+        """
+        
+        # internal execute method
+        task._execute()
+        
+        # return value
+        self.out_queue.put(task)
+
+    def put_task (self, task) :
+        """
+            Add a task on to the queue.
+        """
+
+        self.in_queue.put(task)
+
+    def get_result (self) :
+        """
+            Wait for a single result from this queue.
+
+            XXX: nothing to garuntee that this won't block
+        """
+
+        return self.out_queue.get()
+
+class TaskThread (threading.Thread) :
+    """
+        A Thread that executes Tasks
+    """
+
+    def __init__ (self, queue) :
+        """
+            Setup thread and start waiting for requests
+        """
+        
+        super(TaskThread, self).__init__()
+
+        self.queue = queue
+
+        # run
+        self.start()
+
+    def run (self) :
+        """
+            Thread main
+        """
+        
+        # XXX: provide shutdown method
+        while True :
+            # get a task to execute
+            with self.queue.get_task() as task :
+                # execute it
+                self.queue.execute_task(task)
+
+class TaskManager (object) :
+    """
+        Provides the machinery to take tasks and execute them
+    """
+
+    def __init__ (self, config) :
+        """
+            Initialize with given config
+        """
+        
+        # setup queue
+        self.queue = TaskQueue()
+
+        # setup task threads
+        self.threads = [TaskThread(self.queue) for count in xrange(config.task_threads)]
+    
+    def submit (self, task) :
+        """
+            Submit given task for execution.
+
+            This garuntees that the task will eventually be returned via our queue.
+        """
+        
+        if self.threads and task.threadable :
+            # schedule for later execution by a thread
+            self.queue.put_task(task)
+        
+        else :
+            # execute directly
+            self.queue.execute_task(task)
+    
+    def consume (self) :
+        """
+            Consume a single result from the task queue, either returning the return value, or raising the task's error.
+        """
+        
+        # get one task
+        task = self.queue.get_result()
+
+        if task.exc_obj :
+            # XXX: is this threadsafe?
+            raise task.exc_obj
+
+        else :
+            # successful execution
+            return task.retval
+
+
+    def execute (self, tasks) :
+        """
+            Schedule a series of tasks for execution, and yield the return values as they become available.
+
+            If a task raises an error, it will be re-raised.
+
+            Waits for all the tasks to execute.
+        """
+
+        # number of tasks submitted; how many results we expect
+        task_count = len(tasks)
+
+        # submit them
+        for task in tasks :
+            self.submit(task)
+
+        # yield results
+        for count in xrange(task_count) :
+            yield self.consume()
+
+# pretty aliases
+task = Task
+threadable_task = ThreadableTask
+