"""
A simple TCP client, in the same style as syslog.fifo/file.
Interface: fileno(), __iter__, __call__
"""
# XXX: absolute import plz
socket = __import__('socket')
import select
import errno
import urlparse
import logging; log = logging.getLogger('pvl.socket')
# Known URL schemes, as (scheme, (family, socktype)) pairs.
#
# order matters! socket_str() scans this table top-down and stops at the first
# entry that matches, treating a false family/socktype (0 or None) as "any".
URL = (
    # AF_UNIX; socktype is given
    ('unix', (socket.AF_UNIX, None)),

    # AF_UNSPEC stream
    ('tcp', (0, socket.SOCK_STREAM)),

    # AF_UNSPEC datagram
    ('udp', (0, socket.SOCK_DGRAM)),
)

# scheme -> (family, socktype) lookup
URL_SCHEMES = dict(URL)
def parse(str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM):
    """
        Parse given string into (AF_*, SOCK_*, host, port).

        Accepted forms:
            scheme://host[:port]    - family/socktype are looked up from the URL scheme
            host:port               - using the default scheme
            host                    - using the default scheme and port

        str     - connect string to parse
        port    - default port, returned when str does not contain one
        scheme  - default URL_SCHEMES key, used when str does not contain a scheme
        unix    - socktype meant for AF_UNIX
                  XXX: not applied yet; the 'unix' scheme yields the table's socktype (None)

        Raises ValueError if the string cannot be parsed (including a non-numeric port in
        the host:port form), and KeyError for a scheme that is not in URL_SCHEMES.
    """

    family, socktype = URL_SCHEMES[scheme]
    url = urlparse.urlparse(str)

    # TODO: UNIX?
    if url.scheme and url.netloc:
        # proper url
        family, socktype = URL_SCHEMES[url.scheme]

        return family, socktype, url.hostname, url.port or port

    elif url.scheme and url.path:
        # host:port, where urlparse took the host for an URL scheme
        return family, socktype, url.scheme, int(url.path)

    elif url.path:
        # Depending on the urlparse version, "host:123" is left unsplit in path (digits
        # after the colon look like a port, not an URL), and a host containing
        # non-scheme characters such as '_' is never taken for a scheme at all.
        # Split those here; anything with more colons (e.g. '::1') stays a plain host.
        host, sep, tail = url.path.partition(':')

        if sep and host and tail.isdigit():
            # host:port
            return family, socktype, host, int(tail)

        # host
        return family, socktype, url.path, port

    else:
        raise ValueError("unparseable connect URL: %s" % (str, ))
def connect(str, *args, **kwargs):
    """
        Parse the given connect string using parse(), and return a socket connected to it.

        Any further arguments are handed to parse() as-is.

        Raises ValueError for AF_UNIX targets, which are not supported yet.
    """

    family, socktype, host, port = parse(str, *args, **kwargs)

    if family != socket.AF_UNIX:
        # AF_UNSPEC
        return connect_inet(host, port, family=family, socktype=socktype)

    raise ValueError("XXX: AF_UNIX is not yet supported", str)
def connect_inet(host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM):
    """
        Return a TCP/UDP socket connected to the given host/port using getaddrinfo.

        Every address returned by getaddrinfo() is tried in order, and the first socket
        that connects is returned. Sockets that fail to connect are closed before
        moving on to the next address.

        host        - hostname or address to resolve
        port        - port number or service name
        family      - address family to restrict the lookup to
        socktype    - socket type to restrict the lookup to

        Raises Exception if there are no addresses, or none of them could be connected to.
        Errors raised by getaddrinfo() itself are not caught here.

        TODO: timeout?
    """

    log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)

    if host:
        flags = socket.AI_CANONNAME
    else:
        flags = 0

    addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)

    if not addrinfo:
        raise Exception("getaddrinfo: %s:%s: no results" % (host, port))

    # Remembered explicitly for the final error message; the name bound by
    # "except ... as" is not guaranteed to outlive its handler.
    last_error = None

    for af, st, proto, name, addr in addrinfo:
        try:
            sock = socket.socket(af, st, proto)

        except socket.error as error:
            log.warning("%s:%s: socket: %s", host, port, error)
            last_error = error
            continue

        log.debug("%s:%s: socket: %s", host, port, sock)

        try:
            sock.connect(addr)

        except socket.error as error:
            log.warning("%s:%s: connect: %s", host, port, error)
            last_error = error

            # do not leak the descriptor of a socket we are giving up on
            sock.close()
            continue

        log.debug("%s:%s: connect", host, port)
        log.info("%s", name)

        return sock

    # every address failed
    raise Exception("Unable to connect: %s:%s: %s" % (host, port, last_error))
def reverse(sockaddr, numeric_host=False, numeric_port=True):
    """
        Look up the given sockaddr using getnameinfo(), returning (host, port).

        numeric_host    - return the host in numeric form (NI_NUMERICHOST)
        numeric_port    - return the port in numeric form (NI_NUMERICSERV)
    """

    host_flag = socket.NI_NUMERICHOST if numeric_host else 0
    port_flag = socket.NI_NUMERICSERV if numeric_port else 0

    return socket.getnameinfo(sockaddr, host_flag | port_flag)
def socket_str(sock):
    """
        Describe the peer of the given socket as a string, for use in log messages.

        Returns "scheme://host:port" for tuple-style (inet) peer addresses, using the
        first matching scheme in URL, or plain "host:port" if no scheme matches.
        Peer addresses that are not tuples (AF_UNIX paths) are formatted as they are,
        since getnameinfo() only accepts sockaddr tuples.

        If the peer address is not available, the socket error message is returned instead.
    """

    # get connected peer
    try:
        peer = sock.getpeername()

    except socket.error as ex:
        # fails if socket is not connected XXX: even after EOF on read..?
        return str(ex)

    # lookup scheme; a false family/socktype in the table matches anything
    for scheme, (family, socktype) in URL:
        if family and family != sock.family:
            continue
        elif socktype and socktype != sock.type:
            continue
        else:
            break
    else:
        scheme = None

    if isinstance(peer, tuple):
        host, port = reverse(peer)
        address = "{host}:{port}".format(host=host, port=port)
    else:
        # AF_UNIX: the peer is a (possibly empty) path, which cannot be reverse()'d
        address = str(peer)

    if scheme:
        return "{scheme}://{address}".format(scheme=scheme, address=address)
    else:
        return address
def nonblocking(call, *args, **kwargs):
    """
        Invoke call(*args, **kwargs), which does a read/write on a nonblocking file.

        Returns whatever the call returns, or None if the call failed with
        EAGAIN/EWOULDBLOCK, i.e. it would have blocked.

        Raises EOFError on EPIPE; any other socket error is re-raised as-is.

        # XXX: does python handle SIGPIPE for us?
    """

    try:
        result = call(*args, **kwargs)

    except socket.error as ex:
        code = ex.errno

        if code in (errno.EAGAIN, errno.EWOULDBLOCK):
            # would block: empty
            return None

        if code == errno.EPIPE:
            # XXX: write-eof?
            raise EOFError()

        raise

    return result
class ReadStream(object):
    """
        Buffering reader on top of a socket, for non-blocking and line-based reads.

        Iterating over the stream yields the lines that are available right now.
    """

    # default number of bytes to recv() at a time
    BLOCK = 512

    def __init__(self, sock, buffer=None):
        """
            sock    - socket to read from
            buffer  - TODO: maximum line length; not used yet
        """

        self.sock = sock

        # received data that has not been returned yet
        self._buf = ''

    def fileno(self):
        """
            File descriptor of the underlying socket.
        """

        return self.sock.fileno()

    def _read(self, block=BLOCK):
        """
            recv() up to block bytes from the socket.

            Returns None if the read would block.
            Raises EOFError if the socket returns an empty read, i.e. EOF.
        """

        data = nonblocking(self.sock.recv, block)

        log.debug("%s: %s", self, data)

        if data is None:
            return None

        if not data:
            raise EOFError()

        return data

    def peek(self):
        """
            Return the buffered data, without consuming it.
        """

        return self._buf

    def read(self):
        """
            Return whatever input is available: the buffered data if there is any,
            otherwise the result of a fresh socket read.

            Returns None if the socket read would block.
        """

        if not self._buf:
            return self._read()

        pending = self._buf
        self._buf = ''

        return pending

    def readline(self):
        """
            Return the next complete line of input, reading from the socket as required.

            The trailing '\\n' and any '\\r' in front of it are stripped.

            Returns None if no complete line is available yet.

            XXX: trailing data in buf when _read() raises EOFError?
        """

        while True:
            head, newline, tail = self._buf.partition('\n')

            if newline:
                break

            # no complete line buffered yet, read another chunk
            chunk = self._read()

            if chunk is None:
                return None

            self._buf += chunk

        # keep the remainder for the next call
        self._buf = tail

        # in case we had \r\n
        line = head.rstrip('\r')

        log.debug("%s: %s", self, line)

        return line

    def readlines(self):
        """
            Yield lines for as long as readline() returns them.

            The generator ends once there is no more complete line available.
            Raises EOFError if the socket was closed.
        """

        for line in iter(self.readline, None):
            yield line

    __iter__ = readlines

    def __str__(self):
        return socket_str(self.sock)
class WriteStream(object):
    """
        Writable stream, supporting non-blocking/buffered writes.

        Calling the stream with any number of lines writes each of them out using writeline().

        XXX: buffering is completely untested
    """

    # default line terminator for writeline()
    EOL = '\n'

    def __init__(self, sock, buffer=None):
        """
            sock    - socket to write to
            buffer  - initial outgoing buffer; None disables buffering, in which case
                      write() raises EOFError if the socket does not take all of the data.
                      Anything else must support += with the written data (e.g. '').

            TODO: buffer - maximum outgoing buffer length
        """

        self.sock = sock
        self._buf = buffer

    def _write(self, buf):
        """
            Send given data on the socket.

            Returns the number of bytes sent, which may be less than the length of buf.
            Returns None if nothing could be sent, i.e. the send would block.

            Raises EOFError on EPIPE, by way of nonblocking().
        """

        send = nonblocking(self.sock.send, buf)

        if send:
            # ok, message (partially) written
            return send

        # Either the send would block (None), or it was a zero-length send.
        # XXX: zero-length send? how do we handle this? What does it actually mean?
        # handle as a wouldblock...
        return None

    def write(self, data):
        """
            Write given data to socket.

            Data that the socket does not accept is appended to the outgoing buffer,
            or EOFError is raised if buffering is disabled (hah!).

            NOTE: nothing in this class writes the outgoing buffer back out yet, and once
            it holds data, further writes are appended to it to preserve ordering.

            TODO: buffer small chunks -> select writable -> write?
        """

        if not self._buf:
            # nothing queued up ahead of us: write directly
            while data:
                write = self._write(data)

                if write:
                    # remaining data
                    data = data[write:]

                else:
                    # cannot write more
                    break

        if not data:
            # sent
            return

        if self._buf is None:
            # no write buffering, and socket buffer full!
            raise EOFError()

        # append to outgoing buffer
        self._buf += data

    def writeline(self, line, eol=EOL):
        """
            Write out str(line), followed by eol.

            The line and its terminator go out in a single write(): on a datagram
            socket every send is a separate datagram, so the terminator must not be
            sent on its own.
        """

        log.debug("%s: %s", self, line)

        self.write(str(line) + eol)

    def __call__(self, *lines):
        """
            Write out each of the given lines using writeline().
        """

        for line in lines:
            self.writeline(line)

        # TODO: flush

    def __str__(self):
        return socket_str(self.sock)