|
"""
Parallel execution of work, using threads.

The work to be done is submitted in the form of Tasks (which may or may not be `threadable`) to a TaskManager,
which then takes care of actually executing the tasks, either directly, or by queueing them up in a TaskQueue for
execution by a TaskThread.

Example (`tasks` being a TaskManager):

    for retval in tasks.execute([threadable_task(foo_func, item) for item in items]) :
        print retval
"""
|
13 |
|
14 from __future__ import with_statement |
|
15 |
|
16 import threading, Queue |
|
17 import contextlib |
|
18 |
|
class Task (object) :
    """
    A unit of work: a callable, plus the positional and keyword arguments to call it with.

    Once run via `_execute`, either `retval` holds what the callable returned, or `exc_obj` holds the
    Exception it raised.
    """

    # may this task be executed in a Python thread of its own? (subclasses/instances may override)
    threadable = False

    def __init__ (self, func, *args, **kwargs) :
        """
        Remember `func` and the arguments it should later be called with.
        """

        self.func, self.args, self.kwargs = func, args, kwargs

        # outcome slots, filled in by _execute
        self.retval = None
        self.exc_obj = None

    def _execute (self) :
        """
        Run `execute`, storing its return value in `retval`, or any Exception it raises in `exc_obj`.

        Exception subclasses never escape from here; other BaseExceptions do propagate.
        """

        try :
            self.retval = self.execute()
        except Exception as err :
            # keep the error around so that whoever collects this task can re-raise it
            self.exc_obj = err

    def execute (self) :
        """
        Perform the task's work and return its result.

        A threadable task must not touch any external state from within this method!

        The default implementation simply calls the function given to __init__ with the stored arguments.
        """

        func, args, kwargs = self.func, self.args, self.kwargs

        return func(*args, **kwargs)
|
60 |
|
class ThreadableTask (Task) :
    """
    A Task that is flagged, by default, as safe to run in a TaskThread of its own.
    """

    # overrides Task.threadable; a subclass or instance may still set this back to False
    threadable = True
|
67 |
|
class TaskQueue (object) :
    """
    A threadsafe pair of queues: tasks waiting to be executed, and tasks that have already been executed.
    """

    def __init__ (self) :
        """
        Create the (empty, unbounded) input and output queues.
        """

        # tasks that have been put, but not yet picked up for execution
        self.in_queue = Queue.Queue()

        # tasks that have been executed, waiting to be collected via get_result
        self.out_queue = Queue.Queue()

    def put_task (self, task) :
        """
        Append a task to the input queue, for pickup via get_task.
        """

        self.in_queue.put(task)

    @contextlib.contextmanager
    def get_task (self) :
        """
        Block until a task is waiting, then hand it out as the target of a 'with' statement.

        When the 'with' body exits, normally or by raising, the task is marked as done on the input queue.
        """

        # XXX: shutdown signal?
        pending = self.in_queue.get()

        try :
            yield pending
        finally :
            # balance the in_queue.get() above, even if executing the task blew up
            self.in_queue.task_done()

    def execute_task (self, task) :
        """
        Run the given task through its `_execute` method, then put it on the output queue.
        """

        task._execute()
        self.out_queue.put(task)

    def get_result (self) :
        """
        Wait for, and return, a single executed task from the output queue.

        XXX: nothing guarantees that this won't block forever
        """

        return self.out_queue.get()
|
128 |
|
class TaskThread (threading.Thread) :
    """
    A Thread that endlessly takes Tasks from a TaskQueue and executes them.

    The thread starts itself on construction and has no shutdown method, so it is made a daemon thread:
    otherwise its endless loop would keep the interpreter from ever exiting. As a consequence, a task that
    is still running when the interpreter exits is abandoned.
    """

    def __init__ (self, queue) :
        """
        Setup thread for the given TaskQueue and start waiting for requests.
        """

        super(TaskThread, self).__init__()

        self.queue = queue

        # run() never returns, so this thread must not block interpreter exit; has to be set before start()
        self.daemon = True

        # run
        self.start()

    def run (self) :
        """
        Thread main: take tasks off the queue and execute them, forever.
        """

        # XXX: provide shutdown method
        while True :
            # get a task to execute; it is marked as done once the with-block exits
            with self.queue.get_task() as task :
                # execute it, which also puts it on the queue's output side
                self.queue.execute_task(task)
|
157 |
|
class TaskManager (object) :
    """
    Provides the machinery to take tasks and execute them, either directly in the calling thread, or via a
    pool of TaskThreads sharing a single TaskQueue.

    NOTE: all calls share one output queue, so concurrent execute() calls on the same manager from
    different threads may receive each other's results.
    """

    def __init__ (self, config) :
        """
        Initialize with given config.

        `config.task_threads` is the number of TaskThreads to start; with zero threads, every task is
        executed directly in the submitting thread.
        """

        # setup queue
        self.queue = TaskQueue()

        # setup task threads; each one starts running as soon as it is constructed
        self.threads = [TaskThread(self.queue) for _ in range(config.task_threads)]

    def submit (self, task) :
        """
        Submit given task for execution.

        Threadable tasks are queued for a TaskThread, if there are any; all other tasks are executed right
        here, before this returns. Either way, as long as the task's `_execute` does not itself raise, the
        finished task eventually appears on our queue's output side, to be picked up by consume().
        """

        if self.threads and task.threadable :
            # schedule for later execution by a thread
            self.queue.put_task(task)

        else :
            # execute directly
            self.queue.execute_task(task)

    def consume (self) :
        """
        Consume a single result from the task queue, either returning the return value, or raising the task's error.

        Blocks until some submitted task has finished.
        """

        # get one task
        task = self.queue.get_result()

        # test against None rather than truthiness: an exception class may define __len__ / __nonzero__
        if task.exc_obj is not None :
            # the exception object was handed over through a threadsafe queue; only the object itself is
            # kept, so under Python 2 the traceback from the executing thread is not preserved
            raise task.exc_obj

        # successful execution
        return task.retval

    def execute (self, tasks) :
        """
        Schedule a series of tasks for execution, and yield the return values as they become available.

        `tasks` may be any iterable of Tasks, including a generator. Nothing is submitted until the first
        value is requested from the returned generator. Values are yielded in completion order, which need
        not match submission order.

        If a task raises an error, it will be re-raised; errors of any other tasks in the same batch are then
        discarded.

        Waits for all the tasks to execute: if iteration ends early (an error is re-raised, or the caller
        stops consuming and the generator is closed), the remaining results of this batch are still collected
        and thrown away, so that they cannot be mistaken for results of a later call.
        """

        # submitted tasks whose results have not yet been taken off the queue
        pending = 0

        try :
            # submit them, counting as we go, since `tasks` may not support len()
            for task in tasks :
                self.submit(task)
                pending += 1

            # yield results
            while pending :
                pending -= 1
                yield self.consume()

        finally :
            # drain whatever this batch left behind in the shared output queue
            while pending :
                pending -= 1
                self.queue.get_result()
|
225 |
|
# lower-case convenience aliases for the two task classes
task, threadable_task = Task, ThreadableTask
|
229 |