degal/task.py
author Tero Marttila <terom@fixme.fi>
Thu, 11 Jun 2009 21:26:05 +0300
branch threaded-tasks
changeset 89 4b254a90d6d0
parent 88 b1b0939517e7
permissions -rw-r--r--
add task_threads config setting
"""
    Parallel execution of work, using threads.

    Work is submitted in the form of Tasks (which may or may not be `threadable`) to a TaskManager, which then
    takes care of actually executing them: either directly, or by queueing them up in a TaskQueue for execution
    by a TaskThread.

    Example, where `tasks` is a TaskManager:

    for retval in tasks.execute(threadable_task(foo_func, item) for item in items) :
        print retval
"""

from __future__ import with_statement

import contextlib
import sys
import threading, Queue

class Task (object) :
    """
        A task describes some work to do: a function, and the arguments to call it with.

        After each execution, the outcome is stored on the task itself: either `retval` (the return value) or
        `exc_obj` (the error raised), with the other one left as None.
    """
    
    # this task can be executed in a Python thread of its own
    threadable = False
    
    def __init__ (self, func, *args, **kwargs) :
        """
            Set this Task up to execute the given function/arguments
        """

        self.func = func
        self.args = args
        self.kwargs = kwargs

        # outcome of the last execution; both None until the task has been executed
        self.retval = self.exc_obj = None
    
    def _execute (self) :
        """
            Low-level task execution function. Calls `execute` and stores the result/error.

            Any Exception raised by `execute` is trapped and stored in `exc_obj` rather than propagated. Errors that
            are not Exception subclasses (e.g. KeyboardInterrupt, SystemExit) are not trapped.
        """

        # clear the outcome of any previous execution, so that a re-run task never reports a stale result/error
        self.retval = self.exc_obj = None

        try :
            # attempt to store normal retval
            self.retval = self.execute()

        except Exception :
            # trap all errors; sys.exc_info() is used instead of binding the exception in the except clause,
            # because no single form of that syntax is valid on both Python 2.5 and Python 3
            self.exc_obj = sys.exc_info()[1]

    def execute (self) :
        """
            Execute the task and return any results.

            If the task is threadable, then this method must not access any external state!

            By default, this calls the function given to __init__ with the given arguments, and returns its result.
        """

        return self.func(*self.args, **self.kwargs)

class ThreadableTask (Task) :
    """
        A Task that may be handed to a TaskThread: identical to Task, except that `threadable` defaults to True.
    """

    # opt in to execution in a worker thread
    threadable = True

class TaskQueue (object) :
    """
        Thread-safe pair of FIFO queues shared between a TaskManager and its TaskThreads.

        Tasks waiting to run go in through `in_queue`; tasks that have been run come back out through `out_queue`.
    """

    def __init__ (self) :
        """
            Create the empty input and output queues.
        """

        # tasks submitted for execution, not yet picked up
        self.in_queue = Queue.Queue()

        # tasks that have been executed, waiting for their results to be consumed
        self.out_queue = Queue.Queue()

    @contextlib.contextmanager
    def get_task (self) :
        """
            Block until a task is available on the input queue, and yield it to the body of a 'with' statement.

            The task is marked as done on the input queue when the 'with' body exits, whether or not it raised.
        """

        # XXX: shutdown signal?
        item = self.in_queue.get()

        try :
            yield item
        finally :
            self.in_queue.task_done()

    def execute_task (self, task) :
        """
            Run the given task via its `_execute` method, then put it on the output queue as a result.
        """

        task._execute()
        self.out_queue.put(task)

    def put_task (self, task) :
        """
            Append the given task to the input queue.
        """

        self.in_queue.put(task)

    def get_result (self) :
        """
            Block until an executed task is available on the output queue, and return it.

            XXX: nothing guarantees that a result will ever arrive, so this may block forever.
        """

        return self.out_queue.get()

class TaskThread (threading.Thread) :
    """
        A worker Thread that repeatedly takes Tasks from a TaskQueue and executes them.

        The thread starts itself on construction, and is marked as a daemon thread: its loop never returns, so a
        non-daemon worker would keep the interpreter from ever exiting once the main thread is done.
    """

    def __init__ (self, queue) :
        """
            Set up the thread to serve the given TaskQueue, and start it.
        """
        
        super(TaskThread, self).__init__()

        self.queue = queue

        # run() has no exit condition (see the XXX there), so don't let these threads block interpreter shutdown;
        # setDaemon() rather than the `daemon` property, which only exists from Python 2.6 on
        self.setDaemon(True)

        # run
        self.start()

    def run (self) :
        """
            Thread main: fetch tasks from the queue and execute them, forever.
        """
        
        # XXX: provide shutdown method
        while True :
            # get a task to execute; it is marked as done when the with-block exits
            with self.queue.get_task() as task :
                # execute it, putting the result on the output queue
                self.queue.execute_task(task)

class TaskManager (object) :
    """
        Provides the machinery to take tasks and execute them.

        Threadable tasks are handed off to a pool of TaskThreads through a shared TaskQueue. All other tasks, and
        every task if no threads are configured, are executed directly in the calling thread. Either way, each
        executed task ends up on the queue's output side, from where its result is consumed.
    """

    def __init__ (self, config) :
        """
            Initialize with given config.

            `config.task_threads` is the number of TaskThreads to start; with zero threads, every task is executed
            directly by submit().
        """
        
        # setup queue
        self.queue = TaskQueue()

        # setup task threads
        self.threads = [TaskThread(self.queue) for count in xrange(config.task_threads)]
    
    def submit (self, task) :
        """
            Submit given task for execution.

            This guarantees that the executed task will eventually be available via consume(). A threadable task is
            queued up for a TaskThread, if there are any; anything else is executed before this returns.
        """
        
        if self.threads and task.threadable :
            # schedule for later execution by a thread
            self.queue.put_task(task)
        
        else :
            # execute directly
            self.queue.execute_task(task)
    
    def consume (self) :
        """
            Wait for a single executed task, and either return its return value, or raise its error.

            Tasks are consumed in the order in which they finished executing, which need not be the order in which
            they were submitted.
        """
        
        # get one task
        task = self.queue.get_result()

        # compare against None rather than testing truthiness: an exception class is free to define a false value
        if task.exc_obj is not None :
            # only the exception object was stored, not its traceback, so under Python 2 the traceback of the
            # original error (possibly raised in another thread) is lost here
            raise task.exc_obj

        # successful execution
        return task.retval

    def execute (self, tasks) :
        """
            Schedule a series of tasks for execution, and yield the return values as they become available.

            `tasks` may be any iterable, including a generator expression. All of them are submitted before the
            first value is yielded. Values are yielded in completion order (see consume()).

            If a task raises an error, it will be re-raised. Otherwise, waits for all the tasks to execute.

            NOTE: if iteration stops early (an error is re-raised, or the caller stops iterating), the results of
            the remaining tasks are left on the queue, and would be returned by a later consume()/execute() call.
        """

        # submit them, counting how many results to expect as we go: `tasks` may be a generator, which has no len()
        remaining = 0

        for task in tasks :
            self.submit(task)
            remaining += 1

        # yield one result per submitted task
        while remaining :
            remaining -= 1
            yield self.consume()

# shorter, lowercase aliases for the task classes, as used in the module docstring's example
task, threadable_task = Task, ThreadableTask