buffer.py
changeset 0 b610bb7f8823
child 4 34d7897bd0f5
equal deleted inserted replaced
-1:000000000000 0:b610bb7f8823
       
     1 try:
       
     2     from cStringIO import StringIO
       
     3 except ImportError:
       
     4     from StringIO import StringIO
       
     5 import struct
       
     6 
       
     7 # prefixed to all struct format strings
       
     8 STRUCT_PREFIX = '!'
       
     9 
       
    10 def hex (bytes) :
       
    11     return ' '.join(['%#04x' % ord(b) for b in bytes])
       
    12     
       
    13 class NotEnoughDataError (Exception) : 
       
    14     pass
       
    15 
       
    16 class IStreamBase (object) :
       
    17     # prefixed to all struct format strings
       
    18     STRUCT_PREFIX = '!'
       
    19 
       
    20 class IReadStream (IStreamBase) :
       
    21     """
       
    22         IReadStream simply provides various interfaces for reading bytes from a
       
    23         stream in various ways
       
    24     """
       
    25 
       
    26     def read (self, size=None) :
       
    27         """
       
    28             Read and return up to the given amount of bytes, or all bytes
       
    29             available if no size given.
       
    30         """
       
    31 
       
    32         abstract
       
    33 
       
    34     def readStruct (self, fmt) :
       
    35         """
       
    36             Reads the correct amount of data and then unpacks it according to
       
    37             the given format. Note that this always returns a tuple, for single
       
    38             items, use readItem
       
    39         """
       
    40         
       
    41         fmt = self.STRUCT_PREFIX + fmt
       
    42 
       
    43         fmt_size = struct.calcsize(fmt)
       
    44         data = self.read(fmt_size)
       
    45         
       
    46         return struct.unpack(fmt, data)
       
    47 
       
    48     def readItem (self, fmt) :
       
    49         """
       
    50             Reads the correct amount of data, unpacks it according to the 
       
    51             given format, and then returns the first item.
       
    52         """
       
    53 
       
    54         return self.readStruct(fmt)[0]
       
    55 
       
    56     def readVarLen (self, len_type) :
       
    57         """
       
    58             Return the data part of a <length><data> structure.
       
    59             len_type indicates what type length has (struct format code).
       
    60 
       
    61             In the case of <length> being zero, returns an empty string.
       
    62         """
       
    63         
       
    64         size = self.readItem(len_type)
       
    65         
       
    66         if size :
       
    67             return self.read(size)
       
    68         else :
       
    69             return ""
       
    70 
       
    71     def readEnum (self, enum) :
       
    72         """
       
    73             Returns the item from the given list of enum values that corresponds
       
    74             to a single-byte value read from the stream
       
    75         """
       
    76 
       
    77         return enum[self.readItem('B')]
       
    78 
       
    79 class ISeekableStream (IStreamBase) :
       
    80     """
       
    81         Extends IStreamBase to provide the ability to seek backwards into the
       
    82         stream (which still does not know it's length, and thence cannot seek
       
    83         forwards).
       
    84     """
       
    85 
       
    86     _position = None
       
    87 
       
    88     def tell (self) :
       
    89         """
       
    90             Return the current offset into the seekable stream
       
    91         """
       
    92         
       
    93         abstract
       
    94 
       
    95     def seek (self, pos) :
       
    96         """
       
    97             Seek to the given position in the stream. 
       
    98         """
       
    99 
       
   100         abstract
       
   101 
       
   102     def mark (self) :
       
   103         """
       
   104             Set a mark that can be later rolled back to with .reset()
       
   105         """
       
   106         
       
   107         self._position = self.tell()
       
   108 
       
   109     def unmark (self) :
       
   110         """
       
   111             Remove the mark without affecting the current position
       
   112         """
       
   113         
       
   114         self._position = None
       
   115     
       
   116     def reset (self) :
       
   117         """
       
   118             Rolls the buffer back to the position set earlier with mark()
       
   119         """
       
   120         
       
   121         if self._position is not None :
       
   122             self.seek(self._position)
       
   123             self._position = None
       
   124 
       
   125         else :
       
   126             raise Exception("reset() without mark()")
       
   127 
       
   128 class ISeekableReadStream (ISeekableStream, IReadStream) :
       
   129     def peek (self, len=None) :
       
   130         """
       
   131             Return a string representing what buf.read(len) would return, but do
       
   132             not affect future read operations
       
   133         """
       
   134 
       
   135         pos = self.tell()
       
   136 
       
   137         data = self.read(len)
       
   138 
       
   139         self.seek(pos)
       
   140         
       
   141         return data
       
   142     
       
   143 
       
   144 class INonBlockingReadStream (IReadStream) :
       
   145     """
       
   146         Otherwise identical to IReadStream, but read will either return size
       
   147         bytes, or raise a NotEnoughDataError
       
   148     """
       
   149     
       
   150     pass
       
   151 
       
   152 class IWriteStream (IStreamBase) :
       
   153     """
       
   154         IWriteStream provides various ways to write data to a byte stream
       
   155     """
       
   156 
       
   157     def write (self, data) :
       
   158         """
       
   159             Write the given bytes to the stream
       
   160         """
       
   161 
       
   162         abstract
       
   163 
       
   164     def writeStruct (self, fmt, *args) :
       
   165         """
       
   166             Pack the given arguments with the given struct format, and write it
       
   167             to the stream.
       
   168         """
       
   169 
       
   170         self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args))
       
   171         
       
   172     def writeVarLen (self, len_type, data) :
       
   173         """
       
   174             Write a <length><data> field into the buffer. Len_type is the
       
   175             struct format code for the length field.
       
   176         """
       
   177 
       
   178         self.writeStruct(len_type, len(data))
       
   179         self.write(data)
       
   180 
       
   181     def writeEnum (self, enum, name) :
       
   182         """
       
   183             Write the single-byte value correspnding to the given name's
       
   184             position in the given enum
       
   185         """
       
   186 
       
   187         self.writeStruct('B', enum.index(name))
       
   188 
       
   189 class IBufferBase (ISeekableStream) :
       
   190     """
       
   191         A buffer simply provides a way to read and write data to/from a byte
       
   192         sequence stored in memory.
       
   193     """
       
   194 
       
   195     def tell (self) :
       
   196         return self._buf.tell()
       
   197     
       
   198     def seek (self, offset) :
       
   199         return self._buf.seek(offset)
       
   200 
       
   201     def getvalue (self) :
       
   202         """
       
   203             Returns the value of the buffer, i.e. a string with the contents of
       
   204             the buffer from position zero to the end.
       
   205         """
       
   206 
       
   207         return self._buf.getvalue()
       
   208 
       
   209 class ReadBuffer (INonBlockingReadStream, ISeekableReadStream, IBufferBase) :
       
   210     """
       
   211        A read-only buffer. Can be initialized with a given value and then later
       
   212        replaced in various ways, but cannot be modified.
       
   213     """
       
   214 
       
   215     def __init__ (self, data="") :
       
   216         """
       
   217             Initialize the buffer with the given data
       
   218         """
       
   219 
       
   220         self._buf = StringIO(data)
       
   221     
       
   222     def read (self, size=None) :
       
   223         """
       
   224             Return the given number of bytes, or raise a NotEnoughDataError
       
   225         """
       
   226 
       
   227         if size == 0 :
       
   228             raise ValueError("can't read zero bytes")
       
   229          
       
   230         if size :
       
   231             ret = self._buf.read(size)
       
   232         else :
       
   233             ret = self._buf.read()
       
   234 
       
   235         if size and len(ret) < size :
       
   236             raise NotEnoughDataError()
       
   237 
       
   238         return ret    
       
   239     
       
   240     def append (self, data) :
       
   241         """
       
   242             Modify the buffer such that it contains the old data from this
       
   243             buffer, and the given data at the end. The read position in the buffer
       
   244             is kept the same.
       
   245         """
       
   246 
       
   247         pos = self.tell()
       
   248 
       
   249         self._buf = StringIO(self._buf.getvalue() + data)
       
   250 
       
   251         self.seek(pos)
       
   252 
       
   253     def chop (self) :
       
   254         """
       
   255             Discard the data in the buffer before the current read position.
       
   256             Also removes any marks.
       
   257         """
       
   258 
       
   259         self._position = None
       
   260         
       
   261         self._buf = StringIO(self.read())
       
   262 
       
   263     def processWith (self, func) :
       
   264         """
       
   265             Call the given function with this buffer as an argument after
       
   266             calling mark(). If the function 
       
   267                 a) returns None, the buffer is .chop()'d, and we repeat the
       
   268                    process.
       
   269                 b) raises a NotEnoughDataError, whereupon the buffer is rolled
       
   270                    back to where it was before calling the function with 
       
   271                    chop().
       
   272                 c) raises a StopIteration, whereupon we chop the buffer and 
       
   273                    return.
       
   274                 d) returns something (i.e. ret is not None), whereupon we
       
   275                    return that (and leave the current buffer position untouched).
       
   276         """
       
   277         ret = None
       
   278         
       
   279         try :
       
   280             while ret is None :
       
   281                 self.mark()  # mark the position of the packet we are processing
       
   282                 ret = func(self)
       
   283 
       
   284                 if ret is None :
       
   285                     # discard the processed packet and proceed to the next one
       
   286                     self.chop()
       
   287                 
       
   288         except NotEnoughDataError, e :
       
   289             self.reset() # reset position back to the start of the packet
       
   290             return e
       
   291             
       
   292         except StopIteration, e:
       
   293             self.chop()
       
   294             return e # processed ok, but we don't want to process any further packets
       
   295             
       
   296         else :
       
   297             return ret
       
   298 
       
   299 class WriteBuffer (IWriteStream, IBufferBase) :
       
   300     """
       
   301         A write-only buffer. Data can be written to this buffer in various
       
   302         ways, but cannot be read from it except as a whole.
       
   303     """
       
   304 
       
   305     def __init__ (self) :
       
   306         """
       
   307             Initialize the buffer
       
   308         """
       
   309 
       
   310         self._buf = StringIO()
       
   311 
       
   312     def write (self, data) :
       
   313         """
       
   314             Write the given data to the current position in the stream,
       
   315             overwriting any previous data in that position, and extending
       
   316             the buffer if needed
       
   317         """
       
   318 
       
   319         return self._buf.write(data)
       
   320 
       
   321 def readStringStream (stream, varlen_type) :
       
   322     """
       
   323         Does readVarLen on an IReadStream until it returns something that evaluates to false ( == zero-length string)
       
   324     """
       
   325 
       
   326     while True :
       
   327         item = stream.readVarLen(varlen_type)
       
   328 
       
   329         if item :
       
   330             yield item
       
   331         else :
       
   332             return
       
   333 
       
   334 def writeStringStream (stream, varlen_type, strings) :
       
   335     """
       
   336     """
       
   337 
       
   338 
       
   339 class StreamProtocol (object) :
       
   340     """
       
   341         A mixin to let you use Buffer with twisted.internet.protocol.Protocol
       
   342     """
       
   343     
       
   344     # a list of receivable command names 
       
   345     RECV_COMMANDS = None
       
   346 
       
   347     # a list of sendable command names
       
   348     SEND_COMMANDS = None
       
   349 
       
   350     def __init__ (self) :
       
   351         """
       
   352             Initialize the cross-dataReceived buffer
       
   353         """
       
   354 
       
   355         self.in_buffer = ReadBuffer()
       
   356 
       
   357     def send (self, buf) :
       
   358         """
       
   359             Write the contents of the given WriteBuffer to the transport
       
   360         """
       
   361 
       
   362         self.transport.write(buf.getvalue())
       
   363 
       
   364     def dataReceived (self, data) :
       
   365         """
       
   366             Buffer the incoming data and then try and process it
       
   367         """
       
   368 
       
   369         self.in_buffer.append(data)
       
   370         
       
   371         ret = self.in_buffer.processWith(self.processPacket)
       
   372         
       
   373     def processPacket (self, buf) :
       
   374         """
       
   375             Call processCommand with the buffer, handling the return value (either None or a deferred)
       
   376         """
       
   377 
       
   378         ret = self.processCommand(buf)
       
   379 
       
   380         if ret :
       
   381             ret.addCallback(self.send)
       
   382 
       
   383     def processCommand (self, buf) :
       
   384         """
       
   385             Process a command from the given buffer. May return a callback
       
   386         """
       
   387 
       
   388         return self.readMethod(buf, self.RECV_COMMANDS, buf)
       
   389 
       
   390     # conveniance read/write
       
   391     def startCommand (self, cmd) :
       
   392         buf = WriteBuffer()
       
   393         
       
   394         buf.writeEnum(self.SEND_COMMANDS, cmd)
       
   395 
       
   396         return buf
       
   397     
       
   398     def readMethod (self, buf, methods, *args, **kwargs) :
       
   399         """
       
   400             Reads a single-byte <methods>-enum value from the given buffer and
       
   401             use it to find the corresponding method (as <prefix>_<method-name>,
       
   402             prefix can be overriden with a keyword argument and defaults to
       
   403             'on'. If any extra arguments are given, they will be passed to the
       
   404             method.
       
   405         """
       
   406 
       
   407         prefix = kwargs.pop("prefix", "on")
       
   408 
       
   409         return getattr(self, "%s_%s" % (prefix, buf.readEnum(methods)))(*args, **kwargs)
       
   410