pvl.socket: cleanup unused scheme support, making socktype explicit. Add SockStream class
authorTero Marttila <terom@paivola.fi>
Fri, 20 Sep 2013 14:45:27 +0300
changeset 14 80d9f73f379a
parent 13 69a35866264b
child 15 e699ed00fcf1
pvl.socket: cleanup unused scheme support, making socktype explicit. Add SockStream class
pvl/socket.py
--- a/pvl/socket.py	Wed May 01 00:22:58 2013 +0300
+++ b/pvl/socket.py	Fri Sep 20 14:45:27 2013 +0300
@@ -4,75 +4,81 @@
     Interface: fileno(), __iter__, __call__
 """
 
-# XXX: absolute import plz
-socket = __import__('socket')
+from __future__ import absolute_import
 
+import errno
 import select
-import errno
-
+import socket
 import urlparse
-
 import logging; log = logging.getLogger('pvl.socket')
 
-# order matters!
-URL = (
-    # scheme    family              socktype
-    ( 'unix',   (socket.AF_UNIX,    None                )   ), # socktype is given
-    ( 'tcp',    (0,                 socket.SOCK_STREAM  )   ), # AF_UNSPEC
-    ( 'udp',    (0,                 socket.SOCK_DGRAM   )   ), # AF_UNSPEC
-)
-
-URL_SCHEMES = dict(URL)
-
-def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) :
-    """
-        Parse given string into (AF_*, SOCK_*, host, port).
-
-        For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value.
+def parse (url, port=None) :
     """
-   
-    family, socktype = URL_SCHEMES[scheme]
-    url = urlparse.urlparse(str)
-    
-    # TODO: UNIX?
-    if url.scheme and url.netloc :
-        # proper url
-        family, socktype = URL_SCHEMES[url.scheme]
-
-        return family, socktype, url.hostname, url.port or port
+        Parse given string into host, port, path parts.
 
-    elif url.scheme and url.path :
-        # host:port
-        return family, socktype, url.scheme, int(url.path)
-
-    elif url.path :
-        # host
-        return family, socktype, url.path, port
-
-    else :
-        raise ValueError("unparseable connect URL: %s", str)
-
-def connect (str, *args, **kwargs) :
-    """
-        Returns a connected socket for given parse()'d string.
+        >>> parse_url('')
+        (None, None, None)
+        >>> parse_url('foo')
+        ('foo', None, None)
+        >>> parse_url('foo:80')
+        ('foo', 80, None)
+        >>> parse_url('foo:http')
+        ('foo', 'http', None)
+        >>> parse_url('/run/foo.sock')
+        (None, None, 'run/foo.sock')
     """
 
-    family, socktype, host, port = parse(str, *args, **kwargs)
+    if '/' in url :
+        url, path = url.split('/', 1)
+    else :
+        path = None
 
+    if ':' in url :
+        url, port = url.split(':')
+
+        if port.isdigit() :
+            port = int(port)
+    
+    if url :
+        host = url
+    else :
+        host = None
+
+    return host, port, path
+
+def connect (socktype, url, port=None, family=None) :
+    """
+        Returns a connected socket for given string, based on parse().
+    """
+
+    host, port, path = parse(url, port=port)
+    
+    # autodetect as AF_UNIX
+    if path and not family :
+        family = socket.AF_UNIX
+    
     if family == socket.AF_UNIX :
-        raise ValueError("XXX: AF_UNIX is not yet supported", str)
+        raise ValueError("TODO: AF_UNIX is not yet supported", str)
 
-    else : # AF_UNSPEC
-        return connect_inet(host, port, family=family, socktype=socktype)
- 
-def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
+    else : # AF_UNSPEC or AF_INET/AF_INET6
+        return connect_inet(socktype, host, port, family=family)
+
+def connect_inet (socktype, host, port, family=None) :
     """
         Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
 
+        socktype: SOCK_STREAM or SOCK_DGRAM
+        host: IP address, hostname, or None for localhost.
+        port: integer port, or string service.
+        family: AF_UNSPEC for IP/DNS dependent IPv6/4, or AF_INET or AF_INET6.
+
         TODO: timeout?
     """
 
     log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)
+
+    if family is None :
+        family = socket.AF_UNSPEC
     
     if host :
         flags = socket.AI_CANONNAME
@@ -102,7 +108,6 @@
             continue
 
         log.debug("%s:%s: connect", host, port)
-        log.info("%s", name)
         
         return sock
 
@@ -125,6 +130,10 @@
     return socket.getnameinfo(sockaddr, flags)
 
 def socket_str (sock) :
+    """
+        Return url str for socket peer.
+    """
+
     # get connected peer
     try :
         peer = sock.getpeername()
@@ -133,23 +142,9 @@
         # fails if socket is not connected XXX: even after EOF on read..?
         return str(ex)
     
-    # lookup scheme
-    for scheme, (family, socktype) in URL :
-        if family and family != sock.family :
-            continue
-        elif socktype and socktype != sock.type :
-            continue
-        else :
-            break
-    else :
-        scheme = None
-
     host, port = reverse(peer)
     
-    if scheme :
-        return "{scheme}://{host}:{port}".format(scheme=scheme, host=host, port=port)
-    else :
-        return "{host}:{port}".format(host=host, port=port)
+    return "{host}:{port}".format(host=host, port=port)
 
 def nonblocking (call, *args, **kwargs) :
     """
@@ -181,7 +176,7 @@
         Buffered stream, supporting non-blocking/line-based reads.
     """
 
-    BLOCK=512
+    BLOCK = 512
 
     def __init__ (self, sock, buffer=None) :
         """
@@ -235,6 +230,8 @@
 
         return buf
 
+    __call__ = read
+
     def readline (self) :
         """
             Read and return next waiting line from input.
@@ -296,13 +293,14 @@
     
     EOL = '\n'
 
-    def __init__ (self, sock, buffer=None) :
+    def __init__ (self, sock, eol=None, buffer=None) :
         """
             TODO:   buffer  - maximum outgoing buffer length
         """
 
         self.sock = sock
         self._buf = buffer
+        self.eol = eol or self.EOL
 
     def _write (self, buf) :
         """
@@ -357,7 +355,7 @@
         # append to outgoing buffer
         self._buf += data
 
-    def writeline (self, line, eol=EOL) :
+    def writeline (self, line, eol=None) :
         """
             Write out line.
         """
@@ -365,7 +363,7 @@
         log.debug("%s: %s", self, line)
 
         self.write(str(line))
-        self.write(eol)
+        self.write(eol or self.eol)
     
     def __call__ (self, *lines) :
         for line in lines :
@@ -376,4 +374,85 @@
     def __str__ (self) :
         return socket_str(self.sock)
 
+class SockStream (object) :
+    """
+        A (TCP) socket connection.
+        
+        Supports nonblocking reads and line-oriented protoocls.
+    """
 
+    PORT = None
+    EOL = None
+    
+    @classmethod
+    def connect (cls, host, port=None, family=None, **opts) :
+        """
+            Blocking TCP client resolve/connect.
+
+            Raises ??? if connect fails.
+        """
+
+        sock = connect(socket.SOCK_STREAM, host, port or cls.PORT,
+                family=family,
+        )
+
+        tcp = cls(sock, **opts)
+        log.info("%s", tcp)
+        return tcp
+
+    def __init__ (self, sock, nonblocking=None) :
+        # store
+        self.sock = sock
+        
+        # nonblocking mode?
+        if nonblocking :
+            self.sock.setblocking(not nonblocking)
+        
+        # read/write buffer
+        self.read = ReadStream(sock)
+        self.write = WriteStream(sock, eol=self.EOL)
+
+        # cached endpoint names
+        self._local_name = None
+        self._remote_name = None
+   
+    def fileno (self) :
+        return self.sock.fileno()
+
+    def __iter__ (self) :
+        return iter(self.read)
+
+    def __call__ (self, *args, **kwargs) :
+        return self.write(*args, **kwargs)
+
+    @property
+    def local (self) :
+        """
+            Resolve the local endpoint (host, port)
+        """
+
+        if not self._local_name :
+            try :
+                self._local_name, port = reverse(self.sock.getsockname())
+            except socket.error as ex:
+                return None
+
+        return self._local_name
+    
+    @property
+    def remote (self) :
+        """
+            Resolve the remote endpoint (host, port).
+        """
+
+        if not self._remote_name :
+            try :
+                self._remote_name, port = reverse(self.sock.getpeername())
+            except socket.error as ex:
+                return None
+
+        return self._remote_name
+
+    def __str__ (self) :
+        return "%s" % self.remote
+