pvl/socket.py
changeset 224 ed410776effd
parent 223 6842794c20e8
child 225 3c2d0dd42045
equal deleted inserted replaced
223:6842794c20e8 224:ed410776effd
     1 """
       
     2     A simple TCP client in the kind of syslog.fifo/file.
       
     3 
       
     4     Interface: fileno(), __iter__, __call__
       
     5 """
       
     6 
       
     7 # XXX: absolute import plz
       
     8 socket = __import__('socket')
       
     9 
       
    10 import select
       
    11 import errno
       
    12 
       
    13 import urlparse
       
    14 
       
    15 import logging; log = logging.getLogger('pvl.socket')
       
    16 
       
    17 # order matters!
       
    18 URL = (
       
    19     # scheme    family              socktype
       
    20     ( 'unix',   (socket.AF_UNIX,    None                )   ), # socktype is given
       
    21     ( 'tcp',    (0,                 socket.SOCK_STREAM  )   ), # AF_UNSPEC
       
    22     ( 'udp',    (0,                 socket.SOCK_DGRAM   )   ), # AF_UNSPEC
       
    23 )
       
    24 
       
    25 URL_SCHEMES = dict(URL)
       
    26 
       
    27 def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) :
       
    28     """
       
    29         Parse given string into (AF_*, SOCK_*, host, port).
       
    30 
       
    31         For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value.
       
    32     """
       
    33    
       
    34     family, socktype = URL_SCHEMES[scheme]
       
    35     url = urlparse.urlparse(str)
       
    36     
       
    37     # TODO: UNIX?
       
    38     if url.scheme and url.netloc :
       
    39         # proper url
       
    40         family, socktype = URL_SCHEMES[url.scheme]
       
    41 
       
    42         return family, socktype, url.hostname, url.port or port
       
    43 
       
    44     elif url.scheme and url.path :
       
    45         # host:port
       
    46         return family, socktype, url.scheme, int(url.path)
       
    47 
       
    48     elif url.path :
       
    49         # host
       
    50         return family, socktype, url.path, port
       
    51 
       
    52     else :
       
    53         raise ValueError("unparseable connect URL: %s", str)
       
    54 
       
    55 def connect (str, *args, **kwargs) :
       
    56     """
       
    57         Returns a connected socket for given parse()'d string.
       
    58     """
       
    59 
       
    60     family, socktype, host, port = parse(str, *args, **kwargs)
       
    61 
       
    62     if family == socket.AF_UNIX :
       
    63         raise ValueError("XXX: AF_UNIX is not yet supported", str)
       
    64 
       
    65     else : # AF_UNSPEC
       
    66         return connect_inet(host, port, family=family, socktype=socktype)
       
    67  
       
    68 def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
       
    69     """
       
    70         Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
       
    71 
       
    72         TODO: timeout?
       
    73     """
       
    74 
       
    75     log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)
       
    76     
       
    77     if host :
       
    78         flags = socket.AI_CANONNAME
       
    79     else :
       
    80         flags = 0
       
    81 
       
    82     addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)
       
    83 
       
    84     if not addrinfo :
       
    85         raise Exception("getaddrinfo: %s:%s: no results" % (host, port))
       
    86 
       
    87     for af, st, proto, name, addr in addrinfo :
       
    88         try :
       
    89             sock = socket.socket(af, st, proto)
       
    90 
       
    91         except socket.error as error :
       
    92             log.warning("%s:%s: socket: %s", host, port, error)
       
    93             continue
       
    94         
       
    95         log.debug("%s:%s: socket: %s", host, port, sock)
       
    96 
       
    97         try :
       
    98             sock.connect(addr)
       
    99 
       
   100         except socket.error as error :
       
   101             log.warning("%s:%s: connect: %s", host, port, error)
       
   102             continue
       
   103 
       
   104         log.debug("%s:%s: connect", host, port)
       
   105         log.info("%s", name)
       
   106         
       
   107         return sock
       
   108 
       
   109     else :
       
   110         raise Exception("Unable to connect: %s:%s: %s" % (host, port, error))
       
   111 
       
   112 def reverse (sockaddr, numeric_host=False, numeric_port=True) :
       
   113     """
       
   114         Resolve given sockaddr, returning (host, port).
       
   115     """
       
   116 
       
   117     flags = 0
       
   118 
       
   119     if numeric_host :
       
   120         flags |= socket.NI_NUMERICHOST
       
   121     
       
   122     if numeric_port :
       
   123         flags |= socket.NI_NUMERICSERV
       
   124 
       
   125     return socket.getnameinfo(sockaddr, flags)
       
   126 
       
   127 def socket_str (sock) :
       
   128     # get connected peer
       
   129     try :
       
   130         peer = sock.getpeername()
       
   131 
       
   132     except socket.error as ex :
       
   133         # fails if socket is not connected XXX: even after EOF on read..?
       
   134         return str(ex)
       
   135     
       
   136     # lookup scheme
       
   137     for scheme, (family, socktype) in URL :
       
   138         if family and family != sock.family :
       
   139             continue
       
   140         elif socktype and socktype != sock.type :
       
   141             continue
       
   142         else :
       
   143             break
       
   144     else :
       
   145         scheme = None
       
   146 
       
   147     host, port = reverse(peer)
       
   148     
       
   149     if scheme :
       
   150         return "{scheme}://{host}:{port}".format(scheme=scheme, host=host, port=port)
       
   151     else :
       
   152         return "{host}:{port}".format(host=host, port=port)
       
   153 
       
   154 def nonblocking (call, *args, **kwargs) :
       
   155     """
       
   156         Call the given function, which read/writes on a nonblocking file, and return None if it would have blocked.
       
   157 
       
   158         Raises EOFError on SIGPIPE/EPIPE.
       
   159 
       
   160         # XXX: does python handle SIGPIPE for us?
       
   161     """
       
   162 
       
   163     try :
       
   164         return call(*args, **kwargs)
       
   165 
       
   166     except socket.error as ex :
       
   167         # block?
       
   168         if ex.errno == errno.EAGAIN or ex.errno == errno.EWOULDBLOCK:
       
   169             # empty
       
   170             return None
       
   171         
       
   172         elif ex.errno == errno.EPIPE :
       
   173             # XXX: write-eof?
       
   174             raise EOFError()
       
   175 
       
   176         else :
       
   177             raise
       
   178 
       
   179 class ReadStream (object) :
       
   180     """
       
   181         Buffered stream, supporting non-blocking/line-based reads.
       
   182     """
       
   183 
       
   184     BLOCK=512
       
   185 
       
   186     def __init__ (self, sock, buffer=None) :
       
   187         """
       
   188             TODO: buffer    - maximum line length
       
   189         """
       
   190 
       
   191         self.sock = sock
       
   192         self._buf = ''
       
   193 
       
   194     def fileno (self) :
       
   195         return self.sock.fileno()
       
   196 
       
   197     def _read (self, block=BLOCK) :
       
   198         """
       
   199             Read up to n bytes from socket.
       
   200             
       
   201             Returns None if we would block.
       
   202             Raises EOFError on EOF.
       
   203         """
       
   204         
       
   205         buf = nonblocking(self.sock.recv, block)
       
   206 
       
   207         log.debug("%s: %s", self, buf)
       
   208 
       
   209         if buf is None :
       
   210             return None
       
   211         elif buf :
       
   212             return buf
       
   213         else :
       
   214             raise EOFError()
       
   215 
       
   216     def peek (self) :
       
   217         """
       
   218             Peek at data in buffer.
       
   219         """
       
   220 
       
   221         return self._buf
       
   222 
       
   223     def read (self) :
       
   224         """
       
   225             Read and return any available input.
       
   226 
       
   227             Returns None if blocking.
       
   228         """
       
   229 
       
   230         if self._buf :
       
   231             buf, self._buf = self._buf, ''
       
   232             
       
   233         else :
       
   234             buf = self._read()
       
   235 
       
   236         return buf
       
   237 
       
   238     def readline (self) :
       
   239         """
       
   240             Read and return next waiting line from input.
       
   241 
       
   242             Line is returned without trailing '\r\n' or '\n'.
       
   243 
       
   244             Returns None if there is no line available.
       
   245 
       
   246             XXX: trailing data in buf when _read() raises EOFError?
       
   247         """
       
   248 
       
   249         while '\n' not in self._buf :
       
   250             # read chunk
       
   251             read = self._read()
       
   252 
       
   253             if read is None :
       
   254                 return None
       
   255             
       
   256             self._buf += read
       
   257         
       
   258         # split out one line
       
   259         line, self._buf = self._buf.split('\n', 1)
       
   260         
       
   261         # in case we had \r\n
       
   262         line = line.rstrip('\r')
       
   263 
       
   264         log.debug("%s: %s", self, line)
       
   265 
       
   266         return line
       
   267     
       
   268     def readlines (self) :
       
   269         """
       
   270             Read any available input, yielding lines.
       
   271 
       
   272             Returns None if thre is no more input available.
       
   273 
       
   274             Raises EOFError in the socket was closed.
       
   275         """
       
   276 
       
   277         while True :
       
   278             line = self.readline()
       
   279 
       
   280             if line is None :
       
   281                 return
       
   282             else :
       
   283                 yield line
       
   284 
       
   285     __iter__ = readlines
       
   286 
       
   287     def __str__ (self) :
       
   288         return socket_str(self.sock)
       
   289 
       
   290 class WriteStream (object) :
       
   291     """
       
   292         Writable stream, supporting non-blocking/buffered writes.
       
   293 
       
   294         XXX: buffering is completely untested
       
   295     """
       
   296     
       
   297     EOL = '\n'
       
   298 
       
   299     def __init__ (self, sock, buffer=None) :
       
   300         """
       
   301             TODO:   buffer  - maximum outgoing buffer length
       
   302         """
       
   303 
       
   304         self.sock = sock
       
   305         self._buf = buffer
       
   306 
       
   307     def _write (self, buf) :
       
   308         """
       
   309             Write given data to socket, returning the number of bytes written, or None, if buffering is enabled.
       
   310         """
       
   311         
       
   312         send = nonblocking(self.sock.send, buf)
       
   313         
       
   314         # eof on write?
       
   315         if send is None :
       
   316             return None
       
   317 
       
   318         elif send :
       
   319             # ok, message (partially) written
       
   320             return send
       
   321 
       
   322         else :
       
   323             # XXX: zero-length send? how do we handle this? What does it actually mean?
       
   324             # handle as a wouldblock...
       
   325             return None
       
   326 
       
   327     def write (self, data) :
       
   328         """
       
   329             Write given data to socket.
       
   330 
       
   331             TODO: buffer small chunks -> select writable -> write?
       
   332 
       
   333             Buffers if not able to write, or raises EOFError (hah!)
       
   334         """
       
   335 
       
   336         if not self._buf :
       
   337             # write directly
       
   338             while data :
       
   339                 write = self._write(data)
       
   340                 
       
   341                 if write :
       
   342                     # remaining data
       
   343                     data = data[write:]
       
   344 
       
   345                 else :
       
   346                     # cannot write more
       
   347                     break
       
   348 
       
   349         if not data :
       
   350             # sent
       
   351             return
       
   352 
       
   353         if self._buf is None :
       
   354             # no write buffering, and socket buffer full!
       
   355             raise EOFError()
       
   356 
       
   357         # append to outgoing buffer
       
   358         self._buf += data
       
   359 
       
   360     def writeline (self, line, eol=EOL) :
       
   361         """
       
   362             Write out line.
       
   363         """
       
   364 
       
   365         log.debug("%s: %s", self, line)
       
   366 
       
   367         self.write(str(line))
       
   368         self.write(eol)
       
   369     
       
   370     def __call__ (self, *lines) :
       
   371         for line in lines :
       
   372             self.writeline(line)
       
   373         
       
   374         # TODO: flush
       
   375 
       
   376     def __str__ (self) :
       
   377         return socket_str(self.sock)
       
   378 
       
   379