"""
Helpers for reading and writing struct-packed binary data on byte streams,
in-memory read/write buffers built on them, and a mixin that feeds received
data through a ReadBuffer for command dispatch.
"""

try:
    from cStringIO import StringIO
except ImportError:
    try:
        from StringIO import StringIO
    except ImportError:
        # Python 3 has neither module; the streams here carry bytes (they
        # are fed to/from struct), so BytesIO is the equivalent.
        from io import BytesIO as StringIO
import struct

# prefixed to all struct format strings
STRUCT_PREFIX = '!'


def hex(bytes):
    """
    Return the given byte string as space-separated, 0x-prefixed,
    two-digit hex values (e.g. "0x01 0xff").
    """

    # bytearray yields ints on both Python 2 and 3, unlike iterating the
    # byte string directly
    return ' '.join(['%#04x' % b for b in bytearray(bytes)])


class NotEnoughDataError(Exception):
    """
    Raised by non-blocking reads when fewer bytes than requested are
    available.
    """

    pass


class IStreamBase(object):
    """
    Common base for all stream interfaces.
    """

    # prefixed to all struct format strings
    STRUCT_PREFIX = '!'


class IReadStream(IStreamBase):
    """
    IReadStream simply provides various interfaces for reading bytes from a
    stream in various ways.
    """

    def read(self, size=None):
        """
        Read and return up to the given amount of bytes, or all bytes
        available if no size given.

        Must be implemented by subclasses.
        """

        raise NotImplementedError("read() must be implemented by a subclass")

    def readStruct(self, fmt):
        """
        Read the amount of data required by the given struct format (with
        STRUCT_PREFIX prepended) and unpack it.

        Note that this always returns a tuple; for single items, use
        readItem.
        """

        fmt = self.STRUCT_PREFIX + fmt

        fmt_size = struct.calcsize(fmt)
        data = self.read(fmt_size)

        return struct.unpack(fmt, data)

    def readItem(self, fmt):
        """
        Read and unpack data according to the given struct format, and
        return only the first unpacked item.
        """

        return self.readStruct(fmt)[0]

    def readVarLen(self, len_type):
        """
        Read a length-prefixed field and return its data part.

        len_type is the struct format code of the length prefix. If the
        length is zero, an empty byte string is returned without reading
        any further.
        """

        size = self.readItem(len_type)

        if size:
            return self.read(size)

        return b""

    def readEnum(self, enum):
        """
        Read a single unsigned byte from the stream and return the item at
        that index in the given sequence of enum values.
        """

        return enum[self.readItem('B')]


class ISeekableStream(IStreamBase):
    """
    Extends IStreamBase to provide the ability to seek backwards into the
    stream (which still does not know its length, and hence cannot seek
    forwards).
    """

    # position remembered by mark(), or None when no mark is set
    _position = None

    def tell(self):
        """
        Return the current offset into the seekable stream.

        Must be implemented by subclasses.
        """

        raise NotImplementedError("tell() must be implemented by a subclass")

    def seek(self, pos):
        """
        Seek to the given position in the stream.

        Must be implemented by subclasses.
        """

        raise NotImplementedError("seek() must be implemented by a subclass")

    def mark(self):
        """
        Set a mark that can later be rolled back to with reset().
        """

        self._position = self.tell()

    def unmark(self):
        """
        Remove the mark without affecting the current position.
        """

        self._position = None

    def reset(self):
        """
        Roll the stream back to the position set earlier with mark(), and
        clear the mark.

        Raises Exception if no mark is set.
        """

        if self._position is None:
            raise Exception("reset() without mark()")

        self.seek(self._position)
        self._position = None


class ISeekableReadStream(ISeekableStream, IReadStream):
    """
    A readable stream that can also be rewound, which allows peeking.
    """

    def peek(self, len=None):
        """
        Return what read(len) would return, then seek back so that future
        read operations are not affected.
        """

        pos = self.tell()

        data = self.read(len)

        self.seek(pos)

        return data


class INonBlockingReadStream(IReadStream):
    """
    Otherwise identical to IReadStream, but read will either return size
    bytes, or raise a NotEnoughDataError.
    """

    pass


class IWriteStream(IStreamBase):
    """
    IWriteStream provides various ways to write data to a byte stream.
    """

    def write(self, data):
        """
        Write the given bytes to the stream.

        Must be implemented by subclasses.
        """

        raise NotImplementedError("write() must be implemented by a subclass")

    def writeStruct(self, fmt, *args):
        """
        Pack the given arguments with the given struct format (with
        STRUCT_PREFIX prepended), and write the result to the stream.
        """

        self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args))

    writeItem = writeStruct

    def writeVarLen(self, len_type, data):
        """
        Write a length-prefixed field: first len(data) packed with the
        struct format code len_type, then the data itself.
        """

        self.writeStruct(len_type, len(data))
        self.write(data)

    def writeEnum(self, enum, name):
        """
        Write, as a single unsigned byte, the index of the given name in
        the given sequence of enum values.
        """

        self.writeStruct('B', enum.index(name))


class IBufferBase(ISeekableStream):
    """
    A buffer simply provides a way to read and write data to/from a byte
    sequence stored in memory. Subclasses must set self._buf to a
    StringIO-like object.
    """

    def tell(self):
        """
        Return the current offset into the underlying buffer.
        """

        return self._buf.tell()

    def seek(self, offset):
        """
        Seek the underlying buffer to the given offset.
        """

        return self._buf.seek(offset)

    def getvalue(self):
        """
        Return the value of the buffer, i.e. a string with the contents of
        the buffer from position zero to the end.
        """

        return self._buf.getvalue()


class ReadBuffer(INonBlockingReadStream, ISeekableReadStream, IBufferBase):
    """
    A read-only buffer. Can be initialized with a given value and then later
    replaced in various ways, but cannot be modified.
    """

    def __init__(self, data=b""):
        """
        Initialize the buffer with the given data.
        """

        self._buf = StringIO(data)

    def read(self, size=None):
        """
        Return exactly the given number of bytes, or everything that is
        left in the buffer if no size is given.

        Raises ValueError if size is zero, and NotEnoughDataError if fewer
        than size bytes are left. In the latter case the read position has
        already been advanced; use mark()/reset() to roll back.
        """

        if size == 0:
            raise ValueError("can't read zero bytes")

        if not size:
            return self._buf.read()

        ret = self._buf.read(size)

        if len(ret) < size:
            raise NotEnoughDataError()

        return ret

    def append(self, data):
        """
        Modify the buffer such that it contains the old data from this
        buffer, and the given data at the end. The read position in the
        buffer is kept the same.
        """

        pos = self.tell()

        self._buf = StringIO(self._buf.getvalue() + data)

        self.seek(pos)

    def chop(self):
        """
        Discard the data in the buffer before the current read position.
        Also removes any marks.
        """

        self._position = None

        # read() without a size returns everything from the current
        # position onwards, which becomes the new buffer contents
        self._buf = StringIO(self.read())

    def processWith(self, func):
        """
        Repeatedly call the given function with this buffer as an argument,
        calling mark() before each call. If the function
            a)  returns None, the buffer is chop()'d, and we repeat the
                process.
            b)  raises a NotEnoughDataError, the buffer is rolled back with
                reset() to where it was before calling the function, and
                the exception instance is returned.
            c)  raises a StopIteration, we chop() the buffer and return the
                exception instance.
            d)  returns something (i.e. ret is not None), we return that
                and leave the current buffer position untouched (the mark
                set before the call is left in place as well).
        """

        ret = None

        try:
            while ret is None:
                # mark the position of the packet we are processing
                self.mark()
                ret = func(self)

                if ret is None:
                    # discard the processed packet and proceed to the next
                    self.chop()

        except NotEnoughDataError as e:
            # reset position back to the start of the incomplete packet
            self.reset()
            return e

        except StopIteration as e:
            # processed ok, but we don't want to process further packets
            self.chop()
            return e

        else:
            return ret


class WriteBuffer(IWriteStream, IBufferBase):
    """
    A write-only buffer. Data can be written to this buffer in various
    ways, but cannot be read from it except as a whole.
    """

    def __init__(self):
        """
        Initialize an empty buffer.
        """

        self._buf = StringIO()

    def write(self, data):
        """
        Write the given data to the current position in the stream,
        overwriting any previous data in that position, and extending
        the buffer if needed.
        """

        return self._buf.write(data)


def readStringStream(stream, varlen_type):
    """
    Yield the results of stream.readVarLen(varlen_type) until it returns
    something that evaluates to false (i.e. a zero-length string).
    """

    while True:
        item = stream.readVarLen(varlen_type)

        if not item:
            return

        yield item


def writeStringStream(stream, varlen_type, strings):
    """
    Write the strings from the given iterable into the given stream using
    writeVarLen with the given varlen_type, ending with a zero length token.
    """

    for item in strings:
        stream.writeVarLen(varlen_type, item)

    stream.writeItem(varlen_type, 0)


class StreamProtocol(object):
    """
    A mixin to let you use Buffer with twisted.internet.protocol.Protocol.
    """

    # a list of receivable command names
    RECV_COMMANDS = None

    # a list of sendable command names
    SEND_COMMANDS = None

    def __init__(self):
        """
        Initialize the buffer that holds received data across
        dataReceived calls.
        """

        self.in_buffer = ReadBuffer()

    def send(self, buf):
        """
        Write the contents of the given WriteBuffer to the transport.
        """

        self.transport.write(buf.getvalue())

    def dataReceived(self, data):
        """
        Buffer the incoming data and then try and process it.
        """

        self.in_buffer.append(data)

        # the return value of processWith is not used here
        self.in_buffer.processWith(self.processPacket)

    def processPacket(self, buf):
        """
        Call processCommand with the buffer. If it returns something true
        (expected to be a deferred), self.send is added as its callback.

        Always returns None.
        """

        ret = self.processCommand(buf)

        if ret:
            ret.addCallback(self.send)

    def processCommand(self, buf):
        """
        Process a command from the given buffer by dispatching on
        RECV_COMMANDS via readMethod. Returns whatever the handler returns.
        """

        return self.readMethod(buf, self.RECV_COMMANDS, buf)

    # convenience read/write
    def startCommand(self, cmd):
        """
        Return a new WriteBuffer that starts with the enum byte for the
        given command name from SEND_COMMANDS.
        """

        buf = WriteBuffer()

        buf.writeEnum(self.SEND_COMMANDS, cmd)

        return buf

    def readMethod(self, buf, methods, *args, **kwargs):
        """
        Read a single-byte enum value from the given buffer and use it to
        look up the method named "<prefix>_<name>" on self, where <name> is
        the corresponding item in methods. The prefix can be overridden
        with the 'prefix' keyword argument and defaults to 'on'. Any other
        arguments given are passed on to the method, whose return value is
        returned.
        """

        prefix = kwargs.pop("prefix", "on")

        name = "%s_%s" % (prefix, buf.readEnum(methods))

        return getattr(self, name)(*args, **kwargs)