--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/socket.py Mon Apr 01 03:11:43 2013 +0300
@@ -0,0 +1,379 @@
+"""
+ A simple TCP client in the kind of syslog.fifo/file.
+
+ Interface: fileno(), __iter__, __call__
+"""
+
+# XXX: absolute import plz
+socket = __import__('socket')
+
+import select
+import errno
+
+import urlparse
+
+import logging; log = logging.getLogger('pvl.socket')
+
+# order matters!
+URL = (
+ # scheme family socktype
+ ( 'unix', (socket.AF_UNIX, None ) ), # socktype is given
+ ( 'tcp', (0, socket.SOCK_STREAM ) ), # AF_UNSPEC
+ ( 'udp', (0, socket.SOCK_DGRAM ) ), # AF_UNSPEC
+)
+
+URL_SCHEMES = dict(URL)
+
+def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) :
+ """
+ Parse given string into (AF_*, SOCK_*, host, port).
+
+ For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value.
+ """
+
+ family, socktype = URL_SCHEMES[scheme]
+ url = urlparse.urlparse(str)
+
+ # TODO: UNIX?
+ if url.scheme and url.netloc :
+ # proper url
+ family, socktype = URL_SCHEMES[url.scheme]
+
+ return family, socktype, url.hostname, url.port or port
+
+ elif url.scheme and url.path :
+ # host:port
+ return family, socktype, url.scheme, int(url.path)
+
+ elif url.path :
+ # host
+ return family, socktype, url.path, port
+
+ else :
+ raise ValueError("unparseable connect URL: %s", str)
+
+def connect (str, *args, **kwargs) :
+ """
+ Returns a connected socket for given parse()'d string.
+ """
+
+ family, socktype, host, port = parse(str, *args, **kwargs)
+
+ if family == socket.AF_UNIX :
+ raise ValueError("XXX: AF_UNIX is not yet supported", str)
+
+ else : # AF_UNSPEC
+ return connect_inet(host, port, family=family, socktype=socktype)
+
+def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
+ """
+ Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
+
+ TODO: timeout?
+ """
+
+ log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)
+
+ if host :
+ flags = socket.AI_CANONNAME
+ else :
+ flags = 0
+
+ addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)
+
+ if not addrinfo :
+ raise Exception("getaddrinfo: %s:%s: no results" % (host, port))
+
+ for af, st, proto, name, addr in addrinfo :
+ try :
+ sock = socket.socket(af, st, proto)
+
+ except socket.error as error :
+ log.warning("%s:%s: socket: %s", host, port, error)
+ continue
+
+ log.debug("%s:%s: socket: %s", host, port, sock)
+
+ try :
+ sock.connect(addr)
+
+ except socket.error as error :
+ log.warning("%s:%s: connect: %s", host, port, error)
+ continue
+
+ log.debug("%s:%s: connect", host, port)
+ log.info("%s", name)
+
+ return sock
+
+ else :
+ raise Exception("Unable to connect: %s:%s: %s" % (host, port, error))
+
+def reverse (sockaddr, numeric_host=False, numeric_port=True) :
+ """
+ Resolve given sockaddr, returning (host, port).
+ """
+
+ flags = 0
+
+ if numeric_host :
+ flags |= socket.NI_NUMERICHOST
+
+ if numeric_port :
+ flags |= socket.NI_NUMERICSERV
+
+ return socket.getnameinfo(sockaddr, flags)
+
+def socket_str (sock) :
+ # get connected peer
+ try :
+ peer = sock.getpeername()
+
+ except socket.error as ex :
+ # fails if socket is not connected XXX: even after EOF on read..?
+ return str(ex)
+
+ # lookup scheme
+ for scheme, (family, socktype) in URL :
+ if family and family != sock.family :
+ continue
+ elif socktype and socktype != sock.type :
+ continue
+ else :
+ break
+ else :
+ scheme = None
+
+ host, port = reverse(peer)
+
+ if scheme :
+ return "{scheme}://{host}:{port}".format(scheme=scheme, host=host, port=port)
+ else :
+ return "{host}:{port}".format(host=host, port=port)
+
+def nonblocking (call, *args, **kwargs) :
+ """
+ Call the given function, which read/writes on a nonblocking file, and return None if it would have blocked.
+
+ Raises EOFError on SIGPIPE/EPIPE.
+
+ # XXX: does python handle SIGPIPE for us?
+ """
+
+ try :
+ return call(*args, **kwargs)
+
+ except socket.error as ex :
+ # block?
+ if ex.errno == errno.EAGAIN or ex.errno == errno.EWOULDBLOCK:
+ # empty
+ return None
+
+ elif ex.errno == errno.EPIPE :
+ # XXX: write-eof?
+ raise EOFError()
+
+ else :
+ raise
+
+class ReadStream (object) :
+ """
+ Buffered stream, supporting non-blocking/line-based reads.
+ """
+
+ BLOCK=512
+
+ def __init__ (self, sock, buffer=None) :
+ """
+ TODO: buffer - maximum line length
+ """
+
+ self.sock = sock
+ self._buf = ''
+
+ def fileno (self) :
+ return self.sock.fileno()
+
+ def _read (self, block=BLOCK) :
+ """
+ Read up to n bytes from socket.
+
+ Returns None if we would block.
+ Raises EOFError on EOF.
+ """
+
+ buf = nonblocking(self.sock.recv, block)
+
+ log.debug("%s: %s", self, buf)
+
+ if buf is None :
+ return None
+ elif buf :
+ return buf
+ else :
+ raise EOFError()
+
+ def peek (self) :
+ """
+ Peek at data in buffer.
+ """
+
+ return self._buf
+
+ def read (self) :
+ """
+ Read and return any available input.
+
+ Returns None if blocking.
+ """
+
+ if self._buf :
+ buf, self._buf = self._buf, ''
+
+ else :
+ buf = self._read()
+
+ return buf
+
+ def readline (self) :
+ """
+ Read and return next waiting line from input.
+
+ Line is returned without trailing '\r\n' or '\n'.
+
+ Returns None if there is no line available.
+
+ XXX: trailing data in buf when _read() raises EOFError?
+ """
+
+ while '\n' not in self._buf :
+ # read chunk
+ read = self._read()
+
+ if read is None :
+ return None
+
+ self._buf += read
+
+ # split out one line
+ line, self._buf = self._buf.split('\n', 1)
+
+ # in case we had \r\n
+ line = line.rstrip('\r')
+
+ log.debug("%s: %s", self, line)
+
+ return line
+
+ def readlines (self) :
+ """
+ Read any available input, yielding lines.
+
+ Returns None if thre is no more input available.
+
+ Raises EOFError in the socket was closed.
+ """
+
+ while True :
+ line = self.readline()
+
+ if line is None :
+ return
+ else :
+ yield line
+
+ __iter__ = readlines
+
+ def __str__ (self) :
+ return socket_str(self.sock)
+
+class WriteStream (object) :
+ """
+ Writable stream, supporting non-blocking/buffered writes.
+
+ XXX: buffering is completely untested
+ """
+
+ EOL = '\n'
+
+ def __init__ (self, sock, buffer=None) :
+ """
+ TODO: buffer - maximum outgoing buffer length
+ """
+
+ self.sock = sock
+ self._buf = buffer
+
+ def _write (self, buf) :
+ """
+ Write given data to socket, returning the number of bytes written, or None, if buffering is enabled.
+ """
+
+ send = nonblocking(self.sock.send, buf)
+
+ # eof on write?
+ if send is None :
+ return None
+
+ elif send :
+ # ok, message (partially) written
+ return send
+
+ else :
+ # XXX: zero-length send? how do we handle this? What does it actually mean?
+ # handle as a wouldblock...
+ return None
+
+ def write (self, data) :
+ """
+ Write given data to socket.
+
+ TODO: buffer small chunks -> select writable -> write?
+
+ Buffers if not able to write, or raises EOFError (hah!)
+ """
+
+ if not self._buf :
+ # write directly
+ while data :
+ write = self._write(data)
+
+ if write :
+ # remaining data
+ data = data[write:]
+
+ else :
+ # cannot write more
+ break
+
+ if not data :
+ # sent
+ return
+
+ if self._buf is None :
+ # no write buffering, and socket buffer full!
+ raise EOFError()
+
+ # append to outgoing buffer
+ self._buf += data
+
+ def writeline (self, line, eol=EOL) :
+ """
+ Write out line.
+ """
+
+ log.debug("%s: %s", self, line)
+
+ self.write(str(line))
+ self.write(eol)
+
+ def __call__ (self, *lines) :
+ for line in lines :
+ self.writeline(line)
+
+ # TODO: flush
+
+ def __str__ (self) :
+ return socket_str(self.sock)
+
+