degal/concurrent.py
changeset 117 a2e4562deaab
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/degal/concurrent.py	Sun Jun 14 20:05:11 2009 +0300
@@ -0,0 +1,290 @@
+"""
+    Concurrent execution
+"""
+
+from __future__ import with_statement
+
+import sys, threading, collections, traceback
+
class Task (object) :
    """
        A deferred function call: a callable bound together with the
        arguments to apply it to.
    """

    def __init__ (self, func, *args, **kwargs) :
        """
            Bind the given function with its positional and keyword arguments.
        """

        self.func = func
        self.args = args
        self.kwargs = kwargs
    
    def execute (self) :
        """
            Apply the bound function to the bound arguments, returning its
            value or letting any raised error propagate.
        """

        return self.func(*self.args, **self.kwargs)

    def execute_result (self) :
        """
            Run this task, wrapping the outcome as a Result on success or a
            Failure on any error; never raises itself.
        """
        
        try :
            value = self.execute()

        except :
            # capture the active exception info for a later re-raise
            return Failure()

        else :
            return Result(value)
+
class Result (object) :
    """
        Successful outcome of a task, wrapping the value it returned.
    """

    def __init__ (self, res) :
        """ Remember the value the task returned """

        self.res = res

    def evaluate (self) :
        """ Hand back the stored return value """

        return self.res
+
+class Failure (object) :
+    """
+        A failed result, causing an exception
+    """
+
+    def __init__ (self, exc_info=None) :
+        """ Store the given execption info, or use current exception if not given """
+
+        if exc_info :
+            self.exc_info = exc_info
+
+        else :
+            self.exc_info = sys.exc_info()
+    
+    def evaluate (self) :
+        """ re-Raises the exception info """
+        
+        # unpack
+        type, value, tb = self.exc_info
+        
+        # re-raise
+        raise type, value, tb
+
class Queue (object) :
    """
        A thread-safe queue of waiting tasks, and a queue of their results.

        Tasks flow through three states: waiting in task_queue, executing
        (counted by exec_count), and done (an item in result_queue).
    """
    
    def __init__ (self) :
        """
            Set up the queues, the shared lock and the condition variables.
        """

        # single lock, shared by both conditions, guarding all state below
        self.lock = threading.Lock()

        # queue of waiting tasks, with a condition to wait for new tasks
        self.task_queue = collections.deque()
        self.task_cond = threading.Condition(self.lock)

        # number of tasks currently executing (dequeued, but no result yet)
        self.exec_count = 0
        
        # queue of finished results, with a condition to wait for new results
        self.result_queue = collections.deque()
        self.result_cond = threading.Condition(self.lock)

    def put_task (self, task) :
        """ Enqueue a single task for async execution """

        with self.lock :
            # push it
            self.task_queue.append(task)

            # one new task -> wake one waiting worker
            self.task_cond.notify()
    
    def put_tasks (self, tasks) :
        """ Enqueue a number of tasks for async execution """

        with self.lock :
            # extend them
            self.task_queue.extend(tasks)

            # possibly many new tasks -> wake every waiting worker
            self.task_cond.notify_all()

    def _get_task (self) :
        """ Dequeue a task, blocking until one is available, incrementing exec_count """
        
        with self.lock :
            # wait for a task to become available
            while not self.task_queue :
                self.task_cond.wait()

            # count it as executing before the lock is released
            self.exec_count += 1

            return self.task_queue.popleft()
    
    def _put_result (self, result) :
        """ Enqueue a result, decrementing exec_count """

        with self.lock :
            # push it
            self.result_queue.append(result)

            # the task counted by _get_task is now done
            self.exec_count -= 1
 
            # wake anybody draining results
            self.result_cond.notify()
   
    def exec_task (self, task_func) :
        """
            Get a task, execute it using the given task_func, and put the
            result, keeping exec_count consistent even on errors.
        """
        
        # get it
        task = self._get_task()

        try :
            # execute it
            result = task_func(task)
        
        except :
            # internal error
            # XXX: something better to return? Maybe just dec exec_count, but don't return anything?
            result = None
            
            # kill off the thread
            raise

        finally :
            # always put a result, so drain() never waits forever
            self._put_result(result)
    
    def drain (self) :
        """ Return Results from this queue until all tasks have been executed and all results returned """
        
        while True :
            with self.lock :
                # wait until there is either a result, or nothing left at all
                while not self.result_queue and (self.task_queue or self.exec_count) :
                    self.result_cond.wait()

                if not self.result_queue :
                    # no more tasks left
                    return

                # take the result while holding the lock...
                result = self.result_queue.popleft()

            # ...but yield it outside the lock, so workers aren't blocked
            # while the consumer processes each result (previously the yield
            # sat inside the locked region, stalling all workers between
            # generator steps, and deadlocking if the consumer touched us)
            yield result
+
class Thread (threading.Thread) :
    """
        A concurrent-execution thread, which dequeues tasks, executes them, and returns the results.
    """

    def __init__ (self, queue) :
        """ Create the thread as a daemon and immediately start running """
        
        super(Thread, self).__init__()

        # daemon, so we won't keep the process alive
        self.daemon = True

        # the queue to pull tasks from
        self.queue = queue

        # begin executing run() right away
        self.start()
    
    def run_task (self) :
        """ Fetch and handle a single task """

        # the queue handles safe-execution, using the Task.execute_result method directly
        self.queue.exec_task(Task.execute_result)

    def run (self) :
        """ Main loop: handle tasks from our queue until something breaks """
        
        try :
            # XXX: something to quit for?
            while True :
                self.run_task()
        
        except :
            from sys import stderr
            
            stderr.write("%s:\n" % self)

            # can't do much more...
            traceback.print_exc(file=stderr)
+
class Manager (object) :
    """
        Manages execution of tasks
    """

    def __init__ (self, thread_count=0) :
        """
            Set up the shared queue and spawn the pool of worker threads.
        """
        
        # the queue shared between all workers
        self.queue = Queue()
        
        # worker threads; empty when thread_count is zero
        self.threads = [Thread(self.queue) for _ in xrange(thread_count)]

    def execute_threaded (self, tasks) :
        """
            Execute the given tasks using our thread pool, yielding each
            task's value (or re-raising its error) as the results drain in.
        """

        # hand everything to the workers up front
        self.queue.put_tasks(tasks)

        # then consume results as they complete
        for result in self.queue.drain() :
            # evaluate() returns the value, or re-raises the task's error
            yield result.evaluate()
    
    def execute_directly (self, tasks) :
        """
            Fallback to execute the given tasks serially, without any concurrency.
        """

        for task in tasks :
            yield task.execute()

    def execute (self, tasks) :
        """
            Execute the given tasks concurrently when we have a thread pool,
            serially otherwise.
            
            This will yield the result of each task, and return once all tasks have completed.
        """
        
        # pick the implementation based on whether we have any threads
        run = self.execute_threaded if self.threads else self.execute_directly

        return run(tasks)
# lowercase alias, so callers can write e.g. concurrent.task(func, ...)
task = Task
+