degal/concurrent.py
changeset 117 a2e4562deaab
equal deleted inserted replaced
116:2d3721b9ffd0 117:a2e4562deaab
       
     1 """
       
     2     Concurrent execution
       
     3 """
       
     4 
       
     5 from __future__ import with_statement
       
     6 
       
     7 import sys, threading, collections, traceback
       
     8 
       
     9 class Task (object) :
       
    10     """
       
    11         Something to execute
       
    12     """
       
    13 
       
    14     def __init__ (self, func, *args, **kwargs) :
       
    15         """
       
    16             Initialize to run the given function with arguments.
       
    17         """
       
    18 
       
    19         self.func = func
       
    20         self.args = args
       
    21         self.kwargs = kwargs
       
    22     
       
    23     def execute (self) :
       
    24         """
       
    25             Execute this task normally, returning something or raising an error.
       
    26         """
       
    27 
       
    28         return self.func(*self.args, **self.kwargs)
       
    29 
       
    30     def execute_result (self) :
       
    31         """
       
    32             Run this task, returning a Result. This should not raise any errors
       
    33         """
       
    34         
       
    35         try :
       
    36             return Result(self.execute())
       
    37 
       
    38         except :
       
    39             return Failure()
       
    40 
       
    41 class Result (object) :
       
    42     """
       
    43         The result from executing a task
       
    44     """
       
    45 
       
    46     def __init__ (self, res) :
       
    47         """ Store the result """
       
    48 
       
    49         self.res = res
       
    50 
       
    51     def evaluate (self) :
       
    52         """ Returns the result value """
       
    53 
       
    54         return self.res
       
    55 
       
    56 class Failure (object) :
       
    57     """
       
    58         A failed result, causing an exception
       
    59     """
       
    60 
       
    61     def __init__ (self, exc_info=None) :
       
    62         """ Store the given execption info, or use current exception if not given """
       
    63 
       
    64         if exc_info :
       
    65             self.exc_info = exc_info
       
    66 
       
    67         else :
       
    68             self.exc_info = sys.exc_info()
       
    69     
       
    70     def evaluate (self) :
       
    71         """ re-Raises the exception info """
       
    72         
       
    73         # unpack
       
    74         type, value, tb = self.exc_info
       
    75         
       
    76         # re-raise
       
    77         raise type, value, tb
       
    78 
       
    79 class Queue (object) :
       
    80     """
       
    81         A thread-safe Queue of waiting tasks, and their results.
       
    82     """
       
    83     
       
    84     def __init__ (self) :
       
    85         """
       
    86             Setup our queues and locks.
       
    87         """
       
    88 
       
    89         # global lock
       
    90         self.lock = threading.Lock()
       
    91 
       
    92         # queue of waiting tasks with wait lock
       
    93         self.task_queue = collections.deque()
       
    94         self.task_cond = threading.Condition(self.lock)
       
    95 
       
    96         # count of executing tasks
       
    97         self.exec_count = 0
       
    98         
       
    99         # queue of results
       
   100         self.result_queue = collections.deque()
       
   101         self.result_cond = threading.Condition(self.lock)
       
   102 
       
   103     def put_task (self, task) :
       
   104         """ Enqueue a task for async execution """
       
   105 
       
   106         with self.lock :
       
   107             # push it
       
   108             self.task_queue.append(task)
       
   109 
       
   110             # notify waiting
       
   111             self.task_cond.notify()
       
   112     
       
   113     def put_tasks (self, tasks) :
       
   114         """ Enqueue a number of tasks for async execution """
       
   115 
       
   116         with self.lock :
       
   117             # extend them
       
   118             self.task_queue.extend(tasks)
       
   119 
       
   120             # notify multiple waiting
       
   121             self.task_cond.notifyAll()
       
   122 
       
   123     def _get_task (self) :
       
   124         """ Dequeue a task, incrementing exec_count """
       
   125         
       
   126         with self.lock :
       
   127             # wait for a task to become available
       
   128             while not self.task_queue :
       
   129                 self.task_cond.wait()
       
   130 
       
   131             # count
       
   132             self.exec_count += 1
       
   133 
       
   134             # get
       
   135             task = self.task_queue.popleft()
       
   136 
       
   137             return task
       
   138     
       
   139     def _put_result (self, result) :
       
   140         """ Enqueue a return value, decrementing exec_count """
       
   141 
       
   142         with self.lock :
       
   143             # push it
       
   144             self.result_queue.append(result)
       
   145 
       
   146             # count
       
   147             self.exec_count -= 1
       
   148  
       
   149             # notify waiting
       
   150             self.result_cond.notify()
       
   151    
       
   152     def exec_task (self, task_func) :
       
   153         """
       
   154             Get a task, and execute it using the given Executor, and put a result, handling errors.
       
   155 
       
   156             Maintains the task_exec_count
       
   157         """
       
   158         
       
   159         # get it
       
   160         task = self._get_task()
       
   161 
       
   162         try :
       
   163             # execute it
       
   164             result = task_func(task)
       
   165         
       
   166         except :
       
   167             # internal error
       
   168             # XXX: something better to return? Maybe just dec exec_count, but don't return anything?
       
   169             result = None
       
   170             
       
   171             # kill off the thread
       
   172             raise
       
   173 
       
   174         finally :
       
   175             # put the result
       
   176             self._put_result(result)
       
   177     
       
   178     def drain (self) :
       
   179         """ Return Results from this queue until all tasks have been executed and all results returned """
       
   180         
       
   181         # yield/wait until empty
       
   182         while True :
       
   183             with self.lock :
       
   184                 # inspect current state
       
   185                 if self.result_queue :
       
   186                     # we have a result we can return
       
   187                     yield self.result_queue.popleft()
       
   188 
       
   189                 elif self.task_queue or self.exec_count :
       
   190                     # tasks are still pending
       
   191                     self.result_cond.wait()
       
   192 
       
   193                 else :
       
   194                     # no more tasks left
       
   195                     return
       
   196 
       
   197 class Thread (threading.Thread) :
       
   198     """
       
   199         A concurrent-execution thread, which dequeues tasks, executes them, and returns the results.
       
   200     """
       
   201 
       
   202     def __init__ (self, queue) :
       
   203         """ Create the thread and start running """
       
   204         
       
   205         # init as a daemon thread
       
   206         super(Thread, self).__init__()
       
   207         self.daemon = True
       
   208 
       
   209         # store
       
   210         self.queue = queue
       
   211 
       
   212         # run
       
   213         self.start()
       
   214     
       
   215     def run_task (self) :
       
   216         """ Get and handle a single task """
       
   217 
       
   218         # let the queue handle safe-execution on the task using the Task.execute_result method directly
       
   219         self.queue.exec_task(Task.execute_result)
       
   220 
       
   221     def run (self) :
       
   222         """ Handle tasks from our queue """
       
   223         
       
   224         try :
       
   225             # XXX: something to quit for?
       
   226             while True :
       
   227                 self.run_task()
       
   228         
       
   229         except :
       
   230             from sys import stderr
       
   231             
       
   232             print >>stderr, "%s:" % self
       
   233 
       
   234             # can't do much more...
       
   235             traceback.print_exc(file=stderr)
       
   236 
       
   237 class Manager (object) :
       
   238     """
       
   239         Manages execution of tasks
       
   240     """
       
   241 
       
   242     def __init__ (self, thread_count=0) :
       
   243         """
       
   244             Initialize our pool of executors
       
   245         """
       
   246         
       
   247         # our queue
       
   248         self.queue = Queue()
       
   249         
       
   250         # thread pool
       
   251         self.threads = [Thread(self.queue) for count in xrange(thread_count)]
       
   252 
       
   253     def execute_threaded (self, tasks) :
       
   254         """
       
   255             Execute the given tasks using our thread pool
       
   256         """
       
   257 
       
   258         # schedule them all
       
   259         self.queue.put_tasks(tasks)
       
   260 
       
   261         # then drain results
       
   262         for result in self.queue.drain() :
       
   263             # this will either return the value or raise the error
       
   264             yield result.evaluate()
       
   265     
       
   266     def execute_directly (self, tasks) :
       
   267         """
       
   268             Fallback to execute the given tasks without any concurrency.
       
   269         """
       
   270 
       
   271         for task in tasks :
       
   272             yield task.execute()
       
   273 
       
   274     def execute (self, tasks) :
       
   275         """
       
   276             Execute the given tasks concurrently.
       
   277             
       
   278             This will yield the result of each task, and return once all tasks have completed.
       
   279         """
       
   280         
       
   281         if self.threads :
       
   282             # yay
       
   283             return self.execute_threaded(tasks)
       
   284         
       
   285         else :
       
   286             # fallback
       
   287             return self.execute_directly(tasks)
       
# aliases
# lower-case name bound to the same class object as Task
task = Task
       
   290