degal/concurrent.py
author Tero Marttila <terom@fixme.fi>
Thu, 02 Jul 2009 23:59:58 +0300
changeset 146 c226063eeb65
parent 117 a2e4562deaab
permissions -rw-r--r--
remove obsolete db.py
"""
    Concurrent execution
"""

from __future__ import with_statement

import sys, threading, collections, traceback

class Task (object) :
    """
        A deferred function call: a callable bundled with the arguments to invoke it with.
    """

    def __init__ (self, func, *args, **kwargs) :
        """
            Remember the callable and the positional/keyword arguments for a later call.
        """

        self.func, self.args, self.kwargs = func, args, kwargs

    def execute (self) :
        """
            Invoke the stored callable with the stored arguments.

            Returns whatever the callable returns; any exception it raises propagates to the caller.
        """

        call = self.func

        return call(*self.args, **self.kwargs)

    def execute_result (self) :
        """
            Invoke the task and wrap the outcome instead of letting it escape.

            Returns a Result holding the return value, or a Failure built from the active exception if
            anything was raised.
        """

        try :
            value = self.execute()

            return Result(value)

        except :
            # nothing may escape from here, so trap everything and hand it over as a Failure
            return Failure()

class Result (object) :
    """
        Wrapper around the value a task produced when it completed without raising.
    """

    def __init__ (self, res) :
        """ Keep hold of the task's return value """

        self.res = res

    def evaluate (self) :
        """ Hand back the stored return value unchanged """

        value = self.res

        return value

class Failure (object) :
    """
        A failed result, causing an exception

        Holds an (exception type, exception value, traceback) triple in the shape returned by sys.exc_info(),
        and raises it again from evaluate().
    """

    def __init__ (self, exc_info=None) :
        """
            Store the given exception info, or use the current exception if not given.

            When exc_info is omitted (or empty), sys.exc_info() is used, so this only captures something
            useful when called while an exception is being handled.
        """

        if exc_info :
            self.exc_info = exc_info

        else :
            self.exc_info = sys.exc_info()
    
    def evaluate (self) :
        """
            re-Raises the stored exception with its stored traceback.

            This never returns normally.
        """
        
        # unpack, without shadowing the `type` builtin
        exc_type, value, tb = self.exc_info
        
        if sys.version_info[0] >= 3 :
            # Python 3 has no three-expression raise; build the instance if we only have the class/arguments
            if not isinstance(value, BaseException) :
                value = exc_type() if value is None else exc_type(value)

            raise value.with_traceback(tb)

        # Python 2: the three-expression raise is a SyntaxError for a Python 3 parser, so it is kept inside a
        # string to let this module be compiled by either version
        exec("raise exc_type, value, tb")

class Queue (object) :
    """
        A thread-safe Queue of waiting tasks, and their results.

        Tasks are added with put_task()/put_tasks(), worker threads call exec_task() to run them, and the
        results are collected by iterating over drain(). A single lock guards all state; both condition
        variables are built on that same lock.
    """
    
    def __init__ (self) :
        """
            Setup our queues and locks.
        """

        # global lock
        self.lock = threading.Lock()

        # queue of waiting tasks, with a condition to wait for new ones
        self.task_queue = collections.deque()
        self.task_cond = threading.Condition(self.lock)

        # count of tasks that have been dequeued, but whose result has not been put yet
        self.exec_count = 0
        
        # queue of results, with a condition to wait for new ones
        self.result_queue = collections.deque()
        self.result_cond = threading.Condition(self.lock)

    def put_task (self, task) :
        """ Enqueue a task for async execution """

        with self.lock :
            # push it
            self.task_queue.append(task)

            # one new task, so waking up one waiting worker is enough
            self.task_cond.notify()
    
    def put_tasks (self, tasks) :
        """ Enqueue a number of tasks (any iterable) for async execution """

        with self.lock :
            # extend them
            self.task_queue.extend(tasks)

            # notify multiple waiting
            self.task_cond.notify_all()

    def _get_task (self) :
        """ Dequeue a task, incrementing exec_count. Blocks until a task is available """
        
        with self.lock :
            # wait for a task to become available; re-check after every wakeup
            while not self.task_queue :
                self.task_cond.wait()

            # count
            self.exec_count += 1

            # get
            return self.task_queue.popleft()
    
    def _put_result (self, result) :
        """ Enqueue a return value, decrementing exec_count """

        with self.lock :
            # push it
            self.result_queue.append(result)

            # count
            self.exec_count -= 1
 
            # notify waiting
            self.result_cond.notify()
   
    def exec_task (self, task_func) :
        """
            Get a task, execute it by calling task_func(task), and put whatever that returns as the result.

            Blocks until a task is available. Maintains exec_count: a result is always put, even if task_func
            raises. In that case the exception is put as a Failure, so that evaluating the result re-raises the
            real error, and the exception is then re-raised to the caller as well.
        """
        
        # get it
        task = self._get_task()

        try :
            # execute it
            result = task_func(task)
        
        except :
            # internal error; hand it over to whoever drains the results
            result = Failure(sys.exc_info())
            
            # kill off the thread
            raise

        finally :
            # put the result, which also releases our exec_count slot
            self._put_result(result)
    
    def drain (self) :
        """
            Yield results from this queue until all tasks have been executed and all results returned.

            This is a generator; it blocks while tasks are still queued or executing and no result is
            available yet. The lock is only held while inspecting the queues, never while the caller is
            handling a yielded result, so the caller may e.g. put more tasks from within its loop.
        """
        
        while True :
            with self.lock :
                # wait for a result, or for there to be nothing left to wait for
                while not self.result_queue :
                    if not (self.task_queue or self.exec_count) :
                        # no more tasks left
                        return

                    # tasks are still pending
                    self.result_cond.wait()

                result = self.result_queue.popleft()

            # hand it over with the lock released, so that workers are not blocked while the caller is busy
            yield result

class Thread (threading.Thread) :
    """
        A concurrent-execution thread, which dequeues tasks, executes them, and returns the results.
    """

    def __init__ (self, queue) :
        """
            Create the thread and start running.

            queue is the object whose exec_task() method this thread keeps calling.
        """
        
        # init as a daemon thread
        super(Thread, self).__init__()
        self.daemon = True

        # store; this must happen before start(), as run() uses it
        self.queue = queue

        # run
        self.start()
    
    def run_task (self) :
        """ Get and handle a single task """

        # let the queue handle safe-execution on the task using the Task.execute_result method directly
        self.queue.exec_task(Task.execute_result)

    def run (self) :
        """
            Handle tasks from our queue, until something raises.

            Any exception ends the loop; it is reported on stderr, prefixed with this thread's description.
        """
        
        try :
            # XXX: something to quit for?
            while True :
                self.run_task()
        
        except :
            from sys import stderr
            
            # write() rather than the print statement, so that this works the same on Python 2 and 3
            stderr.write("%s:\n" % self)

            # can't do much more...
            traceback.print_exc(file=stderr)

class Manager (object) :
    """
        Manages execution of tasks, either on a pool of worker threads or directly in the calling thread.
    """

    def __init__ (self, thread_count=0) :
        """
            Initialize our pool of executors.

            thread_count is the number of worker threads to start; with zero (the default), execute() falls
            back to running the tasks directly.
        """
        
        # our queue
        self.queue = Queue()
        
        # thread pool; range() rather than xrange(), which only exists on Python 2
        self.threads = [Thread(self.queue) for _ in range(thread_count)]

    def execute_threaded (self, tasks) :
        """
            Execute the given tasks using our thread pool.

            This is a generator, so nothing is scheduled until it is first iterated. It yields the value of
            each result as the queue hands them out, and raises if evaluating a result raises.
        """

        # schedule them all
        self.queue.put_tasks(tasks)

        # then drain results
        for result in self.queue.drain() :
            # this will either return the value or raise the error
            yield result.evaluate()
    
    def execute_directly (self, tasks) :
        """
            Fallback to execute the given tasks without any concurrency.

            This is a generator; each task is executed, in order, as the caller iterates.
        """

        for task in tasks :
            yield task.execute()

    def execute (self, tasks) :
        """
            Execute the given tasks concurrently.
            
            This will yield the result of each task, and return once all tasks have completed.

            Uses the thread pool if we have any threads, and direct execution otherwise.
        """
        
        if self.threads :
            # yay
            return self.execute_threaded(tasks)
        
        # fallback
        return self.execute_directly(tasks)
# aliases: lower-case name bound to the Task class
task = Task