pvl/socket.py
changeset 116 89b7385d19ba
child 118 4f9bcf1e53e0
equal deleted inserted replaced
115:9772d43669fb 116:89b7385d19ba
       
     1 """
       
     2     A simple TCP client in the kind of syslog.fifo/file.
       
     3 
       
     4     Interface: fileno(), __iter__, __call__
       
     5 """
       
     6 
       
     7 # XXX: absolute import plz
       
     8 socket = __import__('socket')
       
     9 
       
    10 import select
       
    11 
       
    12 import urlparse
       
    13 
       
    14 import logging; log = logging.getLogger('pvl.socket')
       
    15 
       
    16 URL = {
       
    17     'tcp':  (0,                 socket.SOCK_STREAM  ), # AF_UNSPEC
       
    18     'udp':  (0,                 socket.SOCK_DGRAM   ), # AF_UNSPEC
       
    19     'unix': (socket.AF_UNIX,    None                ), # socktype is given
       
    20 }
       
    21 
       
    22 def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) :
       
    23     """
       
    24         Parse given string into (AF_*, SOCK_*, host, port).
       
    25 
       
    26         For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value.
       
    27     """
       
    28    
       
    29     family, socktype = URL[scheme]
       
    30     url = urlparse.urlparse(str)
       
    31     
       
    32     # TODO: UNIX?
       
    33     if url.scheme and url.netloc :
       
    34         # proper url
       
    35         family, socktype = URL[url.scheme]
       
    36 
       
    37         return family, socktype, url.hostname, url.port or port
       
    38 
       
    39     elif url.scheme and url.path :
       
    40         # host:port
       
    41         return family, socktype, url.scheme, int(url.path)
       
    42 
       
    43     elif url.path :
       
    44         # host
       
    45         return family, socktype, url.path, port
       
    46 
       
    47     else :
       
    48         raise ValueError("unparseable connect URL: %s", str)
       
    49 
       
    50 def connect (str, *args, **kwargs) :
       
    51     """
       
    52         Returns a connected socket for given parse()'d string.
       
    53     """
       
    54 
       
    55     family, socktype, host, port = parse(str, *args, **kwargs)
       
    56 
       
    57     if family == socket.AF_UNIX :
       
    58         raise ValueError("XXX: AF_UNIX is not yet supported", str)
       
    59     else : # AF_UNSPEC
       
    60         return connect_inet(host, port, family=family, socktype=socktype)
       
    61  
       
    62 def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
       
    63     """
       
    64         Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
       
    65 
       
    66         TODO: timeout?
       
    67     """
       
    68 
       
    69     log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)
       
    70     
       
    71     if host :
       
    72         flags = socket.AI_CANONNAME
       
    73     else :
       
    74         flags = 0
       
    75 
       
    76     addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)
       
    77 
       
    78     if not addrinfo :
       
    79         raise Exception("getaddrinfo: %s:%s: no results" % (host, port))
       
    80 
       
    81     for af, st, proto, name, addr in addrinfo :
       
    82         try :
       
    83             sock = socket.socket(af, st, proto)
       
    84 
       
    85         except socket.error as error :
       
    86             log.warning("%s:%s: socket: %s", host, port, error)
       
    87             continue
       
    88         
       
    89         log.debug("%s:%s: socket: %s", host, port, sock)
       
    90 
       
    91         try :
       
    92             sock.connect(addr)
       
    93 
       
    94         except socket.error as error :
       
    95             log.warning("%s:%s: connect: %s", host, port, error)
       
    96             continue
       
    97 
       
    98         log.debug("%s:%s: connect", host, port)
       
    99         log.info("%s", name)
       
   100         
       
   101         return sock
       
   102 
       
   103     else :
       
   104         raise Exception("Unable to connect: %s:%s: %s" % (host, port, error))
       
   105 
       
   106 def reverse (self, sockaddr, numeric_host=False, numeric_port=True) :
       
   107     """
       
   108         Resolve given sockaddr, returning (host, port).
       
   109     """
       
   110 
       
   111     flags = 0
       
   112 
       
   113     if numeric_host :
       
   114         flags |= socket.NI_NUMERICHOST
       
   115     
       
   116     if numeric_port :
       
   117         flags |= socket.NI_NUMERICSERV
       
   118 
       
   119     return socket.getnameinfo(sockaddr, flags)
       
   120 
       
   121 def nonblocking (call, *args, **kwargs) :
       
   122     """
       
   123         Call the given function, which read/writes on a nonblocking file, and return None if it would have blocked.
       
   124     """
       
   125 
       
   126     try :
       
   127         return call(*args, **kwargs)
       
   128 
       
   129     except socket.error as ex :
       
   130         # block?
       
   131         if ex.errno == errno.EAGAIN or ex.errno == errno.EWOULDBLOCK:
       
   132             # empty
       
   133             return None
       
   134 
       
   135         else :
       
   136             raise
       
   137 
       
   138 class ReadStream (object) :
       
   139     """
       
   140         Buffered stream, supporting non-blocking/line-based reads.
       
   141     """
       
   142 
       
   143     BLOCK=512
       
   144 
       
   145     def __init__ (self, sock, buffer=None) :
       
   146         """
       
   147             TODO: buffer    - maximum line length
       
   148         """
       
   149 
       
   150         self.sock = sock
       
   151         self._buf = ''
       
   152 
       
   153     def fileno (self) :
       
   154         return self.sock.fileno()
       
   155 
       
   156     def _read (self, block=BLOCK) :
       
   157         """
       
   158             Read up to n bytes from socket.
       
   159             
       
   160             Returns None if we would block.
       
   161             Raises EOFError on EOF.
       
   162         """
       
   163         
       
   164         buf = nonblocking(self.sock.recv, block)
       
   165 
       
   166         # eof?
       
   167         if not buf :
       
   168             raise EOFError()
       
   169 
       
   170         # ok
       
   171         return buf
       
   172 
       
   173     def peek (self) :
       
   174         """
       
   175             Peek at data in buffer.
       
   176         """
       
   177 
       
   178         return self._buf
       
   179 
       
   180     def read (self) :
       
   181         """
       
   182             Read and return any available input.
       
   183 
       
   184             Returns None if blocking.
       
   185         """
       
   186 
       
   187         if self._buf :
       
   188             buf, self._buf = self._buf, ''
       
   189             
       
   190         else :
       
   191             buf = self._read()
       
   192 
       
   193         return buf
       
   194 
       
   195     def readline (self) :
       
   196         """
       
   197             Read and return next waiting line from input.
       
   198 
       
   199             Line is returned without trailing '\r\n' or '\n'.
       
   200 
       
   201             Returns None if there is no line available.
       
   202         """
       
   203 
       
   204         while '\n' not in self._buf :
       
   205             # read chunk
       
   206             read = self._read()
       
   207 
       
   208             if read is None :
       
   209                 return None
       
   210             
       
   211             self._buf += read
       
   212         
       
   213         # split out one line
       
   214         line, self._buf = self._buf.split('\n', 1)
       
   215         
       
   216         # in case we had \r\n
       
   217         line = line.rstrip('\r')
       
   218 
       
   219         return line
       
   220     
       
   221     def readlines (self) :
       
   222         """
       
   223             Read any available input, yielding lines.
       
   224 
       
   225             Returns None if thre is no more input available.
       
   226 
       
   227             Raises EOFError in the socket was closed.
       
   228         """
       
   229 
       
   230         while True :
       
   231             line = self.readline()
       
   232 
       
   233             if line is None :
       
   234                 # no more
       
   235                 return
       
   236             else :
       
   237                 yield line
       
   238 
       
   239     __iter__ = readlines
       
   240 
       
   241 class WriteStream (object) :
       
   242     """
       
   243         Writable stream, supporting non-blocking/buffered writes.
       
   244 
       
   245         XXX: buffering is completely untested
       
   246     """
       
   247     
       
   248     EOL = '\n'
       
   249 
       
   250     def __init__ (self, sock, buffer=None) :
       
   251         """
       
   252             TODO:   buffer  - maximum outgoing buffer length
       
   253         """
       
   254 
       
   255         self.sock = sock
       
   256         self._buf = buffer
       
   257 
       
   258     def _write (self, buf) :
       
   259         """
       
   260             Write given data to socket, returning the number of bytes written, or None, if buffering is enabled.
       
   261         """
       
   262         
       
   263         send = nonblocking(self.sock.send, buf)
       
   264         
       
   265         # eof on write?
       
   266         if send :
       
   267             # ok, message (partially) written
       
   268             return send
       
   269 
       
   270         else :
       
   271             # XXX: how do we handle this? What does it actually mean?
       
   272             # handle as a wouldblock...
       
   273             return None
       
   274 
       
   275 
       
   276     def write (self, data) :
       
   277         """
       
   278             Write given data to socket.
       
   279 
       
   280             TODO: buffer small chunks -> select writable -> write?
       
   281 
       
   282             Buffers if not able to write, or raises EOFError (hah!)
       
   283         """
       
   284 
       
   285         if not self._buf :
       
   286             # write directly
       
   287             while data :
       
   288                 write = self._write(data)
       
   289                 
       
   290                 if write :
       
   291                     # remaining data
       
   292                     data = data[write:]
       
   293 
       
   294                 else :
       
   295                     # cannot write more
       
   296                     break
       
   297 
       
   298         if not data :
       
   299             # sent
       
   300             return
       
   301 
       
   302         if self._buf is None :
       
   303             # no write buffering, and socket buffer full!
       
   304             raise EOFError()
       
   305 
       
   306         # append to outgoing buffer
       
   307         self._buf += data
       
   308 
       
   309     def writeline (self, line, eol=EOL) :
       
   310         """
       
   311             Write out line.
       
   312         """
       
   313 
       
   314         self.write(str(line) + eol)
       
   315     
       
   316     def __call__ (self, *lines) :
       
   317         for line in lines :
       
   318             self.writeline(line)
       
   319         
       
   320         # TODO: flush