pvl.socket: move from pvl-irker
authorTero Marttila <terom@paivola.fi>
Mon, 01 Apr 2013 03:11:43 +0300
changeset 7 95d06ed3c395
parent 5 a63d8f4d0a16
child 8 2c9bc42255a2
pvl.socket: move from pvl-irker
pvl/socket.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/socket.py	Mon Apr 01 03:11:43 2013 +0300
@@ -0,0 +1,379 @@
+"""
+    A simple TCP client in the kind of syslog.fifo/file.
+
+    Interface: fileno(), __iter__, __call__
+"""
+
+# XXX: absolute import plz
+socket = __import__('socket')
+
+import select
+import errno
+
+import urlparse
+
+import logging; log = logging.getLogger('pvl.socket')
+
+# order matters!
+URL = (
+    # scheme    family              socktype
+    ( 'unix',   (socket.AF_UNIX,    None                )   ), # socktype is given
+    ( 'tcp',    (0,                 socket.SOCK_STREAM  )   ), # AF_UNSPEC
+    ( 'udp',    (0,                 socket.SOCK_DGRAM   )   ), # AF_UNSPEC
+)
+
+URL_SCHEMES = dict(URL)
+
+def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) :
+    """
+        Parse given string into (AF_*, SOCK_*, host, port).
+
+        For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value.
+    """
+   
+    family, socktype = URL_SCHEMES[scheme]
+    url = urlparse.urlparse(str)
+    
+    # TODO: UNIX?
+    if url.scheme and url.netloc :
+        # proper url
+        family, socktype = URL_SCHEMES[url.scheme]
+
+        return family, socktype, url.hostname, url.port or port
+
+    elif url.scheme and url.path :
+        # host:port
+        return family, socktype, url.scheme, int(url.path)
+
+    elif url.path :
+        # host
+        return family, socktype, url.path, port
+
+    else :
+        raise ValueError("unparseable connect URL: %s", str)
+
+def connect (str, *args, **kwargs) :
+    """
+        Returns a connected socket for given parse()'d string.
+    """
+
+    family, socktype, host, port = parse(str, *args, **kwargs)
+
+    if family == socket.AF_UNIX :
+        raise ValueError("XXX: AF_UNIX is not yet supported", str)
+
+    else : # AF_UNSPEC
+        return connect_inet(host, port, family=family, socktype=socktype)
+ 
+def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
+    """
+        Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
+
+        TODO: timeout?
+    """
+
+    log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)
+    
+    if host :
+        flags = socket.AI_CANONNAME
+    else :
+        flags = 0
+
+    addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)
+
+    if not addrinfo :
+        raise Exception("getaddrinfo: %s:%s: no results" % (host, port))
+
+    for af, st, proto, name, addr in addrinfo :
+        try :
+            sock = socket.socket(af, st, proto)
+
+        except socket.error as error :
+            log.warning("%s:%s: socket: %s", host, port, error)
+            continue
+        
+        log.debug("%s:%s: socket: %s", host, port, sock)
+
+        try :
+            sock.connect(addr)
+
+        except socket.error as error :
+            log.warning("%s:%s: connect: %s", host, port, error)
+            continue
+
+        log.debug("%s:%s: connect", host, port)
+        log.info("%s", name)
+        
+        return sock
+
+    else :
+        raise Exception("Unable to connect: %s:%s: %s" % (host, port, error))
+
+def reverse (sockaddr, numeric_host=False, numeric_port=True) :
+    """
+        Resolve given sockaddr, returning (host, port).
+    """
+
+    flags = 0
+
+    if numeric_host :
+        flags |= socket.NI_NUMERICHOST
+    
+    if numeric_port :
+        flags |= socket.NI_NUMERICSERV
+
+    return socket.getnameinfo(sockaddr, flags)
+
+def socket_str (sock) :
+    # get connected peer
+    try :
+        peer = sock.getpeername()
+
+    except socket.error as ex :
+        # fails if socket is not connected XXX: even after EOF on read..?
+        return str(ex)
+    
+    # lookup scheme
+    for scheme, (family, socktype) in URL :
+        if family and family != sock.family :
+            continue
+        elif socktype and socktype != sock.type :
+            continue
+        else :
+            break
+    else :
+        scheme = None
+
+    host, port = reverse(peer)
+    
+    if scheme :
+        return "{scheme}://{host}:{port}".format(scheme=scheme, host=host, port=port)
+    else :
+        return "{host}:{port}".format(host=host, port=port)
+
+def nonblocking (call, *args, **kwargs) :
+    """
+        Call the given function, which read/writes on a nonblocking file, and return None if it would have blocked.
+
+        Raises EOFError on SIGPIPE/EPIPE.
+
+        # XXX: does python handle SIGPIPE for us?
+    """
+
+    try :
+        return call(*args, **kwargs)
+
+    except socket.error as ex :
+        # block?
+        if ex.errno == errno.EAGAIN or ex.errno == errno.EWOULDBLOCK:
+            # empty
+            return None
+        
+        elif ex.errno == errno.EPIPE :
+            # XXX: write-eof?
+            raise EOFError()
+
+        else :
+            raise
+
+class ReadStream (object) :
+    """
+        Buffered stream, supporting non-blocking/line-based reads.
+    """
+
+    BLOCK=512
+
+    def __init__ (self, sock, buffer=None) :
+        """
+            TODO: buffer    - maximum line length
+        """
+
+        self.sock = sock
+        self._buf = ''
+
+    def fileno (self) :
+        return self.sock.fileno()
+
+    def _read (self, block=BLOCK) :
+        """
+            Read up to n bytes from socket.
+            
+            Returns None if we would block.
+            Raises EOFError on EOF.
+        """
+        
+        buf = nonblocking(self.sock.recv, block)
+
+        log.debug("%s: %s", self, buf)
+
+        if buf is None :
+            return None
+        elif buf :
+            return buf
+        else :
+            raise EOFError()
+
+    def peek (self) :
+        """
+            Peek at data in buffer.
+        """
+
+        return self._buf
+
+    def read (self) :
+        """
+            Read and return any available input.
+
+            Returns None if blocking.
+        """
+
+        if self._buf :
+            buf, self._buf = self._buf, ''
+            
+        else :
+            buf = self._read()
+
+        return buf
+
+    def readline (self) :
+        """
+            Read and return next waiting line from input.
+
+            Line is returned without trailing '\r\n' or '\n'.
+
+            Returns None if there is no line available.
+
+            XXX: trailing data in buf when _read() raises EOFError?
+        """
+
+        while '\n' not in self._buf :
+            # read chunk
+            read = self._read()
+
+            if read is None :
+                return None
+            
+            self._buf += read
+        
+        # split out one line
+        line, self._buf = self._buf.split('\n', 1)
+        
+        # in case we had \r\n
+        line = line.rstrip('\r')
+
+        log.debug("%s: %s", self, line)
+
+        return line
+    
+    def readlines (self) :
+        """
+            Read any available input, yielding lines.
+
+            Returns None if thre is no more input available.
+
+            Raises EOFError in the socket was closed.
+        """
+
+        while True :
+            line = self.readline()
+
+            if line is None :
+                return
+            else :
+                yield line
+
+    __iter__ = readlines
+
+    def __str__ (self) :
+        return socket_str(self.sock)
+
+class WriteStream (object) :
+    """
+        Writable stream, supporting non-blocking/buffered writes.
+
+        XXX: buffering is completely untested
+    """
+    
+    EOL = '\n'
+
+    def __init__ (self, sock, buffer=None) :
+        """
+            TODO:   buffer  - maximum outgoing buffer length
+        """
+
+        self.sock = sock
+        self._buf = buffer
+
+    def _write (self, buf) :
+        """
+            Write given data to socket, returning the number of bytes written, or None, if buffering is enabled.
+        """
+        
+        send = nonblocking(self.sock.send, buf)
+        
+        # eof on write?
+        if send is None :
+            return None
+
+        elif send :
+            # ok, message (partially) written
+            return send
+
+        else :
+            # XXX: zero-length send? how do we handle this? What does it actually mean?
+            # handle as a wouldblock...
+            return None
+
+    def write (self, data) :
+        """
+            Write given data to socket.
+
+            TODO: buffer small chunks -> select writable -> write?
+
+            Buffers if not able to write, or raises EOFError (hah!)
+        """
+
+        if not self._buf :
+            # write directly
+            while data :
+                write = self._write(data)
+                
+                if write :
+                    # remaining data
+                    data = data[write:]
+
+                else :
+                    # cannot write more
+                    break
+
+        if not data :
+            # sent
+            return
+
+        if self._buf is None :
+            # no write buffering, and socket buffer full!
+            raise EOFError()
+
+        # append to outgoing buffer
+        self._buf += data
+
+    def writeline (self, line, eol=EOL) :
+        """
+            Write out line.
+        """
+
+        log.debug("%s: %s", self, line)
+
+        self.write(str(line))
+        self.write(eol)
+    
+    def __call__ (self, *lines) :
+        for line in lines :
+            self.writeline(line)
+        
+        # TODO: flush
+
+    def __str__ (self) :
+        return socket_str(self.sock)
+
+