"""
Byte-stream interfaces and in-memory read/write buffers built on ``struct``.

The I* classes are mixin-style interfaces; ReadBuffer and WriteBuffer are the
concrete in-memory implementations, and StreamProtocol glues a ReadBuffer to a
protocol object's ``dataReceived``.
"""

try:
    from cStringIO import StringIO
except ImportError:
    try:
        from StringIO import StringIO
    except ImportError:
        # Python 3 has neither module; the buffers hold raw bytes, so BytesIO
        # is the equivalent in-memory file object.
        from io import BytesIO as StringIO
import struct

# prefixed to all struct format strings
STRUCT_PREFIX = '!'


def hex(bytes):
    """
        Return the given byte string as space-separated hex codes, e.g.
        '0x01 0xff'. An empty input gives an empty string.

        Note: the name and parameter shadow builtins; they are kept as-is
        because they are part of this module's public interface.
    """

    # Iterating a Python 2 str yields 1-char strings (needs ord()), whereas
    # iterating Python 3 bytes yields ints already.
    return ' '.join(
        '%#04x' % (b if isinstance(b, int) else ord(b)) for b in bytes
    )


class NotEnoughDataError(Exception):
    """
        Raised by non-blocking reads when fewer bytes than requested are
        available.
    """


class IStreamBase(object):
    """
        Common base for all stream interfaces.
    """

    # prefixed to all struct format strings
    STRUCT_PREFIX = '!'


class IReadStream(IStreamBase):
    """
        IReadStream simply provides various interfaces for reading bytes from a
        stream in various ways
    """

    def read(self, size=None):
        """
            Read and return up to the given amount of bytes, or all bytes
            available if no size given.

            Must be overridden; the base implementation raises
            NotImplementedError.
        """

        raise NotImplementedError

    def readStruct(self, fmt):
        """
            Reads the correct amount of data and then unpacks it according to
            the given format. Note that this always returns a tuple, for single
            items, use readItem
        """

        fmt = self.STRUCT_PREFIX + fmt

        fmt_size = struct.calcsize(fmt)
        data = self.read(fmt_size)

        return struct.unpack(fmt, data)

    def readItem(self, fmt):
        """
            Reads the correct amount of data, unpacks it according to the
            given format, and then returns the first item.
        """

        return self.readStruct(fmt)[0]

    def readVarLen(self, len_type):
        """
            Return the data part of a <length><data> structure.
            len_type indicates what type length has (struct format code).

            In the case of <length> being zero, returns an empty string.
        """

        size = self.readItem(len_type)

        # A zero-length read is never issued: ReadBuffer.read() rejects
        # size == 0, so the empty case is answered here directly.
        if size:
            return self.read(size)

        return b""

    def readEnum(self, enum):
        """
            Returns the item from the given list of enum values that
            corresponds to a single-byte value read from the stream
        """

        return enum[self.readItem('B')]


class ISeekableStream(IStreamBase):
    """
        Extends IStreamBase to provide the ability to seek backwards into the
        stream (which still does not know its length, and thence cannot seek
        forwards).
    """

    # position remembered by mark(), or None when no mark is set
    _position = None

    def tell(self):
        """
            Return the current offset into the seekable stream

            Must be overridden; the base implementation raises
            NotImplementedError.
        """

        raise NotImplementedError

    def seek(self, pos):
        """
            Seek to the given position in the stream.

            Must be overridden; the base implementation raises
            NotImplementedError.
        """

        raise NotImplementedError

    def mark(self):
        """
            Set a mark that can be later rolled back to with .reset()
        """

        self._position = self.tell()

    def unmark(self):
        """
            Remove the mark without affecting the current position
        """

        self._position = None

    def reset(self):
        """
            Rolls the buffer back to the position set earlier with mark(),
            and clears the mark. Raises Exception if no mark is set.
        """

        if self._position is None:
            raise Exception("reset() without mark()")

        self.seek(self._position)
        self._position = None


class ISeekableReadStream(ISeekableStream, IReadStream):
    """
        A readable stream that can also be rewound, which makes peek()
        possible.
    """

    def peek(self, len=None):
        """
            Return a string representing what buf.read(len) would return, but
            do not affect future read operations
        """

        pos = self.tell()

        data = self.read(len)

        self.seek(pos)

        return data


class INonBlockingReadStream(IReadStream):
    """
        Otherwise identical to IReadStream, but read will either return size
        bytes, or raise a NotEnoughDataError
    """

    pass


class IWriteStream(IStreamBase):
    """
        IWriteStream provides various ways to write data to a byte stream
    """

    def write(self, data):
        """
            Write the given bytes to the stream

            Must be overridden; the base implementation raises
            NotImplementedError.
        """

        raise NotImplementedError

    def writeStruct(self, fmt, *args):
        """
            Pack the given arguments with the given struct format, and write it
            to the stream.
        """

        self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args))

    def writeVarLen(self, len_type, data):
        """
            Write a <length><data> field into the buffer. Len_type is the
            struct format code for the length field.
        """

        self.writeStruct(len_type, len(data))
        self.write(data)

    def writeEnum(self, enum, name):
        """
            Write the single-byte value corresponding to the given name's
            position in the given enum
        """

        self.writeStruct('B', enum.index(name))


class IBufferBase(ISeekableStream):
    """
        A buffer simply provides a way to read and write data to/from a byte
        sequence stored in memory.

        Subclasses are expected to set self._buf to a StringIO-like object.
    """

    def tell(self):
        """
            Return the current offset into the underlying buffer.
        """

        return self._buf.tell()

    def seek(self, offset):
        """
            Move to the given offset in the underlying buffer.
        """

        return self._buf.seek(offset)

    def getvalue(self):
        """
            Returns the value of the buffer, i.e. a string with the contents of
            the buffer from position zero to the end.
        """

        return self._buf.getvalue()


class ReadBuffer(INonBlockingReadStream, ISeekableReadStream, IBufferBase):
    """
        A read-only buffer. Can be initialized with a given value and then
        later replaced in various ways, but cannot be modified.
    """

    def __init__(self, data=b""):
        """
            Initialize the buffer with the given data
        """

        self._buf = StringIO(data)

    def read(self, size=None):
        """
            Return the given number of bytes, or raise a NotEnoughDataError.
            With no size, return everything from the current position to the
            end (possibly nothing).

            Raises ValueError for size == 0.
        """

        if size == 0:
            raise ValueError("can't read zero bytes")

        if size:
            ret = self._buf.read(size)
        else:
            ret = self._buf.read()

        if size and len(ret) < size:
            raise NotEnoughDataError()

        return ret

    def append(self, data):
        """
            Modify the buffer such that it contains the old data from this
            buffer, and the given data at the end. The read position in the
            buffer is kept the same.
        """

        pos = self.tell()

        # the underlying object is replaced rather than written to, so the
        # read position has to be restored by hand
        self._buf = StringIO(self._buf.getvalue() + data)

        self.seek(pos)

    def chop(self):
        """
            Discard the data in the buffer before the current read position.
            Also removes any marks.
        """

        self._position = None

        # read() with no size returns everything after the current position
        self._buf = StringIO(self.read())

    def processWith(self, func):
        """
            Call the given function with this buffer as an argument after
            calling mark(). If the function
                a)  returns None, the buffer is .chop()'d, and we repeat the
                    process.
                b)  raises a NotEnoughDataError, whereupon the buffer is rolled
                    back to where it was before calling the function with
                    reset(), and the exception instance is returned.
                c)  raises a StopIteration, whereupon we chop the buffer and
                    return the exception instance.
                d)  returns something (i.e. ret is not None), whereupon we
                    return that (and leave the current buffer position
                    untouched).
        """

        ret = None

        try:
            while ret is None:
                # mark the position of the packet we are processing
                self.mark()
                ret = func(self)

                if ret is None:
                    # discard the processed packet and proceed to the next one
                    self.chop()

        except NotEnoughDataError as e:
            # reset position back to the start of the packet
            self.reset()
            return e

        except StopIteration as e:
            # processed ok, but we don't want to process any further packets
            self.chop()
            return e

        else:
            return ret


class WriteBuffer(IWriteStream, IBufferBase):
    """
        A write-only buffer. Data can be written to this buffer in various
        ways, but cannot be read from it except as a whole.
    """

    def __init__(self):
        """
            Initialize the buffer
        """

        self._buf = StringIO()

    def write(self, data):
        """
            Write the given data to the current position in the stream,
            overwriting any previous data in that position, and extending
            the buffer if needed
        """

        return self._buf.write(data)


def readStringStream(stream, varlen_type):
    """
        Does readVarLen on an IReadStream until it returns something that
        evaluates to false ( == zero-length string), yielding each item read
        before that.
    """

    while True:
        item = stream.readVarLen(varlen_type)

        if not item:
            return

        yield item


def writeStringStream(stream, varlen_type, strings):
    """
        Counterpart of readStringStream: writes each of the given strings to
        the IWriteStream with writeVarLen, followed by a zero-length item that
        terminates the sequence.

        Raises ValueError on an empty item, since it would be read back as the
        terminator and silently truncate the sequence. Items preceding the
        offending one have already been written at that point.
    """

    for item in strings:
        if not item:
            raise ValueError("can't write an empty string into a string stream")

        stream.writeVarLen(varlen_type, item)

    # zero-length terminator
    stream.writeVarLen(varlen_type, b"")


class StreamProtocol(object):
    """
        A mixin to let you use Buffer with twisted.internet.protocol.Protocol

        NOTE(review): send() relies on a self.transport attribute that is not
        set here; presumably the Protocol class this is mixed into provides
        it - confirm.
    """

    # a list of receivable command names
    RECV_COMMANDS = None

    # a list of sendable command names
    SEND_COMMANDS = None

    def __init__(self):
        """
            Initialize the cross-dataReceived buffer
        """

        self.in_buffer = ReadBuffer()

    def send(self, buf):
        """
            Write the contents of the given WriteBuffer to the transport
        """

        self.transport.write(buf.getvalue())

    def dataReceived(self, data):
        """
            Buffer the incoming data and then try and process it
        """

        self.in_buffer.append(data)

        # The result (the terminating exception instance or a non-None
        # return value) is deliberately not used here.
        self.in_buffer.processWith(self.processPacket)

    def processPacket(self, buf):
        """
            Call processCommand with the buffer, handling the return value
            (either None or a deferred). Always returns None, so processWith
            keeps going until the buffer runs out of data.
        """

        ret = self.processCommand(buf)

        if ret:
            ret.addCallback(self.send)

    def processCommand(self, buf):
        """
            Process a command from the given buffer. May return a callback
        """

        return self.readMethod(buf, self.RECV_COMMANDS, buf)

    # convenience read/write
    def startCommand(self, cmd):
        """
            Return a new WriteBuffer that starts with the single-byte code of
            the given command name in SEND_COMMANDS.
        """

        buf = WriteBuffer()

        buf.writeEnum(self.SEND_COMMANDS, cmd)

        return buf

    def readMethod(self, buf, methods, *args, **kwargs):
        """
            Reads a single-byte <methods>-enum value from the given buffer and
            use it to find the corresponding method (as <prefix>_<name>,
            prefix can be overridden with a keyword argument and defaults to
            'on'). If any extra arguments are given, they will be passed to
            the method, and its return value is returned.
        """

        prefix = kwargs.pop("prefix", "on")

        method = getattr(self, "%s_%s" % (prefix, buf.readEnum(methods)))

        return method(*args, **kwargs)