author Tero Marttila <>
Mon, 15 Jun 2009 01:36:58 +0300
changeset 125 74f135774567
parent 117 a2e4562deaab
permissions -rw-r--r--
fix rotated size of auto-oriented thumbnails, and throw some code at mirroring - untested
    Concurrent execution

from __future__ import with_statement

import sys, threading, collections, traceback

class Task (object) :
        Something to execute

    def __init__ (self, func, *args, **kwargs) :
            Initialize to run the given function with arguments.

        self.func = func
        self.args = args
        self.kwargs = kwargs
    def execute (self) :
            Execute this task normally, returning something or raising an error.

        return self.func(*self.args, **self.kwargs)

    def execute_result (self) :
            Run this task, returning a Result. This should not raise any errors
        try :
            return Result(self.execute())

        except :
            return Failure()

class Result (object) :
        The result from executing a task

    def __init__ (self, res) :
        """ Store the result """

        self.res = res

    def evaluate (self) :
        """ Returns the result value """

        return self.res

class Failure (object) :
        A failed result, causing an exception

    def __init__ (self, exc_info=None) :
        """ Store the given execption info, or use current exception if not given """

        if exc_info :
            self.exc_info = exc_info

        else :
            self.exc_info = sys.exc_info()
    def evaluate (self) :
        """ re-Raises the exception info """
        # unpack
        type, value, tb = self.exc_info
        # re-raise
        raise type, value, tb

class Queue (object) :
        A thread-safe Queue of waiting tasks, and their results.
    def __init__ (self) :
            Setup our queues and locks.

        # global lock
        self.lock = threading.Lock()

        # queue of waiting tasks with wait lock
        self.task_queue = collections.deque()
        self.task_cond = threading.Condition(self.lock)

        # count of executing tasks
        self.exec_count = 0
        # queue of results
        self.result_queue = collections.deque()
        self.result_cond = threading.Condition(self.lock)

    def put_task (self, task) :
        """ Enqueue a task for async execution """

        with self.lock :
            # push it

            # notify waiting
    def put_tasks (self, tasks) :
        """ Enqueue a number of tasks for async execution """

        with self.lock :
            # extend them

            # notify multiple waiting

    def _get_task (self) :
        """ Dequeue a task, incrementing exec_count """
        with self.lock :
            # wait for a task to become available
            while not self.task_queue :

            # count
            self.exec_count += 1

            # get
            task = self.task_queue.popleft()

            return task
    def _put_result (self, result) :
        """ Enqueue a return value, decrementing exec_count """

        with self.lock :
            # push it

            # count
            self.exec_count -= 1
            # notify waiting
    def exec_task (self, task_func) :
            Get a task, and execute it using the given Executor, and put a result, handling errors.

            Maintains the task_exec_count
        # get it
        task = self._get_task()

        try :
            # execute it
            result = task_func(task)
        except :
            # internal error
            # XXX: something better to return? Maybe just dec exec_count, but don't return anything?
            result = None
            # kill off the thread

        finally :
            # put the result
    def drain (self) :
        """ Return Results from this queue until all tasks have been executed and all results returned """
        # yield/wait until empty
        while True :
            with self.lock :
                # inspect current state
                if self.result_queue :
                    # we have a result we can return
                    yield self.result_queue.popleft()

                elif self.task_queue or self.exec_count :
                    # tasks are still pending

                else :
                    # no more tasks left

class Thread (threading.Thread) :
        A concurrent-execution thread, which dequeues tasks, executes them, and returns the results.

    def __init__ (self, queue) :
        """ Create the thread and start running """
        # init as a daemon thread
        super(Thread, self).__init__()
        self.daemon = True

        # store
        self.queue = queue

        # run
    def run_task (self) :
        """ Get and handle a single task """

        # let the queue handle safe-execution on the task using the Task.execute_result method directly

    def run (self) :
        """ Handle tasks from our queue """
        try :
            # XXX: something to quit for?
            while True :
        except :
            from sys import stderr
            print >>stderr, "%s:" % self

            # can't do much more...

class Manager (object) :
        Manages execution of tasks

    def __init__ (self, thread_count=0) :
            Initialize our pool of executors
        # our queue
        self.queue = Queue()
        # thread pool
        self.threads = [Thread(self.queue) for count in xrange(thread_count)]

    def execute_threaded (self, tasks) :
            Execute the given tasks using our thread pool

        # schedule them all

        # then drain results
        for result in self.queue.drain() :
            # this will either return the value or raise the error
            yield result.evaluate()
    def execute_directly (self, tasks) :
            Fallback to execute the given tasks without any concurrency.

        for task in tasks :
            yield task.execute()

    def execute (self, tasks) :
            Execute the given tasks concurrently.
            This will yield the result of each task, and return once all tasks have completed.
        if self.threads :
            # yay
            return self.execute_threaded(tasks)
        else :
            # fallback
            return self.execute_directly(tasks)
# aliases   
task = Task