--- a/pvl/socket.py Wed May 01 00:22:58 2013 +0300
+++ b/pvl/socket.py Fri Sep 20 14:45:27 2013 +0300
@@ -4,75 +4,81 @@
Interface: fileno(), __iter__, __call__
"""
-# XXX: absolute import plz
-socket = __import__('socket')
+from __future__ import absolute_import
+import errno
import select
-import errno
-
+import socket
import urlparse
-
import logging; log = logging.getLogger('pvl.socket')
-# order matters!
-URL = (
- # scheme family socktype
- ( 'unix', (socket.AF_UNIX, None ) ), # socktype is given
- ( 'tcp', (0, socket.SOCK_STREAM ) ), # AF_UNSPEC
- ( 'udp', (0, socket.SOCK_DGRAM ) ), # AF_UNSPEC
-)
-
-URL_SCHEMES = dict(URL)
-
-def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) :
- """
- Parse given string into (AF_*, SOCK_*, host, port).
-
- For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value.
+def parse (url, port=None) :
"""
-
- family, socktype = URL_SCHEMES[scheme]
- url = urlparse.urlparse(str)
-
- # TODO: UNIX?
- if url.scheme and url.netloc :
- # proper url
- family, socktype = URL_SCHEMES[url.scheme]
-
- return family, socktype, url.hostname, url.port or port
+ Parse given string into host, port, path parts.
- elif url.scheme and url.path :
- # host:port
- return family, socktype, url.scheme, int(url.path)
-
- elif url.path :
- # host
- return family, socktype, url.path, port
-
- else :
- raise ValueError("unparseable connect URL: %s", str)
-
-def connect (str, *args, **kwargs) :
- """
- Returns a connected socket for given parse()'d string.
+ >>> parse_url('')
+ (None, None, None)
+ >>> parse_url('foo')
+ ('foo', None, None)
+ >>> parse_url('foo:80')
+ ('foo', 80, None)
+ >>> parse_url('foo:http')
+ ('foo', 'http', None)
+ >>> parse_url('/run/foo.sock')
+ (None, None, 'run/foo.sock')
"""
- family, socktype, host, port = parse(str, *args, **kwargs)
+ if '/' in url :
+ url, path = url.split('/', 1)
+ else :
+ path = None
+ if ':' in url :
+ url, port = url.split(':')
+
+ if port.isdigit() :
+ port = int(port)
+
+ if url :
+ host = url
+ else :
+ host = None
+
+ return host, port, path
+
+def connect (socktype, url, port=None, family=None) :
+ """
+ Returns a connected socket for given string, based on parse().
+ """
+
+ host, port, path = parse(url, port=port)
+
+ # autodetect as AF_UNIX
+ if path and not family :
+ family = socket.AF_UNIX
+
if family == socket.AF_UNIX :
- raise ValueError("XXX: AF_UNIX is not yet supported", str)
+ raise ValueError("TODO: AF_UNIX is not yet supported", str)
- else : # AF_UNSPEC
- return connect_inet(host, port, family=family, socktype=socktype)
-
-def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
+ else : # AF_UNSPEC or AF_INET/AF_INET6
+ return connect_inet(socktype, host, port, family=family)
+
+def connect_inet (socktype, host, port, family=None) :
"""
Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
+ socktype: SOCK_STREAM or SOCK_DGRAM
+ host: IP address, hostname, or None for localhost.
+ port: integer port, or string service.
+ family: AF_UNSPEC for IP/DNS dependent IPv6/4, or AF_INET or AF_INET6.
+
TODO: timeout?
"""
log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)
+
+ if family is None :
+ family = socket.AF_UNSPEC
if host :
flags = socket.AI_CANONNAME
@@ -102,7 +108,6 @@
continue
log.debug("%s:%s: connect", host, port)
- log.info("%s", name)
return sock
@@ -125,6 +130,10 @@
return socket.getnameinfo(sockaddr, flags)
def socket_str (sock) :
+ """
+ Return url str for socket peer.
+ """
+
# get connected peer
try :
peer = sock.getpeername()
@@ -133,23 +142,9 @@
# fails if socket is not connected XXX: even after EOF on read..?
return str(ex)
- # lookup scheme
- for scheme, (family, socktype) in URL :
- if family and family != sock.family :
- continue
- elif socktype and socktype != sock.type :
- continue
- else :
- break
- else :
- scheme = None
-
host, port = reverse(peer)
- if scheme :
- return "{scheme}://{host}:{port}".format(scheme=scheme, host=host, port=port)
- else :
- return "{host}:{port}".format(host=host, port=port)
+ return "{host}:{port}".format(host=host, port=port)
def nonblocking (call, *args, **kwargs) :
"""
@@ -181,7 +176,7 @@
Buffered stream, supporting non-blocking/line-based reads.
"""
- BLOCK=512
+ BLOCK = 512
def __init__ (self, sock, buffer=None) :
"""
@@ -235,6 +230,8 @@
return buf
+ __call__ = read
+
def readline (self) :
"""
Read and return next waiting line from input.
@@ -296,13 +293,14 @@
EOL = '\n'
- def __init__ (self, sock, buffer=None) :
+ def __init__ (self, sock, eol=None, buffer=None) :
"""
TODO: buffer - maximum outgoing buffer length
"""
self.sock = sock
self._buf = buffer
+ self.eol = eol or self.EOL
def _write (self, buf) :
"""
@@ -357,7 +355,7 @@
# append to outgoing buffer
self._buf += data
- def writeline (self, line, eol=EOL) :
+ def writeline (self, line, eol=None) :
"""
Write out line.
"""
@@ -365,7 +363,7 @@
log.debug("%s: %s", self, line)
self.write(str(line))
- self.write(eol)
+ self.write(eol or self.eol)
def __call__ (self, *lines) :
for line in lines :
@@ -376,4 +374,85 @@
def __str__ (self) :
return socket_str(self.sock)
+class SockStream (object) :
+ """
+ A (TCP) socket connection.
+
+ Supports nonblocking reads and line-oriented protoocls.
+ """
+ PORT = None
+ EOL = None
+
+ @classmethod
+ def connect (cls, host, port=None, family=None, **opts) :
+ """
+ Blocking TCP client resolve/connect.
+
+ Raises ??? if connect fails.
+ """
+
+ sock = connect(socket.SOCK_STREAM, host, port or cls.PORT,
+ family=family,
+ )
+
+ tcp = cls(sock, **opts)
+ log.info("%s", tcp)
+ return tcp
+
+ def __init__ (self, sock, nonblocking=None) :
+ # store
+ self.sock = sock
+
+ # nonblocking mode?
+ if nonblocking :
+ self.sock.setblocking(not nonblocking)
+
+ # read/write buffer
+ self.read = ReadStream(sock)
+ self.write = WriteStream(sock, eol=self.EOL)
+
+ # cached endpoint names
+ self._local_name = None
+ self._remote_name = None
+
+ def fileno (self) :
+ return self.sock.fileno()
+
+ def __iter__ (self) :
+ return iter(self.read)
+
+ def __call__ (self, *args, **kwargs) :
+ return self.write(*args, **kwargs)
+
+ @property
+ def local (self) :
+ """
+ Resolve the local endpoint (host, port)
+ """
+
+ if not self._local_name :
+ try :
+ self._local_name, port = reverse(self.sock.getsockname())
+ except socket.error as ex:
+ return None
+
+ return self._local_name
+
+ @property
+ def remote (self) :
+ """
+ Resolve the remote endpoint (host, port).
+ """
+
+ if not self._remote_name :
+ try :
+ self._remote_name, port = reverse(self.sock.getpeername())
+ except socket.error as ex:
+ return None
+
+ return self._remote_name
+
+ def __str__ (self) :
+ return "%s" % self.remote
+