1 try: |
|
2 from cStringIO import StringIO |
|
3 except ImportError: |
|
4 from StringIO import StringIO |
|
5 import struct |
|
6 |
|
# Byte-order/size prefix put in front of every struct format string.
# In the struct module '!' selects network (big-endian) byte order with
# standard sizes and no alignment padding.
STRUCT_PREFIX = "!"
|
9 |
|
def hex(bytes):
    """
        Format a byte sequence as space-separated, zero-padded hex codes.

        Each element is rendered with the '%#04x' format, e.g. '0x01 0xff'.
        The elements may be one-character strings (what iterating a Python 2
        str yields) or integers (what iterating a Python 3 bytes object or a
        bytearray yields). An empty input gives an empty string.

        Note: both the function name and the parameter name shadow builtins;
        they are kept as-is so existing callers keep working.
    """

    return ' '.join(
        # integers are formatted directly, characters go through ord()
        '%#04x' % (b if isinstance(b, int) else ord(b))
        for b in bytes
    )
|
12 |
|
class NotEnoughDataError(Exception):
    """
        Raised when a read asks for more bytes than the stream currently
        holds.
    """
|
15 |
|
class IStreamBase(object):
    """
        Common root of the stream interfaces. It only carries the struct
        format prefix shared by the reading and writing helpers.
    """

    # put in front of every struct format string used by the streams
    STRUCT_PREFIX = "!"
|
19 |
|
class IReadStream(IStreamBase):
    """
        IReadStream simply provides various interfaces for reading bytes from
        a stream in various ways.

        Subclasses must implement read(); every other method here is built on
        top of it.
    """

    def read(self, size=None):
        """
            Read and return up to the given amount of bytes, or all bytes
            available if no size given.

            This is the abstract primitive: the base implementation always
            raises NotImplementedError.
        """

        raise NotImplementedError("read() must be implemented by a subclass")

    def readStruct(self, fmt):
        """
            Reads the correct amount of data and then unpacks it according to
            the given format. Note that this always returns a tuple; for
            single items, use readItem.

            STRUCT_PREFIX is prepended to fmt before it is handed to the
            struct module.
        """

        fmt = self.STRUCT_PREFIX + fmt

        # ask the struct module how many bytes this format consumes
        fmt_size = struct.calcsize(fmt)
        data = self.read(fmt_size)

        return struct.unpack(fmt, data)

    def readItem(self, fmt):
        """
            Reads the correct amount of data, unpacks it according to the
            given format, and then returns the first item.
        """

        return self.readStruct(fmt)[0]

    def readVarLen(self, len_type):
        """
            Return the data part of a <length><data> structure.
            len_type indicates what type length has (struct format code).

            In the case of <length> being zero, returns an empty string
            without calling read() at all.
        """

        size = self.readItem(len_type)

        if size:
            return self.read(size)

        return ""

    def readEnum(self, enum):
        """
            Returns the item from the given list of enum values that
            corresponds to a single-byte value read from the stream.

            The byte is used as an index into enum, so an out-of-range value
            raises whatever enum's indexing raises (IndexError for a list).
        """

        return enum[self.readItem('B')]
|
78 |
|
class ISeekableStream(IStreamBase):
    """
        Extends IStreamBase to provide the ability to seek backwards into the
        stream (which still does not know its length, and thence cannot seek
        forwards).

        Subclasses must implement tell() and seek(); mark(), unmark() and
        reset() are built on top of those two.
    """

    # position remembered by mark(), or None when no mark is set
    _position = None

    def tell(self):
        """
            Return the current offset into the seekable stream.

            Abstract: the base implementation raises NotImplementedError.
        """

        raise NotImplementedError("tell() must be implemented by a subclass")

    def seek(self, pos):
        """
            Seek to the given position in the stream.

            Abstract: the base implementation raises NotImplementedError.
        """

        raise NotImplementedError("seek() must be implemented by a subclass")

    def mark(self):
        """
            Set a mark that can be later rolled back to with .reset().
            Only one mark is kept; a new mark replaces the previous one.
        """

        self._position = self.tell()

    def unmark(self):
        """
            Remove the mark without affecting the current position.
        """

        self._position = None

    def reset(self):
        """
            Rolls the buffer back to the position set earlier with mark()
            and clears the mark.

            Raises Exception if no mark is currently set.
        """

        if self._position is None:
            raise Exception("reset() without mark()")

        self.seek(self._position)
        self._position = None
|
127 |
|
class ISeekableReadStream(ISeekableStream, IReadStream):
    """
        A readable stream that can also seek, which makes it possible to
        look at upcoming data without consuming it.
    """

    def peek(self, len=None):
        """
            Return a string representing what buf.read(len) would return,
            but do not affect future read operations.

            The read position is restored even when read() raises (for
            example NotEnoughDataError), so a failed peek does not consume
            data either; the exception itself still propagates.
        """

        pos = self.tell()

        try:
            return self.read(len)
        finally:
            # always rewind, whether read() succeeded or raised
            self.seek(pos)
|
142 |
|
143 |
|
class INonBlockingReadStream(IReadStream):
    """
        Marker interface: behaves like IReadStream, except that read() is
        expected to return exactly size bytes or raise a NotEnoughDataError
        instead of returning a short result.
    """
|
151 |
|
class IWriteStream(IStreamBase):
    """
        IWriteStream provides various ways to write data to a byte stream.

        Subclasses must implement write(); the other methods are built on
        top of it.
    """

    def write(self, data):
        """
            Write the given bytes to the stream.

            Abstract: the base implementation raises NotImplementedError.
        """

        raise NotImplementedError("write() must be implemented by a subclass")

    def writeStruct(self, fmt, *args):
        """
            Pack the given arguments with the given struct format, and write
            it to the stream. STRUCT_PREFIX is prepended to fmt first.
        """

        self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args))

    # Alias bound at class-creation time: it refers to this class's
    # writeStruct, not to an override defined in a subclass.
    writeItem = writeStruct

    def writeVarLen(self, len_type, data):
        """
            Write a <length><data> field into the buffer. len_type is the
            struct format code for the length field.
        """

        self.writeStruct(len_type, len(data))
        self.write(data)

    def writeEnum(self, enum, name):
        """
            Write the single-byte value corresponding to the given name's
            position in the given enum.

            The position is looked up with enum.index(name), so an unknown
            name raises whatever that lookup raises (ValueError for a list).
        """

        self.writeStruct('B', enum.index(name))
|
190 |
|
class IBufferBase(ISeekableStream):
    """
        Shared plumbing for in-memory buffers: position handling and access
        to the stored byte sequence.

        All methods delegate to self._buf, which this class does not create;
        the concrete buffer classes assign it in their constructors.
    """

    def tell(self):
        """
            Return the current offset as reported by the underlying buffer.
        """

        current = self._buf.tell()
        return current

    def seek(self, offset):
        """
            Move the underlying buffer to the given offset and return
            whatever its seek() returns.
        """

        result = self._buf.seek(offset)
        return result

    def getvalue(self):
        """
            Return the whole contents of the buffer, from position zero to
            the end, regardless of the current position.
        """

        contents = self._buf.getvalue()
        return contents
|
210 |
|
class ReadBuffer(INonBlockingReadStream, ISeekableReadStream, IBufferBase):
    """
        A read-only buffer. Can be initialized with a given value and then
        later replaced in various ways, but cannot be modified.
    """

    def __init__(self, data=""):
        """
            Initialize the buffer with the given data.
        """

        self._buf = StringIO(data)

    def read(self, size=None):
        """
            Return the given number of bytes, or everything from the current
            position to the end if no size is given.

            Raises ValueError when size is zero, and NotEnoughDataError when
            fewer than size bytes are available. In the latter case the
            available bytes have already been consumed from the underlying
            buffer, so callers that want to retry must rewind themselves
            (see mark()/reset()).
        """

        if size == 0:
            raise ValueError("can't read zero bytes")

        if not size:
            # no size given: hand back whatever is left
            return self._buf.read()

        ret = self._buf.read(size)

        if len(ret) < size:
            raise NotEnoughDataError()

        return ret

    def append(self, data):
        """
            Modify the buffer such that it contains the old data from this
            buffer, and the given data at the end. The read position in the
            buffer is kept the same.
        """

        pos = self.tell()

        # the underlying buffer is rebuilt from scratch with the new tail
        self._buf = StringIO(self._buf.getvalue() + data)

        self.seek(pos)

    def chop(self):
        """
            Discard the data in the buffer before the current read position.
            Also removes any marks.
        """

        self._position = None

        # read() with no size returns the unread remainder, which becomes
        # the entire new buffer (positioned at its start)
        self._buf = StringIO(self.read())

    def processWith(self, func):
        """
            Repeatedly call the given function with this buffer as an
            argument, calling mark() before each call. If the function
                a)  returns None, the buffer is .chop()'d, and we repeat the
                    process.
                b)  raises a NotEnoughDataError, the buffer is rolled back
                    with reset() to where it was before calling the
                    function, and the exception instance is returned.
                c)  raises a StopIteration, we chop the buffer and return
                    the exception instance.
                d)  returns something (i.e. ret is not None), we return
                    that (and leave the current buffer position untouched).
        """

        ret = None

        try:
            while ret is None:
                # mark the position of the packet we are processing
                self.mark()
                ret = func(self)

                if ret is None:
                    # discard the processed packet and proceed to the next
                    self.chop()

        except NotEnoughDataError as e:
            # reset position back to the start of the incomplete packet
            self.reset()
            return e

        except StopIteration as e:
            # processed ok, but we don't want to process any further packets
            self.chop()
            return e

        else:
            return ret
|
300 |
|
class WriteBuffer(IWriteStream, IBufferBase):
    """
        A write-only in-memory buffer. Data is written through the
        IWriteStream helpers; the only way to get it back out is as a whole
        (getvalue()).
    """

    def __init__(self):
        """
            Start out with an empty buffer.
        """

        self._buf = StringIO()

    def write(self, data):
        """
            Write data at the current position of the underlying buffer,
            replacing whatever was stored there and growing the buffer when
            the data runs past its end. Returns the underlying write()'s
            return value.
        """

        written = self._buf.write(data)
        return written
|
322 |
|
def readStringStream(stream, varlen_type):
    """
        Generator: keep calling stream.readVarLen(varlen_type) and yield each
        result, stopping (without yielding it) at the first result that
        evaluates to false, i.e. a zero-length string.
    """

    chunk = stream.readVarLen(varlen_type)

    while chunk:
        yield chunk
        chunk = stream.readVarLen(varlen_type)
|
335 |
|
def writeStringStream(stream, varlen_type, strings):
    """
        Write every string from the given iterable to the stream as a
        <length><data> field of the given varlen_type, then finish with a
        zero length value that acts as the end-of-list token.
    """

    for text in strings:
        stream.writeVarLen(varlen_type, text)

    # terminating null-length token
    stream.writeItem(varlen_type, 0)
|
345 |
|
class StreamProtocol(object):
    """
        A mixin to let you use Buffer with twisted.internet.protocol.Protocol

        Incoming data is collected in a ReadBuffer and dispatched, one
        command at a time, to methods named <prefix>_<command-name>.
    """

    # a list of receivable command names
    RECV_COMMANDS = None

    # a list of sendable command names
    SEND_COMMANDS = None

    def __init__(self):
        """
            Create the buffer that carries unprocessed input over from one
            dataReceived call to the next.
        """

        self.in_buffer = ReadBuffer()

    def send(self, buf):
        """
            Hand the full contents of the given WriteBuffer to the
            transport.
        """

        payload = buf.getvalue()
        self.transport.write(payload)

    def dataReceived(self, data):
        """
            Add the incoming data to the input buffer, then let the buffer
            drive processPacket over it. The value returned by processWith
            is not used.
        """

        pending = self.in_buffer
        pending.append(data)
        pending.processWith(self.processPacket)

    def processPacket(self, buf):
        """
            Run processCommand on the buffer. When it returns something
            truthy (expected to be a deferred), send() is attached to it as
            a callback. Always returns None.
        """

        deferred = self.processCommand(buf)

        if not deferred:
            return

        deferred.addCallback(self.send)

    def processCommand(self, buf):
        """
            Read one command identifier from the buffer and call the
            matching on_<command> handler with the buffer as its argument.
            Returns whatever the handler returns.
        """

        return self.readMethod(buf, self.RECV_COMMANDS, buf)

    # convenience read/write
    def startCommand(self, cmd):
        """
            Return a new WriteBuffer that already contains the single-byte
            identifier of the given SEND_COMMANDS entry.
        """

        out = WriteBuffer()
        out.writeEnum(self.SEND_COMMANDS, cmd)

        return out

    def readMethod(self, buf, methods, *args, **kwargs):
        """
            Read a single-byte <methods>-enum value from the given buffer,
            look up the method named <prefix>_<method-name> on self and call
            it. The prefix defaults to 'on' and can be overridden with the
            'prefix' keyword argument; all remaining positional and keyword
            arguments are passed on to the method, whose return value is
            returned.
        """

        prefix = kwargs.pop("prefix", "on")

        handler_name = "%s_%s" % (prefix, buf.readEnum(methods))
        handler = getattr(self, handler_name)

        return handler(*args, **kwargs)
|
417 |
|