degal/task.py
branchthreaded-tasks
changeset 88 b1b0939517e7
equal deleted inserted replaced
87:a7a18893730d 88:b1b0939517e7
       
     1 """
       
     2     Parallel execution of work, using threads.
       
     3 
       
     4     The work to be done is submitted in the form of Tasks (which may or may not be `threadable`) to a TaskManager,
       
     5     which then takes care of actually executing the tasks, either directly, or by queueing them up in a TaskQueue for 
       
     6     execution by a TaskThread.
       
     7 
       
     8     Example:
       
     9 
       
    10     for retval in tasks.execute(threadable_task(foo_func, item) for item in items) :
       
    11         print retval
       
    12 """
       
    13 
       
    14 from __future__ import with_statement
       
    15 
       
    16 import threading, Queue
       
    17 import contextlib
       
    18 
       
    19 class Task (object) :
       
    20     """
       
    21         A task describes some work to do
       
    22     """
       
    23     
       
    24     # this task can be executed in a Python thread of its own
       
    25     threadable = False
       
    26     
       
    27     def __init__ (self, func, *args, **kwargs) :
       
    28         """
       
    29             Set this Task up to execute the given function/arguments
       
    30         """
       
    31 
       
    32         self.func = func
       
    33         self.args = args
       
    34         self.kwargs = kwargs
       
    35         self.retval = self.exc_obj = None
       
    36     
       
    37     def _execute (self) :
       
    38         """
       
    39             Low-level task execution function. Calls `execute` and stores the result/error
       
    40         """
       
    41 
       
    42         try :
       
    43             # attempt to store normal retval
       
    44             self.retval = self.execute()
       
    45 
       
    46         except Exception, exc_obj :
       
    47             # trap all errors
       
    48             self.exc_obj = exc_obj
       
    49 
       
    50     def execute (self) :
       
    51         """
       
    52             Execute the task and return any results.
       
    53 
       
    54             If the task is threadable, then this method must not access any external state!
       
    55 
       
    56             By default, this will execute and return the function given to __init__
       
    57         """
       
    58 
       
    59         return self.func(*self.args, **self.kwargs)
       
    60 
       
    61 class ThreadableTask (Task) :
       
    62     """
       
    63         A task that is marked as threadable by default
       
    64     """
       
    65 
       
    66     threadable = True
       
    67 
       
    68 class TaskQueue (object) :
       
    69     """
       
    70         A threadsafe queue of waiting tasks, keeping track of executing tasks.
       
    71     """
       
    72 
       
    73     def __init__ (self) :
       
    74         """
       
    75             Setup
       
    76         """
       
    77         
       
    78         # queue of incoming, to-be-executed tasks
       
    79         self.in_queue = Queue.Queue()
       
    80 
       
    81         # queue of outgoing, was-executed tasks
       
    82         self.out_queue = Queue.Queue();
       
    83 
       
    84 
       
    85     @contextlib.contextmanager
       
    86     def get_task (self) :
       
    87         """
       
    88             Intended to be used as the target of a 'with' statement to get a task from the queue and execute it.
       
    89         """
       
    90         
       
    91         # XXX: shutdown signal?
       
    92         task = self.in_queue.get()
       
    93         
       
    94         try :
       
    95             # process it
       
    96             yield task
       
    97         
       
    98         finally :
       
    99             # mark it as complete
       
   100             self.in_queue.task_done()
       
   101     
       
   102     def execute_task (self, task) :
       
   103         """
       
   104             Execute the given task, and put the result in our output queue
       
   105         """
       
   106         
       
   107         # internal execute method
       
   108         task._execute()
       
   109         
       
   110         # return value
       
   111         self.out_queue.put(task)
       
   112 
       
   113     def put_task (self, task) :
       
   114         """
       
   115             Add a task on to the queue.
       
   116         """
       
   117 
       
   118         self.in_queue.put(task)
       
   119 
       
   120     def get_result (self) :
       
   121         """
       
   122             Wait for a single result from this queue.
       
   123 
       
   124             XXX: nothing to garuntee that this won't block
       
   125         """
       
   126 
       
   127         return self.out_queue.get()
       
   128 
       
   129 class TaskThread (threading.Thread) :
       
   130     """
       
   131         A Thread that executes Tasks
       
   132     """
       
   133 
       
   134     def __init__ (self, queue) :
       
   135         """
       
   136             Setup thread and start waiting for requests
       
   137         """
       
   138         
       
   139         super(TaskThread, self).__init__()
       
   140 
       
   141         self.queue = queue
       
   142 
       
   143         # run
       
   144         self.start()
       
   145 
       
   146     def run (self) :
       
   147         """
       
   148             Thread main
       
   149         """
       
   150         
       
   151         # XXX: provide shutdown method
       
   152         while True :
       
   153             # get a task to execute
       
   154             with self.queue.get_task() as task :
       
   155                 # execute it
       
   156                 self.queue.execute_task(task)
       
   157 
       
   158 class TaskManager (object) :
       
   159     """
       
   160         Provides the machinery to take tasks and execute them
       
   161     """
       
   162 
       
   163     def __init__ (self, config) :
       
   164         """
       
   165             Initialize with given config
       
   166         """
       
   167         
       
   168         # setup queue
       
   169         self.queue = TaskQueue()
       
   170 
       
   171         # setup task threads
       
   172         self.threads = [TaskThread(self.queue) for count in xrange(config.task_threads)]
       
   173     
       
   174     def submit (self, task) :
       
   175         """
       
   176             Submit given task for execution.
       
   177 
       
   178             This garuntees that the task will eventually be returned via our queue.
       
   179         """
       
   180         
       
   181         if self.threads and task.threadable :
       
   182             # schedule for later execution by a thread
       
   183             self.queue.put_task(task)
       
   184         
       
   185         else :
       
   186             # execute directly
       
   187             self.queue.execute_task(task)
       
   188     
       
   189     def consume (self) :
       
   190         """
       
   191             Consume a single result from the task queue, either returning the return value, or raising the task's error.
       
   192         """
       
   193         
       
   194         # get one task
       
   195         task = self.queue.get_result()
       
   196 
       
   197         if task.exc_obj :
       
   198             # XXX: is this threadsafe?
       
   199             raise task.exc_obj
       
   200 
       
   201         else :
       
   202             # successful execution
       
   203             return task.retval
       
   204 
       
   205 
       
   206     def execute (self, tasks) :
       
   207         """
       
   208             Schedule a series of tasks for execution, and yield the return values as they become available.
       
   209 
       
   210             If a task raises an error, it will be re-raised.
       
   211 
       
   212             Waits for all the tasks to execute.
       
   213         """
       
   214 
       
   215         # number of tasks submitted; how many results we expect
       
   216         task_count = len(tasks)
       
   217 
       
   218         # submit them
       
   219         for task in tasks :
       
   220             self.submit(task)
       
   221 
       
   222         # yield results
       
   223         for count in xrange(task_count) :
       
   224             yield self.consume()
       
   225 
       
   226 # pretty aliases
       
   227 task = Task
       
   228 threadable_task = ThreadableTask
       
   229