"""
A simple TCP client in the kind of syslog.fifo/file.
Interface: fileno(), __iter__, __call__
"""
from __future__ import absolute_import
import errno
import select
import socket
import urlparse
import logging; log = logging.getLogger('pvl.socket')
from socket import SOCK_STREAM
def parse (url, port=None) :
"""
Parse given string into host, port, path parts.
>>> parse_url('')
(None, None, None)
>>> parse_url('foo')
('foo', None, None)
>>> parse_url('foo:80')
('foo', 80, None)
>>> parse_url('foo:http')
('foo', 'http', None)
>>> parse_url('/run/foo.sock')
(None, None, 'run/foo.sock')
"""
if '/' in url :
url, path = url.split('/', 1)
else :
path = None
if ':' in url :
url, port = url.split(':')
if port.isdigit() :
port = int(port)
if url :
host = url
else :
host = None
return host, port, path
def connect (socktype, url, port=None, family=None) :
"""
Returns a connected socket for given string, based on parse().
"""
host, port, path = parse(url, port=port)
# autodetect as AF_UNIX
if path and not family :
family = socket.AF_UNIX
if family == socket.AF_UNIX :
raise ValueError("TODO: AF_UNIX is not yet supported: %s" % (url, ))
else : # AF_UNSPEC or AF_INET/AF_INET6
return connect_inet(socktype, host, port, family=family)
def connect_inet (socktype, host, port, family=None) :
"""
Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
socktype: SOCK_STREAM or SOCK_DGRAM
host: IP address, hostname, or None for localhost.
port: integer port, or string service.
family: AF_UNSPEC for IP/DNS dependent IPv6/4, or AF_INET or AF_INET6.
TODO: timeout?
"""
log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)
if family is None :
family = socket.AF_UNSPEC
if host :
flags = socket.AI_CANONNAME
else :
flags = 0
addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)
if not addrinfo :
raise Exception("getaddrinfo: %s:%s: no results" % (host, port))
for af, st, proto, name, addr in addrinfo :
try :
sock = socket.socket(af, st, proto)
except socket.error as error :
log.warning("%s:%s: socket: %s", host, port, error)
continue
log.debug("%s:%s: socket: %s", host, port, sock)
try :
sock.connect(addr)
except socket.error as error :
log.warning("%s:%s: connect: %s", host, port, error)
continue
log.debug("%s:%s: connect", host, port)
return sock
else :
raise Exception("Unable to connect: %s:%s: %s" % (host, port, error))
def reverse (sockaddr, numeric_host=False, numeric_port=True) :
"""
Resolve given sockaddr, returning (host, port).
"""
flags = 0
if numeric_host :
flags |= socket.NI_NUMERICHOST
if numeric_port :
flags |= socket.NI_NUMERICSERV
return socket.getnameinfo(sockaddr, flags)
def socket_str (sock) :
"""
Return url str for socket peer.
"""
# get connected peer
try :
peer = sock.getpeername()
except socket.error as ex :
# fails if socket is not connected XXX: even after EOF on read..?
return str(ex)
host, port = reverse(peer)
return "{host}:{port}".format(host=host, port=port)
def nonblocking (call, *args, **kwargs) :
"""
Call the given function, which read/writes on a nonblocking file, and return None if it would have blocked.
Raises EOFError on SIGPIPE/EPIPE.
# XXX: does python handle SIGPIPE for us?
"""
try :
return call(*args, **kwargs)
except socket.error as ex :
# block?
if ex.errno == errno.EAGAIN or ex.errno == errno.EWOULDBLOCK:
# empty
return None
elif ex.errno == errno.EPIPE :
# XXX: write-eof?
raise EOFError()
else :
raise
class ReadStream (object) :
"""
Buffered stream, supporting non-blocking/line-based reads.
"""
BLOCK = 512
def __init__ (self, sock, buffer=None) :
"""
TODO: buffer - maximum line length
"""
self.sock = sock
self._buf = ''
def fileno (self) :
return self.sock.fileno()
def _read (self, block=BLOCK) :
"""
Read up to n bytes from socket.
Returns None if we would block.
Raises EOFError on EOF.
"""
buf = nonblocking(self.sock.recv, block)
log.debug("%s: %s", self, buf)
if buf is None :
return None
elif buf :
return buf
else :
raise EOFError()
def peek (self) :
"""
Peek at data in buffer.
"""
return self._buf
def read (self) :
"""
Read and return any available input.
Returns None if blocking.
"""
if self._buf :
buf, self._buf = self._buf, ''
else :
buf = self._read()
return buf
__call__ = read
def readline (self) :
"""
Read and return next waiting line from input.
Line is returned without trailing '\r\n' or '\n'.
Returns None if there is no line available.
XXX: trailing data in buf when _read() raises EOFError?
"""
while '\n' not in self._buf :
# read chunk
read = self._read()
if read is None :
return None
self._buf += read
# split out one line
line, self._buf = self._buf.split('\n', 1)
# in case we had \r\n
line = line.rstrip('\r')
log.debug("%s: %s", self, line)
return line
def readlines (self) :
"""
Read any available input, yielding lines.
Returns None if thre is no more input available.
Raises EOFError in the socket was closed.
"""
while True :
line = self.readline()
if line is None :
return
else :
yield line
__iter__ = readlines
def __str__ (self) :
return socket_str(self.sock)
class WriteStream (object) :
"""
Writable stream, supporting non-blocking/buffered writes.
XXX: buffering is completely untested
"""
EOL = '\n'
def __init__ (self, sock, eol=None, buffer=None) :
"""
TODO: buffer - maximum outgoing buffer length
"""
self.sock = sock
self._buf = buffer
self.eol = eol or self.EOL
def _write (self, buf) :
"""
Write given data to socket, returning the number of bytes written, or None, if buffering is enabled.
"""
send = nonblocking(self.sock.send, buf)
# eof on write?
if send is None :
return None
elif send :
# ok, message (partially) written
return send
else :
# XXX: zero-length send? how do we handle this? What does it actually mean?
# handle as a wouldblock...
return None
def write (self, data) :
"""
Write given data to socket.
TODO: buffer small chunks -> select writable -> write?
Buffers if not able to write, or raises EOFError (hah!)
"""
if not self._buf :
# write directly
while data :
write = self._write(data)
if write :
# remaining data
data = data[write:]
else :
# cannot write more
break
if not data :
# sent
return
if self._buf is None :
# no write buffering, and socket buffer full!
raise EOFError()
# append to outgoing buffer
self._buf += data
def writeline (self, line, eol=None) :
"""
Write out line.
"""
log.debug("%s: %s", self, line)
self.write(str(line))
self.write(eol or self.eol)
def __call__ (self, *lines) :
for line in lines :
self.writeline(line)
# TODO: flush
def __str__ (self) :
return socket_str(self.sock)
class SockStream (object) :
"""
A (TCP) socket connection.
Supports nonblocking reads and line-oriented protoocls.
"""
PORT = None
EOL = None
@classmethod
def connect (cls, host, port=None, family=None, **opts) :
"""
Blocking TCP client resolve/connect.
Raises ??? if connect fails.
"""
sock = connect(socket.SOCK_STREAM, host, port or cls.PORT,
family=family,
)
tcp = cls(sock, **opts)
log.info("%s", tcp)
return tcp
def __init__ (self, sock, nonblocking=None) :
# store
self.sock = sock
# nonblocking mode?
if nonblocking :
self.sock.setblocking(not nonblocking)
# read/write buffer
self.read = ReadStream(sock)
self.write = WriteStream(sock, eol=self.EOL)
# cached endpoint names
self._local_name = None
self._remote_name = None
def fileno (self) :
return self.sock.fileno()
def __iter__ (self) :
return iter(self.read)
def __call__ (self, *args, **kwargs) :
return self.write(*args, **kwargs)
@property
def local (self) :
"""
Resolve the local endpoint (host, port)
"""
if not self._local_name :
try :
self._local_name, port = reverse(self.sock.getsockname())
except socket.error as ex:
return None
return self._local_name
@property
def remote (self) :
"""
Resolve the remote endpoint (host, port).
"""
if not self._remote_name :
try :
self._remote_name, port = reverse(self.sock.getpeername())
except socket.error as ex:
return None
return self._remote_name
def __str__ (self) :
return "%s" % self.remote