|
1 """ |
|
2 Concurrent execution |
|
3 """ |
|
4 |
|
5 from __future__ import with_statement |
|
6 |
|
7 import sys, threading, collections, traceback |
|
8 |
|
class Task (object) :
    """
    A deferred call: a function together with the arguments to call it with.
    """

    def __init__ (self, func, *args, **kwargs) :
        """
        Remember `func` and the positional/keyword arguments to call it with later.
        """

        self.func, self.args, self.kwargs = func, args, kwargs

    def execute (self) :
        """
        Call the stored function with the stored arguments and return whatever
        it returns; any exception it raises propagates to the caller.
        """

        call = self.func
        return call(*self.args, **self.kwargs)

    def execute_result (self) :
        """
        Call execute(), wrapping the outcome rather than raising: a Result
        holding the return value on success, or a Failure capturing the
        exception otherwise.

        The bare except is deliberate: errors of every kind are returned as a
        Failure instead of escaping from here.
        """

        try :
            outcome = Result(self.execute())

        except :
            outcome = Failure()

        return outcome
|
40 |
|
class Result (object) :
    """
    The successful outcome of executing a task: wraps the value it returned.
    """

    def __init__ (self, res) :
        """ Keep hold of the returned value """

        self.res = res

    def evaluate (self) :
        """ Hand back the wrapped value, unchanged """

        value = self.res
        return value
|
55 |
|
class Failure (object) :
    """
    The failed outcome of executing a task: holds exception info, and
    re-raises that exception when evaluated.
    """

    def __init__ (self, exc_info=None) :
        """
        Store the given exception info -- a (type, value, traceback) triple as
        returned by sys.exc_info() -- or, if none (or a false value) is given,
        capture the exception currently being handled.
        """

        if exc_info :
            self.exc_info = exc_info

        else :
            # only meaningful when called from inside an except block
            self.exc_info = sys.exc_info()

    def evaluate (self) :
        """
        Re-raise the stored exception, with its original traceback.

        Raises TypeError if there is no exception to re-raise, i.e. this
        Failure was created outside of an except block without exc_info.
        """

        # unpack, without shadowing the builtin ``type``
        exc_type, exc_value, exc_tb = self.exc_info

        if exc_type is None :
            # sys.exc_info() was empty when this Failure was constructed
            raise TypeError("Failure has no exception info to re-raise")

        if exc_value is None :
            # match ``raise type, None, tb``, which instantiates the type
            exc_value = exc_type()

        if sys.version_info[0] >= 3 :
            raise exc_value.with_traceback(exc_tb)

        # Python 2: the three-argument raise is a SyntaxError on Python 3, so it
        # is only compiled here, at runtime (the same approach six.reraise takes)
        exec("raise exc_type, exc_value, exc_tb")
|
78 |
|
class Queue (object) :
    """
    A thread-safe queue of waiting tasks, and of their results.

    A single lock guards all state: the deque of waiting tasks, the count of
    tasks currently executing, and the deque of finished results. Callers of
    _get_task() wait on task_cond for tasks; drain() waits on result_cond for
    results.
    """

    def __init__ (self) :
        """
        Set up the empty queues, their shared lock and its two conditions.
        """

        # global lock, shared by both conditions below
        self.lock = threading.Lock()

        # queue of waiting tasks; _get_task() waits on task_cond while it is empty
        self.task_queue = collections.deque()
        self.task_cond = threading.Condition(self.lock)

        # number of tasks taken by _get_task() whose result has not been put yet
        self.exec_count = 0

        # queue of results; drain() waits on result_cond while it is empty
        self.result_queue = collections.deque()
        self.result_cond = threading.Condition(self.lock)

    def put_task (self, task) :
        """ Enqueue a single task for async execution, waking one waiting worker """

        with self.lock :
            self.task_queue.append(task)
            self.task_cond.notify()

    def put_tasks (self, tasks) :
        """ Enqueue every task from the iterable `tasks`, waking all waiting workers """

        with self.lock :
            self.task_queue.extend(tasks)

            # several tasks may have arrived, so wake every waiting worker
            self.task_cond.notify_all()

    def _get_task (self) :
        """ Dequeue a task, blocking until one is available; increments exec_count """

        with self.lock :
            while not self.task_queue :
                self.task_cond.wait()

            # counted as executing until the matching _put_result()
            self.exec_count += 1

            return self.task_queue.popleft()

    def _put_result (self, result) :
        """ Enqueue a result, decrementing exec_count and waking a waiting drain() """

        with self.lock :
            self.result_queue.append(result)
            self.exec_count -= 1
            self.result_cond.notify()

    def exec_task (self, task_func) :
        """
        Get a task (blocking until one is available), run ``task_func(task)``
        and put whatever it returns as the result, keeping exec_count balanced.

        If task_func itself raises, a Failure wrapping that error is put as the
        result instead -- so drain() consumers see the real exception rather
        than a None with no evaluate() -- and the error is then re-raised to
        the caller.
        """

        task = self._get_task()

        try :
            result = task_func(task)

        except :
            # internal error: deliver it to the consumer as a Failure
            result = Failure()

            # and still kill off the calling thread
            raise

        finally :
            # always put *something*, so exec_count is decremented and drain()
            # does not wait forever for this task
            self._put_result(result)

    def drain (self) :
        """
        Yield results from this queue as they become available, until no tasks
        are waiting or executing and every result has been returned.

        The lock is released before each yield, so the consumer may run
        arbitrary code between results -- including put_task() -- without
        stalling the workers or deadlocking on the non-reentrant lock.
        """

        while True :
            with self.lock :
                # wait for a result, unless nothing is left that could produce one
                while not self.result_queue and (self.task_queue or self.exec_count) :
                    self.result_cond.wait()

                if not self.result_queue :
                    # no results, and no waiting or executing tasks: done
                    return

                result = self.result_queue.popleft()

            # hand the result over outside the lock
            yield result
|
196 |
|
class Thread (threading.Thread) :
    """
    A daemon worker thread that repeatedly takes a task from its queue, runs
    it and puts back the result, until handling a task raises.
    """

    def __init__ (self, queue) :
        """ Build the worker for `queue` and start it running immediately """

        super(Thread, self).__init__()

        # daemon, so an idle worker does not keep the process alive at exit
        self.daemon = True
        self.queue = queue

        # run() needs self.queue, so start only once it is set
        self.start()

    def run_task (self) :
        """ Take and handle one task from the queue """

        # the queue does the safe execution; each task is run via Task.execute_result
        runner = Task.execute_result
        self.queue.exec_task(runner)

    def run (self) :
        """
        Worker loop: handle tasks one after another. There is no stop
        condition; the loop only ends when handling a task raises, in which
        case the thread and its traceback are reported on stderr and the
        thread exits.
        """

        try :
            # XXX: no way to ask the thread to quit
            while True :
                self.run_task()

        except :
            from sys import stderr

            # name the dying thread, then dump what killed it
            stderr.write("%s:\n" % self)
            traceback.print_exc(file=stderr)
|
236 |
|
class Manager (object) :
    """
    Runs batches of tasks: on a pool of worker threads when it has any,
    otherwise sequentially in the calling thread.
    """

    def __init__ (self, thread_count=0) :
        """
        Create the shared task/result queue and start `thread_count` worker
        threads on it. With the default of 0, no threads are created and tasks
        run inline in the caller.
        """

        self.queue = Queue()

        # range rather than xrange: the count is small, and range also exists on Python 3
        self.threads = [Thread(self.queue) for _ in range(thread_count)]

    def execute_threaded (self, tasks) :
        """
        Hand all of `tasks` to the worker pool, then yield each task's value
        (or raise its error) as results come back, in the order the tasks
        complete, which need not match the order of `tasks`.

        NOTE(review): if a result raises here, the remaining tasks keep running
        and their results stay in the shared queue, where a later call's
        drain() will pick them up -- confirm this is acceptable.
        """

        queue = self.queue
        queue.put_tasks(tasks)

        for outcome in queue.drain() :
            # a Result returns its value, a Failure re-raises its exception
            yield outcome.evaluate()

    def execute_directly (self, tasks) :
        """
        Fallback without any concurrency: run each task in turn in the calling
        thread and yield its value; errors propagate immediately.
        """

        for each in tasks :
            value = each.execute()
            yield value

    def execute (self, tasks) :
        """
        Execute the given tasks, concurrently if this manager has worker threads.

        Returns an iterator yielding the value of each task (or raising its
        error), which is exhausted once all tasks have completed.
        """

        if not self.threads :
            # no pool: plain sequential fallback
            return self.execute_directly(tasks)

        return self.execute_threaded(tasks)
|
# aliases: lower-case shorthand for constructing a Task
task = Task
|
290 |