# HG changeset patch # User Tero Marttila # Date 1206031678 -7200 # Node ID 8e7493df9f521fea855b8c5a5fb8a11da09b3b8d # Parent 34d7897bd0f53f7517e0d592ca3e0081ded777c5 dos2unix on buffer.py committer: Tero Marttila diff -r 34d7897bd0f5 -r 8e7493df9f52 buffer.py --- a/buffer.py Thu Mar 20 18:40:45 2008 +0200 +++ b/buffer.py Thu Mar 20 18:47:58 2008 +0200 @@ -1,417 +1,417 @@ -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO -import struct - -# prefixed to all struct format strings -STRUCT_PREFIX = '!' - -def hex (bytes) : - return ' '.join(['%#04x' % ord(b) for b in bytes]) - -class NotEnoughDataError (Exception) : - pass - -class IStreamBase (object) : - # prefixed to all struct format strings - STRUCT_PREFIX = '!' - -class IReadStream (IStreamBase) : - """ - IReadStream simply provides various interfaces for reading bytes from a - stream in various ways - """ - - def read (self, size=None) : - """ - Read and return up to the given amount of bytes, or all bytes - available if no size given. - """ - - abstract - - def readStruct (self, fmt) : - """ - Reads the correct amount of data and then unpacks it according to - the given format. Note that this always returns a tuple, for single - items, use readItem - """ - - fmt = self.STRUCT_PREFIX + fmt - - fmt_size = struct.calcsize(fmt) - data = self.read(fmt_size) - - return struct.unpack(fmt, data) - - def readItem (self, fmt) : - """ - Reads the correct amount of data, unpacks it according to the - given format, and then returns the first item. - """ - - return self.readStruct(fmt)[0] - - def readVarLen (self, len_type) : - """ - Return the data part of a structure. - len_type indicates what type length has (struct format code). - - In the case of being zero, returns an empty string. - """ - - size = self.readItem(len_type) - - if size : - return self.read(size) - else : - return "" - - def readEnum (self, enum) : - """ - Returns the item from the given list of enum values that corresponds - to a single-byte value read from the stream - """ - - return enum[self.readItem('B')] - -class ISeekableStream (IStreamBase) : - """ - Extends IStreamBase to provide the ability to seek backwards into the - stream (which still does not know it's length, and thence cannot seek - forwards). - """ - - _position = None - - def tell (self) : - """ - Return the current offset into the seekable stream - """ - - abstract - - def seek (self, pos) : - """ - Seek to the given position in the stream. - """ - - abstract - - def mark (self) : - """ - Set a mark that can be later rolled back to with .reset() - """ - - self._position = self.tell() - - def unmark (self) : - """ - Remove the mark without affecting the current position - """ - - self._position = None - - def reset (self) : - """ - Rolls the buffer back to the position set earlier with mark() - """ - - if self._position is not None : - self.seek(self._position) - self._position = None - - else : - raise Exception("reset() without mark()") - -class ISeekableReadStream (ISeekableStream, IReadStream) : - def peek (self, len=None) : - """ - Return a string representing what buf.read(len) would return, but do - not affect future read operations - """ - - pos = self.tell() - - data = self.read(len) - - self.seek(pos) - - return data - - -class INonBlockingReadStream (IReadStream) : - """ - Otherwise identical to IReadStream, but read will either return size - bytes, or raise a NotEnoughDataError - """ - - pass - -class IWriteStream (IStreamBase) : - """ - IWriteStream provides various ways to write data to a byte stream - """ - - def write (self, data) : - """ - Write the given bytes to the stream - """ - - abstract - - def writeStruct (self, fmt, *args) : - """ - Pack the given arguments with the given struct format, and write it - to the stream. - """ - - self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args)) - - writeItem = writeStruct - - def writeVarLen (self, len_type, data) : - """ - Write a field into the buffer. Len_type is the - struct format code for the length field. - """ - - self.writeStruct(len_type, len(data)) - self.write(data) - - def writeEnum (self, enum, name) : - """ - Write the single-byte value correspnding to the given name's - position in the given enum - """ - - self.writeStruct('B', enum.index(name)) - -class IBufferBase (ISeekableStream) : - """ - A buffer simply provides a way to read and write data to/from a byte - sequence stored in memory. - """ - - def tell (self) : - return self._buf.tell() - - def seek (self, offset) : - return self._buf.seek(offset) - - def getvalue (self) : - """ - Returns the value of the buffer, i.e. a string with the contents of - the buffer from position zero to the end. - """ - - return self._buf.getvalue() - -class ReadBuffer (INonBlockingReadStream, ISeekableReadStream, IBufferBase) : - """ - A read-only buffer. Can be initialized with a given value and then later - replaced in various ways, but cannot be modified. - """ - - def __init__ (self, data="") : - """ - Initialize the buffer with the given data - """ - - self._buf = StringIO(data) - - def read (self, size=None) : - """ - Return the given number of bytes, or raise a NotEnoughDataError - """ - - if size == 0 : - raise ValueError("can't read zero bytes") - - if size : - ret = self._buf.read(size) - else : - ret = self._buf.read() - - if size and len(ret) < size : - raise NotEnoughDataError() - - return ret - - def append (self, data) : - """ - Modify the buffer such that it contains the old data from this - buffer, and the given data at the end. The read position in the buffer - is kept the same. - """ - - pos = self.tell() - - self._buf = StringIO(self._buf.getvalue() + data) - - self.seek(pos) - - def chop (self) : - """ - Discard the data in the buffer before the current read position. - Also removes any marks. - """ - - self._position = None - - self._buf = StringIO(self.read()) - - def processWith (self, func) : - """ - Call the given function with this buffer as an argument after - calling mark(). If the function - a) returns None, the buffer is .chop()'d, and we repeat the - process. - b) raises a NotEnoughDataError, whereupon the buffer is rolled - back to where it was before calling the function with - chop(). - c) raises a StopIteration, whereupon we chop the buffer and - return. - d) returns something (i.e. ret is not None), whereupon we - return that (and leave the current buffer position untouched). - """ - ret = None - - try : - while ret is None : - self.mark() # mark the position of the packet we are processing - ret = func(self) - - if ret is None : - # discard the processed packet and proceed to the next one - self.chop() - - except NotEnoughDataError, e : - self.reset() # reset position back to the start of the packet - return e - - except StopIteration, e: - self.chop() - return e # processed ok, but we don't want to process any further packets - - else : - return ret - -class WriteBuffer (IWriteStream, IBufferBase) : - """ - A write-only buffer. Data can be written to this buffer in various - ways, but cannot be read from it except as a whole. - """ - - def __init__ (self) : - """ - Initialize the buffer - """ - - self._buf = StringIO() - - def write (self, data) : - """ - Write the given data to the current position in the stream, - overwriting any previous data in that position, and extending - the buffer if needed - """ - - return self._buf.write(data) - -def readStringStream (stream, varlen_type) : - """ - Does readVarLen on an IReadStream until it returns something that evaluates to false ( == zero-length string) - """ - - while True : - item = stream.readVarLen(varlen_type) - - if item : - yield item - else : - return - -def writeStringStream (stream, varlen_type, strings) : - """ - Writes strings from the given iterable into the given stream using the given varlen_type, ending with a null-length token - """ - - for item in strings : - stream.writeVarLen(varlen_type, item) - - stream.writeItem(varlen_type, 0) - -class StreamProtocol (object) : - """ - A mixin to let you use Buffer with twisted.internet.protocol.Protocol - """ - - # a list of receivable command names - RECV_COMMANDS = None - - # a list of sendable command names - SEND_COMMANDS = None - - def __init__ (self) : - """ - Initialize the cross-dataReceived buffer - """ - - self.in_buffer = ReadBuffer() - - def send (self, buf) : - """ - Write the contents of the given WriteBuffer to the transport - """ - - self.transport.write(buf.getvalue()) - - def dataReceived (self, data) : - """ - Buffer the incoming data and then try and process it - """ - - self.in_buffer.append(data) - - ret = self.in_buffer.processWith(self.processPacket) - - def processPacket (self, buf) : - """ - Call processCommand with the buffer, handling the return value (either None or a deferred) - """ - - ret = self.processCommand(buf) - - if ret : - ret.addCallback(self.send) - - def processCommand (self, buf) : - """ - Process a command from the given buffer. May return a callback - """ - - return self.readMethod(buf, self.RECV_COMMANDS, buf) - - # conveniance read/write - def startCommand (self, cmd) : - buf = WriteBuffer() - - buf.writeEnum(self.SEND_COMMANDS, cmd) - - return buf - - def readMethod (self, buf, methods, *args, **kwargs) : - """ - Reads a single-byte -enum value from the given buffer and - use it to find the corresponding method (as _, - prefix can be overriden with a keyword argument and defaults to - 'on'. If any extra arguments are given, they will be passed to the - method. - """ - - prefix = kwargs.pop("prefix", "on") - - return getattr(self, "%s_%s" % (prefix, buf.readEnum(methods)))(*args, **kwargs) - +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO +import struct + +# prefixed to all struct format strings +STRUCT_PREFIX = '!' + +def hex (bytes) : + return ' '.join(['%#04x' % ord(b) for b in bytes]) + +class NotEnoughDataError (Exception) : + pass + +class IStreamBase (object) : + # prefixed to all struct format strings + STRUCT_PREFIX = '!' + +class IReadStream (IStreamBase) : + """ + IReadStream simply provides various interfaces for reading bytes from a + stream in various ways + """ + + def read (self, size=None) : + """ + Read and return up to the given amount of bytes, or all bytes + available if no size given. + """ + + abstract + + def readStruct (self, fmt) : + """ + Reads the correct amount of data and then unpacks it according to + the given format. Note that this always returns a tuple, for single + items, use readItem + """ + + fmt = self.STRUCT_PREFIX + fmt + + fmt_size = struct.calcsize(fmt) + data = self.read(fmt_size) + + return struct.unpack(fmt, data) + + def readItem (self, fmt) : + """ + Reads the correct amount of data, unpacks it according to the + given format, and then returns the first item. + """ + + return self.readStruct(fmt)[0] + + def readVarLen (self, len_type) : + """ + Return the data part of a structure. + len_type indicates what type length has (struct format code). + + In the case of being zero, returns an empty string. + """ + + size = self.readItem(len_type) + + if size : + return self.read(size) + else : + return "" + + def readEnum (self, enum) : + """ + Returns the item from the given list of enum values that corresponds + to a single-byte value read from the stream + """ + + return enum[self.readItem('B')] + +class ISeekableStream (IStreamBase) : + """ + Extends IStreamBase to provide the ability to seek backwards into the + stream (which still does not know it's length, and thence cannot seek + forwards). + """ + + _position = None + + def tell (self) : + """ + Return the current offset into the seekable stream + """ + + abstract + + def seek (self, pos) : + """ + Seek to the given position in the stream. + """ + + abstract + + def mark (self) : + """ + Set a mark that can be later rolled back to with .reset() + """ + + self._position = self.tell() + + def unmark (self) : + """ + Remove the mark without affecting the current position + """ + + self._position = None + + def reset (self) : + """ + Rolls the buffer back to the position set earlier with mark() + """ + + if self._position is not None : + self.seek(self._position) + self._position = None + + else : + raise Exception("reset() without mark()") + +class ISeekableReadStream (ISeekableStream, IReadStream) : + def peek (self, len=None) : + """ + Return a string representing what buf.read(len) would return, but do + not affect future read operations + """ + + pos = self.tell() + + data = self.read(len) + + self.seek(pos) + + return data + + +class INonBlockingReadStream (IReadStream) : + """ + Otherwise identical to IReadStream, but read will either return size + bytes, or raise a NotEnoughDataError + """ + + pass + +class IWriteStream (IStreamBase) : + """ + IWriteStream provides various ways to write data to a byte stream + """ + + def write (self, data) : + """ + Write the given bytes to the stream + """ + + abstract + + def writeStruct (self, fmt, *args) : + """ + Pack the given arguments with the given struct format, and write it + to the stream. + """ + + self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args)) + + writeItem = writeStruct + + def writeVarLen (self, len_type, data) : + """ + Write a field into the buffer. Len_type is the + struct format code for the length field. + """ + + self.writeStruct(len_type, len(data)) + self.write(data) + + def writeEnum (self, enum, name) : + """ + Write the single-byte value correspnding to the given name's + position in the given enum + """ + + self.writeStruct('B', enum.index(name)) + +class IBufferBase (ISeekableStream) : + """ + A buffer simply provides a way to read and write data to/from a byte + sequence stored in memory. + """ + + def tell (self) : + return self._buf.tell() + + def seek (self, offset) : + return self._buf.seek(offset) + + def getvalue (self) : + """ + Returns the value of the buffer, i.e. a string with the contents of + the buffer from position zero to the end. + """ + + return self._buf.getvalue() + +class ReadBuffer (INonBlockingReadStream, ISeekableReadStream, IBufferBase) : + """ + A read-only buffer. Can be initialized with a given value and then later + replaced in various ways, but cannot be modified. + """ + + def __init__ (self, data="") : + """ + Initialize the buffer with the given data + """ + + self._buf = StringIO(data) + + def read (self, size=None) : + """ + Return the given number of bytes, or raise a NotEnoughDataError + """ + + if size == 0 : + raise ValueError("can't read zero bytes") + + if size : + ret = self._buf.read(size) + else : + ret = self._buf.read() + + if size and len(ret) < size : + raise NotEnoughDataError() + + return ret + + def append (self, data) : + """ + Modify the buffer such that it contains the old data from this + buffer, and the given data at the end. The read position in the buffer + is kept the same. + """ + + pos = self.tell() + + self._buf = StringIO(self._buf.getvalue() + data) + + self.seek(pos) + + def chop (self) : + """ + Discard the data in the buffer before the current read position. + Also removes any marks. + """ + + self._position = None + + self._buf = StringIO(self.read()) + + def processWith (self, func) : + """ + Call the given function with this buffer as an argument after + calling mark(). If the function + a) returns None, the buffer is .chop()'d, and we repeat the + process. + b) raises a NotEnoughDataError, whereupon the buffer is rolled + back to where it was before calling the function with + chop(). + c) raises a StopIteration, whereupon we chop the buffer and + return. + d) returns something (i.e. ret is not None), whereupon we + return that (and leave the current buffer position untouched). + """ + ret = None + + try : + while ret is None : + self.mark() # mark the position of the packet we are processing + ret = func(self) + + if ret is None : + # discard the processed packet and proceed to the next one + self.chop() + + except NotEnoughDataError, e : + self.reset() # reset position back to the start of the packet + return e + + except StopIteration, e: + self.chop() + return e # processed ok, but we don't want to process any further packets + + else : + return ret + +class WriteBuffer (IWriteStream, IBufferBase) : + """ + A write-only buffer. Data can be written to this buffer in various + ways, but cannot be read from it except as a whole. + """ + + def __init__ (self) : + """ + Initialize the buffer + """ + + self._buf = StringIO() + + def write (self, data) : + """ + Write the given data to the current position in the stream, + overwriting any previous data in that position, and extending + the buffer if needed + """ + + return self._buf.write(data) + +def readStringStream (stream, varlen_type) : + """ + Does readVarLen on an IReadStream until it returns something that evaluates to false ( == zero-length string) + """ + + while True : + item = stream.readVarLen(varlen_type) + + if item : + yield item + else : + return + +def writeStringStream (stream, varlen_type, strings) : + """ + Writes strings from the given iterable into the given stream using the given varlen_type, ending with a null-length token + """ + + for item in strings : + stream.writeVarLen(varlen_type, item) + + stream.writeItem(varlen_type, 0) + +class StreamProtocol (object) : + """ + A mixin to let you use Buffer with twisted.internet.protocol.Protocol + """ + + # a list of receivable command names + RECV_COMMANDS = None + + # a list of sendable command names + SEND_COMMANDS = None + + def __init__ (self) : + """ + Initialize the cross-dataReceived buffer + """ + + self.in_buffer = ReadBuffer() + + def send (self, buf) : + """ + Write the contents of the given WriteBuffer to the transport + """ + + self.transport.write(buf.getvalue()) + + def dataReceived (self, data) : + """ + Buffer the incoming data and then try and process it + """ + + self.in_buffer.append(data) + + ret = self.in_buffer.processWith(self.processPacket) + + def processPacket (self, buf) : + """ + Call processCommand with the buffer, handling the return value (either None or a deferred) + """ + + ret = self.processCommand(buf) + + if ret : + ret.addCallback(self.send) + + def processCommand (self, buf) : + """ + Process a command from the given buffer. May return a callback + """ + + return self.readMethod(buf, self.RECV_COMMANDS, buf) + + # conveniance read/write + def startCommand (self, cmd) : + buf = WriteBuffer() + + buf.writeEnum(self.SEND_COMMANDS, cmd) + + return buf + + def readMethod (self, buf, methods, *args, **kwargs) : + """ + Reads a single-byte -enum value from the given buffer and + use it to find the corresponding method (as _, + prefix can be overriden with a keyword argument and defaults to + 'on'. If any extra arguments are given, they will be passed to the + method. + """ + + prefix = kwargs.pop("prefix", "on") + + return getattr(self, "%s_%s" % (prefix, buf.readEnum(methods)))(*args, **kwargs) +