fixbot/buffer.py
changeset 67 00907acd732a
parent 66 eb0545ec03e7
parent 64 8574aeff9b36
equal deleted inserted replaced
66:eb0545ec03e7 67:00907acd732a
     1 try:
       
     2     from cStringIO import StringIO
       
     3 except ImportError:
       
     4     from StringIO import StringIO
       
     5 import struct
       
     6 
       
# Byte-order/size prefix intended to be put in front of struct format strings;
# '!' selects network (big-endian) byte order with standard sizes.
# NOTE(review): the stream classes visible in this file read
# IStreamBase.STRUCT_PREFIX rather than this module-level name - confirm
# whether anything else still uses it.
STRUCT_PREFIX = '!'
       
     9 
       
    10 def hex (bytes) :
       
    11     return ' '.join(['%#04x' % ord(b) for b in bytes])
       
    12     
       
    13 class NotEnoughDataError (Exception) : 
       
    14     pass
       
    15 
       
    16 class IStreamBase (object) :
       
    17     # prefixed to all struct format strings
       
    18     STRUCT_PREFIX = '!'
       
    19 
       
    20 class IReadStream (IStreamBase) :
       
    21     """
       
    22         IReadStream simply provides various interfaces for reading bytes from a
       
    23         stream in various ways
       
    24     """
       
    25 
       
    26     def read (self, size=None) :
       
    27         """
       
    28             Read and return up to the given amount of bytes, or all bytes
       
    29             available if no size given.
       
    30         """
       
    31 
       
    32         abstract
       
    33 
       
    34     def readStruct (self, fmt) :
       
    35         """
       
    36             Reads the correct amount of data and then unpacks it according to
       
    37             the given format. Note that this always returns a tuple, for single
       
    38             items, use readItem
       
    39         """
       
    40         
       
    41         fmt = self.STRUCT_PREFIX + fmt
       
    42 
       
    43         fmt_size = struct.calcsize(fmt)
       
    44         data = self.read(fmt_size)
       
    45         
       
    46         return struct.unpack(fmt, data)
       
    47 
       
    48     def readItem (self, fmt) :
       
    49         """
       
    50             Reads the correct amount of data, unpacks it according to the 
       
    51             given format, and then returns the first item.
       
    52         """
       
    53 
       
    54         return self.readStruct(fmt)[0]
       
    55 
       
    56     def readVarLen (self, len_type) :
       
    57         """
       
    58             Return the data part of a <length><data> structure.
       
    59             len_type indicates what type length has (struct format code).
       
    60 
       
    61             In the case of <length> being zero, returns an empty string.
       
    62         """
       
    63         
       
    64         size = self.readItem(len_type)
       
    65         
       
    66         if size :
       
    67             return self.read(size)
       
    68         else :
       
    69             return ""
       
    70 
       
    71     def readEnum (self, enum) :
       
    72         """
       
    73             Returns the item from the given list of enum values that corresponds
       
    74             to a single-byte value read from the stream
       
    75         """
       
    76 
       
    77         return enum[self.readItem('B')]
       
    78 
       
    79 class ISeekableStream (IStreamBase) :
       
    80     """
       
    81         Extends IStreamBase to provide the ability to seek backwards into the
       
    82         stream (which still does not know it's length, and thence cannot seek
       
    83         forwards).
       
    84     """
       
    85 
       
    86     _position = None
       
    87 
       
    88     def tell (self) :
       
    89         """
       
    90             Return the current offset into the seekable stream
       
    91         """
       
    92         
       
    93         abstract
       
    94 
       
    95     def seek (self, pos) :
       
    96         """
       
    97             Seek to the given position in the stream. 
       
    98         """
       
    99 
       
   100         abstract
       
   101 
       
   102     def mark (self) :
       
   103         """
       
   104             Set a mark that can be later rolled back to with .reset()
       
   105         """
       
   106         
       
   107         self._position = self.tell()
       
   108 
       
   109     def unmark (self) :
       
   110         """
       
   111             Remove the mark without affecting the current position
       
   112         """
       
   113         
       
   114         self._position = None
       
   115     
       
   116     def reset (self) :
       
   117         """
       
   118             Rolls the buffer back to the position set earlier with mark()
       
   119         """
       
   120         
       
   121         if self._position is not None :
       
   122             self.seek(self._position)
       
   123             self._position = None
       
   124 
       
   125         else :
       
   126             raise Exception("reset() without mark()")
       
   127 
       
   128 class ISeekableReadStream (ISeekableStream, IReadStream) :
       
   129     def peek (self, len=None) :
       
   130         """
       
   131             Return a string representing what buf.read(len) would return, but do
       
   132             not affect future read operations
       
   133         """
       
   134 
       
   135         pos = self.tell()
       
   136 
       
   137         data = self.read(len)
       
   138 
       
   139         self.seek(pos)
       
   140         
       
   141         return data
       
   142     
       
   143 
       
   144 class INonBlockingReadStream (IReadStream) :
       
   145     """
       
   146         Otherwise identical to IReadStream, but read will either return size
       
   147         bytes, or raise a NotEnoughDataError
       
   148     """
       
   149     
       
   150     pass
       
   151 
       
   152 class IWriteStream (IStreamBase) :
       
   153     """
       
   154         IWriteStream provides various ways to write data to a byte stream
       
   155     """
       
   156 
       
   157     def write (self, data) :
       
   158         """
       
   159             Write the given bytes to the stream
       
   160         """
       
   161 
       
   162         abstract
       
   163 
       
   164     def writeStruct (self, fmt, *args) :
       
   165         """
       
   166             Pack the given arguments with the given struct format, and write it
       
   167             to the stream.
       
   168         """
       
   169 
       
   170         self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args))
       
   171 
       
   172     writeItem = writeStruct
       
   173         
       
   174     def writeVarLen (self, len_type, data) :
       
   175         """
       
   176             Write a <length><data> field into the buffer. Len_type is the
       
   177             struct format code for the length field.
       
   178         """
       
   179 
       
   180         self.writeStruct(len_type, len(data))
       
   181         self.write(data)
       
   182 
       
   183     def writeEnum (self, enum, name) :
       
   184         """
       
   185             Write the single-byte value correspnding to the given name's
       
   186             position in the given enum
       
   187         """
       
   188 
       
   189         self.writeStruct('B', enum.index(name))
       
   190 
       
   191 class IBufferBase (ISeekableStream) :
       
   192     """
       
   193         A buffer simply provides a way to read and write data to/from a byte
       
   194         sequence stored in memory.
       
   195     """
       
   196 
       
   197     def tell (self) :
       
   198         return self._buf.tell()
       
   199     
       
   200     def seek (self, offset) :
       
   201         return self._buf.seek(offset)
       
   202 
       
   203     def getvalue (self) :
       
   204         """
       
   205             Returns the value of the buffer, i.e. a string with the contents of
       
   206             the buffer from position zero to the end.
       
   207         """
       
   208 
       
   209         return self._buf.getvalue()
       
   210 
       
   211 class ReadBuffer (INonBlockingReadStream, ISeekableReadStream, IBufferBase) :
       
   212     """
       
   213        A read-only buffer. Can be initialized with a given value and then later
       
   214        replaced in various ways, but cannot be modified.
       
   215     """
       
   216 
       
   217     def __init__ (self, data="") :
       
   218         """
       
   219             Initialize the buffer with the given data
       
   220         """
       
   221 
       
   222         self._buf = StringIO(data)
       
   223     
       
   224     def read (self, size=None) :
       
   225         """
       
   226             Return the given number of bytes, or raise a NotEnoughDataError
       
   227         """
       
   228 
       
   229         if size == 0 :
       
   230             raise ValueError("can't read zero bytes")
       
   231          
       
   232         if size :
       
   233             ret = self._buf.read(size)
       
   234         else :
       
   235             ret = self._buf.read()
       
   236 
       
   237         if size and len(ret) < size :
       
   238             raise NotEnoughDataError()
       
   239 
       
   240         return ret    
       
   241     
       
   242     def append (self, data) :
       
   243         """
       
   244             Modify the buffer such that it contains the old data from this
       
   245             buffer, and the given data at the end. The read position in the buffer
       
   246             is kept the same.
       
   247         """
       
   248 
       
   249         pos = self.tell()
       
   250 
       
   251         self._buf = StringIO(self._buf.getvalue() + data)
       
   252 
       
   253         self.seek(pos)
       
   254 
       
   255     def chop (self) :
       
   256         """
       
   257             Discard the data in the buffer before the current read position.
       
   258             Also removes any marks.
       
   259         """
       
   260 
       
   261         self._position = None
       
   262         
       
   263         self._buf = StringIO(self.read())
       
   264 
       
   265     def processWith (self, func) :
       
   266         """
       
   267             Call the given function with this buffer as an argument after
       
   268             calling mark(). If the function 
       
   269                 a) returns None, the buffer is .chop()'d, and we repeat the
       
   270                    process.
       
   271                 b) raises a NotEnoughDataError, whereupon the buffer is rolled
       
   272                    back to where it was before calling the function with 
       
   273                    chop().
       
   274                 c) raises a StopIteration, whereupon we chop the buffer and 
       
   275                    return.
       
   276                 d) returns something (i.e. ret is not None), whereupon we
       
   277                    return that (and leave the current buffer position untouched).
       
   278         """
       
   279         ret = None
       
   280         
       
   281         try :
       
   282             while ret is None :
       
   283                 self.mark()  # mark the position of the packet we are processing
       
   284                 ret = func(self)
       
   285 
       
   286                 if ret is None :
       
   287                     # discard the processed packet and proceed to the next one
       
   288                     self.chop()
       
   289                 
       
   290         except NotEnoughDataError, e :
       
   291             self.reset() # reset position back to the start of the packet
       
   292             return e
       
   293             
       
   294         except StopIteration, e:
       
   295             self.chop()
       
   296             return e # processed ok, but we don't want to process any further packets
       
   297             
       
   298         else :
       
   299             return ret
       
   300 
       
   301 class WriteBuffer (IWriteStream, IBufferBase) :
       
   302     """
       
   303         A write-only buffer. Data can be written to this buffer in various
       
   304         ways, but cannot be read from it except as a whole.
       
   305     """
       
   306 
       
   307     def __init__ (self) :
       
   308         """
       
   309             Initialize the buffer
       
   310         """
       
   311 
       
   312         self._buf = StringIO()
       
   313 
       
   314     def write (self, data) :
       
   315         """
       
   316             Write the given data to the current position in the stream,
       
   317             overwriting any previous data in that position, and extending
       
   318             the buffer if needed
       
   319         """
       
   320 
       
   321         return self._buf.write(data)
       
   322 
       
   323 def readStringStream (stream, varlen_type) :
       
   324     """
       
   325         Does readVarLen on an IReadStream until it returns something that evaluates to false ( == zero-length string)
       
   326     """
       
   327 
       
   328     while True :
       
   329         item = stream.readVarLen(varlen_type)
       
   330 
       
   331         if item :
       
   332             yield item
       
   333         else :
       
   334             return
       
   335 
       
   336 def writeStringStream (stream, varlen_type, strings) :
       
   337     """
       
   338         Writes strings from the given iterable into the given stream using the given varlen_type, ending with a null-length token
       
   339     """
       
   340 
       
   341     for item in strings :
       
   342         stream.writeVarLen(varlen_type, item)
       
   343 
       
   344     stream.writeItem(varlen_type, 0)
       
   345 
       
   346 class StreamProtocol (object) :
       
   347     """
       
   348         A mixin to let you use Buffer with twisted.internet.protocol.Protocol
       
   349     """
       
   350     
       
   351     # a list of receivable command names 
       
   352     RECV_COMMANDS = None
       
   353 
       
   354     # a list of sendable command names
       
   355     SEND_COMMANDS = None
       
   356 
       
   357     def __init__ (self) :
       
   358         """
       
   359             Initialize the cross-dataReceived buffer
       
   360         """
       
   361 
       
   362         self.in_buffer = ReadBuffer()
       
   363 
       
   364     def send (self, buf) :
       
   365         """
       
   366             Write the contents of the given WriteBuffer to the transport
       
   367         """
       
   368 
       
   369         self.transport.write(buf.getvalue())
       
   370 
       
   371     def dataReceived (self, data) :
       
   372         """
       
   373             Buffer the incoming data and then try and process it
       
   374         """
       
   375 
       
   376         self.in_buffer.append(data)
       
   377         
       
   378         ret = self.in_buffer.processWith(self.processPacket)
       
   379         
       
   380     def processPacket (self, buf) :
       
   381         """
       
   382             Call processCommand with the buffer, handling the return value (either None or a deferred)
       
   383         """
       
   384 
       
   385         ret = self.processCommand(buf)
       
   386 
       
   387         if ret :
       
   388             ret.addCallback(self.send)
       
   389 
       
   390     def processCommand (self, buf) :
       
   391         """
       
   392             Process a command from the given buffer. May return a callback
       
   393         """
       
   394 
       
   395         return self.readMethod(buf, self.RECV_COMMANDS, buf)
       
   396 
       
   397     # conveniance read/write
       
   398     def startCommand (self, cmd) :
       
   399         buf = WriteBuffer()
       
   400         
       
   401         buf.writeEnum(self.SEND_COMMANDS, cmd)
       
   402 
       
   403         return buf
       
   404     
       
   405     def readMethod (self, buf, methods, *args, **kwargs) :
       
   406         """
       
   407             Reads a single-byte <methods>-enum value from the given buffer and
       
   408             use it to find the corresponding method (as <prefix>_<method-name>,
       
   409             prefix can be overriden with a keyword argument and defaults to
       
   410             'on'. If any extra arguments are given, they will be passed to the
       
   411             method.
       
   412         """
       
   413 
       
   414         prefix = kwargs.pop("prefix", "on")
       
   415 
       
   416         return getattr(self, "%s_%s" % (prefix, buf.readEnum(methods)))(*args, **kwargs)
       
   417