degal/concurrent.py
author Tero Marttila <terom@fixme.fi>
Wed, 01 Jul 2009 20:40:00 +0300
changeset 141 9387da0dc183
parent 117 a2e4562deaab
permissions -rw-r--r--
move .config from filesystem to gallery/folder/image, rename degal_dir to app_dir
117
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
     1
"""
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
     2
    Concurrent execution
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
     3
"""
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
     4
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
     5
from __future__ import with_statement
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
     6
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
     7
import sys, threading, collections, traceback
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
     8
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
     9
class Task (object) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    10
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    11
        Something to execute
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    12
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    13
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    14
    def __init__ (self, func, *args, **kwargs) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    15
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    16
            Initialize to run the given function with arguments.
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    17
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    18
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    19
        self.func = func
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    20
        self.args = args
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    21
        self.kwargs = kwargs
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    22
    
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    23
    def execute (self) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    24
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    25
            Execute this task normally, returning something or raising an error.
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    26
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    27
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    28
        return self.func(*self.args, **self.kwargs)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    29
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    30
    def execute_result (self) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    31
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    32
            Run this task, returning a Result. This should not raise any errors
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    33
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    34
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    35
        try :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    36
            return Result(self.execute())
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    37
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    38
        except :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    39
            return Failure()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    40
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    41
class Result (object) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    42
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    43
        The result from executing a task
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    44
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    45
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    46
    def __init__ (self, res) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    47
        """ Store the result """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    48
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    49
        self.res = res
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    50
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    51
    def evaluate (self) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    52
        """ Returns the result value """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    53
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    54
        return self.res
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    55
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    56
class Failure (object) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    57
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    58
        A failed result, causing an exception
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    59
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    60
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    61
    def __init__ (self, exc_info=None) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    62
        """ Store the given execption info, or use current exception if not given """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    63
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    64
        if exc_info :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    65
            self.exc_info = exc_info
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    66
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    67
        else :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    68
            self.exc_info = sys.exc_info()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    69
    
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    70
    def evaluate (self) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    71
        """ re-Raises the exception info """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    72
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    73
        # unpack
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    74
        type, value, tb = self.exc_info
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    75
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    76
        # re-raise
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    77
        raise type, value, tb
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    78
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    79
class Queue (object) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    80
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    81
        A thread-safe Queue of waiting tasks, and their results.
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    82
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    83
    
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    84
    def __init__ (self) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    85
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    86
            Setup our queues and locks.
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    87
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    88
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    89
        # global lock
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    90
        self.lock = threading.Lock()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    91
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    92
        # queue of waiting tasks with wait lock
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    93
        self.task_queue = collections.deque()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    94
        self.task_cond = threading.Condition(self.lock)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    95
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    96
        # count of executing tasks
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    97
        self.exec_count = 0
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    98
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
    99
        # queue of results
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   100
        self.result_queue = collections.deque()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   101
        self.result_cond = threading.Condition(self.lock)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   102
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   103
    def put_task (self, task) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   104
        """ Enqueue a task for async execution """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   105
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   106
        with self.lock :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   107
            # push it
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   108
            self.task_queue.append(task)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   109
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   110
            # notify waiting
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   111
            self.task_cond.notify()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   112
    
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   113
    def put_tasks (self, tasks) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   114
        """ Enqueue a number of tasks for async execution """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   115
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   116
        with self.lock :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   117
            # extend them
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   118
            self.task_queue.extend(tasks)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   119
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   120
            # notify multiple waiting
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   121
            self.task_cond.notifyAll()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   122
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   123
    def _get_task (self) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   124
        """ Dequeue a task, incrementing exec_count """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   125
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   126
        with self.lock :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   127
            # wait for a task to become available
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   128
            while not self.task_queue :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   129
                self.task_cond.wait()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   130
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   131
            # count
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   132
            self.exec_count += 1
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   133
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   134
            # get
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   135
            task = self.task_queue.popleft()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   136
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   137
            return task
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   138
    
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   139
    def _put_result (self, result) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   140
        """ Enqueue a return value, decrementing exec_count """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   141
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   142
        with self.lock :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   143
            # push it
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   144
            self.result_queue.append(result)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   145
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   146
            # count
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   147
            self.exec_count -= 1
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   148
 
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   149
            # notify waiting
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   150
            self.result_cond.notify()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   151
   
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   152
    def exec_task (self, task_func) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   153
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   154
            Get a task, and execute it using the given Executor, and put a result, handling errors.
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   155
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   156
            Maintains the task_exec_count
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   157
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   158
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   159
        # get it
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   160
        task = self._get_task()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   161
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   162
        try :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   163
            # execute it
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   164
            result = task_func(task)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   165
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   166
        except :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   167
            # internal error
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   168
            # XXX: something better to return? Maybe just dec exec_count, but don't return anything?
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   169
            result = None
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   170
            
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   171
            # kill off the thread
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   172
            raise
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   173
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   174
        finally :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   175
            # put the result
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   176
            self._put_result(result)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   177
    
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   178
    def drain (self) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   179
        """ Return Results from this queue until all tasks have been executed and all results returned """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   180
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   181
        # yield/wait until empty
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   182
        while True :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   183
            with self.lock :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   184
                # inspect current state
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   185
                if self.result_queue :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   186
                    # we have a result we can return
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   187
                    yield self.result_queue.popleft()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   188
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   189
                elif self.task_queue or self.exec_count :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   190
                    # tasks are still pending
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   191
                    self.result_cond.wait()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   192
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   193
                else :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   194
                    # no more tasks left
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   195
                    return
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   196
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   197
class Thread (threading.Thread) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   198
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   199
        A concurrent-execution thread, which dequeues tasks, executes them, and returns the results.
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   200
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   201
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   202
    def __init__ (self, queue) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   203
        """ Create the thread and start running """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   204
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   205
        # init as a daemon thread
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   206
        super(Thread, self).__init__()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   207
        self.daemon = True
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   208
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   209
        # store
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   210
        self.queue = queue
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   211
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   212
        # run
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   213
        self.start()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   214
    
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   215
    def run_task (self) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   216
        """ Get and handle a single task """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   217
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   218
        # let the queue handle safe-execution on the task using the Task.execute_result method directly
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   219
        self.queue.exec_task(Task.execute_result)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   220
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   221
    def run (self) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   222
        """ Handle tasks from our queue """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   223
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   224
        try :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   225
            # XXX: something to quit for?
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   226
            while True :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   227
                self.run_task()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   228
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   229
        except :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   230
            from sys import stderr
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   231
            
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   232
            print >>stderr, "%s:" % self
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   233
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   234
            # can't do much more...
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   235
            traceback.print_exc(file=stderr)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   236
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   237
class Manager (object) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   238
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   239
        Manages execution of tasks
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   240
    """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   241
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   242
    def __init__ (self, thread_count=0) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   243
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   244
            Initialize our pool of executors
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   245
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   246
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   247
        # our queue
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   248
        self.queue = Queue()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   249
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   250
        # thread pool
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   251
        self.threads = [Thread(self.queue) for count in xrange(thread_count)]
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   252
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   253
    def execute_threaded (self, tasks) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   254
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   255
            Execute the given tasks using our thread pool
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   256
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   257
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   258
        # schedule them all
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   259
        self.queue.put_tasks(tasks)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   260
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   261
        # then drain results
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   262
        for result in self.queue.drain() :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   263
            # this will either return the value or raise the error
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   264
            yield result.evaluate()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   265
    
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   266
    def execute_directly (self, tasks) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   267
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   268
            Fallback to execute the given tasks without any concurrency.
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   269
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   270
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   271
        for task in tasks :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   272
            yield task.execute()
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   273
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   274
    def execute (self, tasks) :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   275
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   276
            Execute the given tasks concurrently.
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   277
            
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   278
            This will yield the result of each task, and return once all tasks have completed.
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   279
        """
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   280
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   281
        if self.threads :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   282
            # yay
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   283
            return self.execute_threaded(tasks)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   284
        
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   285
        else :
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   286
            # fallback
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   287
            return self.execute_directly(tasks)
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   288
# aliases   
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   289
task = Task
a2e4562deaab implement concurrency... :)
Tero Marttila <terom@fixme.fi>
parents:
diff changeset
   290