"""
    A simple TCP/UDP client in the style of syslog.fifo/file.

    Interface: fileno(), __iter__, __call__
"""

# XXX: absolute import plz
socket = __import__('socket')

import select
import errno

try:
    import urlparse
except ImportError:
    # Python 3 moved urlparse() into urllib.parse
    from urllib import parse as urlparse

import logging

log = logging.getLogger('pvl.socket')

# order matters! socket_str() picks the first row that matches a socket
URL = (
    # scheme    (family,            socktype)
    ('unix',    (socket.AF_UNIX,    None)),                 # socktype is given
    ('tcp',     (0,                 socket.SOCK_STREAM)),   # AF_UNSPEC
    ('udp',     (0,                 socket.SOCK_DGRAM)),    # AF_UNSPEC
)

URL_SCHEMES = dict(URL)


def parse(str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM):
    """
        Parse given string into (AF_*, SOCK_*, host, port).

        Accepted forms are `scheme://host[:port]`, `host:port` and `host`.

            str     - the connect string to parse
            port    - default port, used when the string does not carry one
            scheme  - default URL scheme, used when the string does not carry one
            unix    - socktype to return for schemes that do not fix one (unix)

        For AF_UNIX, the path is in host, and the socktype is the given unix=... value.

        Raises KeyError on an unknown scheme, ValueError on an unparseable string or port.
    """

    family, socktype = URL_SCHEMES[scheme]
    url = urlparse.urlparse(str)

    # TODO: UNIX?
    if url.scheme and url.netloc:
        # proper url
        family, socktype = URL_SCHEMES[url.scheme]
        host, port = url.hostname, url.port or port

    elif url.scheme and url.path:
        # host:port, where urlparse took the host for a scheme
        host, port = url.scheme, int(url.path)

    elif url.path:
        # host, or host:port on urlparse versions that leave "host:80" in the path
        host = url.path
        name, _, tail = host.rpartition(':')

        # only split on a single colon, so bare IPv6 literals are left alone
        if name and ':' not in name and tail.isdigit():
            host, port = name, int(tail)

    else:
        raise ValueError("unparseable connect URL: %s" % (str,))

    if socktype is None:
        # scheme does not determine the socktype; use the caller's
        socktype = unix

    return family, socktype, host, port


def connect(str, *args, **kwargs):
    """
        Returns a connected socket for given parse()'d string.

        Extra arguments are passed through to parse().

        Raises ValueError for AF_UNIX, which is not supported yet.
    """

    family, socktype, host, port = parse(str, *args, **kwargs)

    if family == socket.AF_UNIX:
        raise ValueError("XXX: AF_UNIX is not yet supported: %s" % (str,))

    # AF_UNSPEC
    return connect_inet(host, port, family=family, socktype=socktype)


def connect_inet(host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM):
    """
        Return a TCP/UDP socket connected to the given host/port using getaddrinfo.

        Each getaddrinfo() result is tried in order; the first socket that connects is returned.

        Raises Exception if there are no results, or if none of them could be connected.

        TODO: timeout?
    """

    log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)

    flags = socket.AI_CANONNAME if host else 0

    addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)

    if not addrinfo:
        raise Exception("getaddrinfo: %s:%s: no results" % (host, port))

    # kept outside the except clauses, as Python 3 unbinds the `as` name afterwards
    last_error = None

    for af, st, proto, name, addr in addrinfo:
        try:
            sock = socket.socket(af, st, proto)

        except socket.error as ex:
            last_error = ex
            log.warning("%s:%s: socket: %s", host, port, ex)
            continue

        log.debug("%s:%s: socket: %s", host, port, sock)

        try:
            sock.connect(addr)

        except socket.error as ex:
            last_error = ex
            log.warning("%s:%s: connect: %s", host, port, ex)

            # do not leak the fd of a failed attempt
            sock.close()
            continue

        log.debug("%s:%s: connect", host, port)
        log.info("%s", name)

        return sock

    raise Exception("Unable to connect: %s:%s: %s" % (host, port, last_error))


def reverse(sockaddr, numeric_host=False, numeric_port=True):
    """
        Resolve given sockaddr, returning (host, port).

            numeric_host    - return the address in numeric form (NI_NUMERICHOST)
            numeric_port    - return the port in numeric form (NI_NUMERICSERV)
    """

    flags = 0

    if numeric_host:
        flags |= socket.NI_NUMERICHOST

    if numeric_port:
        flags |= socket.NI_NUMERICSERV

    return socket.getnameinfo(sockaddr, flags)


def socket_str(sock):
    """
        Describe the given socket's peer as `scheme://host:port`, or `host:port` if no scheme matches.

        Returns the error message instead if the peer address is not available.
    """

    # get connected peer
    try:
        peer = sock.getpeername()

    except socket.error as ex:
        # fails if socket is not connected XXX: even after EOF on read..?
        return str(ex)

    # lookup scheme: first URL row whose non-empty family/socktype both match
    for scheme, (family, socktype) in URL:
        if family and family != sock.family:
            continue

        if socktype and socktype != sock.type:
            continue

        break

    else:
        scheme = None

    host, port = reverse(peer)

    if scheme:
        return "{scheme}://{host}:{port}".format(scheme=scheme, host=host, port=port)

    return "{host}:{port}".format(host=host, port=port)


def nonblocking(call, *args, **kwargs):
    """
        Call the given function, which read/writes on a nonblocking file, and return None if it would have blocked.

        Raises EOFError on SIGPIPE/EPIPE.
        Any other socket.error is re-raised as is.

        # XXX: does python handle SIGPIPE for us?
    """

    try:
        return call(*args, **kwargs)

    except socket.error as ex:
        if ex.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
            # would block: nothing available
            return None

        if ex.errno == errno.EPIPE:
            # XXX: write-eof?
            raise EOFError()

        raise


class ReadStream(object):
    """
        Buffered stream, supporting non-blocking/line-based reads.

        NOTE(review): the buffer is a native str; on Python 3 sock.recv() returns
        bytes, so the data would need decoding first - confirm the target version.
    """

    # default recv() size
    BLOCK = 512

    def __init__(self, sock, buffer=None):
        """
            sock    - socket to read from
            buffer  - currently unused

            TODO: buffer - maximum line length
        """

        self.sock = sock

        # data received but not yet returned to the caller
        self._buf = ''

    def fileno(self):
        """
            Return the socket's fileno.
        """

        return self.sock.fileno()

    def _read(self, block=BLOCK):
        """
            Read up to block bytes from socket.

            Returns None if we would block.
            Raises EOFError on EOF.
        """

        buf = nonblocking(self.sock.recv, block)

        log.debug("%s: %s", self, buf)

        if buf is None:
            # would block
            return None

        if not buf:
            # empty read
            raise EOFError()

        return buf

    def peek(self):
        """
            Peek at data in buffer.
        """

        return self._buf

    def read(self):
        """
            Read and return any available input.

            Buffered data is returned first, without reading from the socket.

            Returns None if blocking.
        """

        if self._buf:
            buf, self._buf = self._buf, ''
            return buf

        return self._read()

    def readline(self):
        """
            Read and return next waiting line from input.

            Line is returned without trailing '\\r\\n' or '\\n'.

            Returns None if there is no line available.

            XXX: trailing data in buf when _read() raises EOFError?
        """

        while '\n' not in self._buf:
            # read chunk
            chunk = self._read()

            if chunk is None:
                # partial line stays buffered for the next call
                return None

            self._buf += chunk

        # split out one line
        line, self._buf = self._buf.split('\n', 1)

        # in case we had \r\n
        line = line.rstrip('\r')

        log.debug("%s: %s", self, line)

        return line

    def readlines(self):
        """
            Read any available input, yielding lines.

            Stops once there is no more input available.

            Raises EOFError if the socket was closed.
        """

        while True:
            line = self.readline()

            if line is None:
                return

            yield line

    __iter__ = readlines

    def __str__(self):
        return socket_str(self.sock)


class WriteStream(object):
    """
        Writable stream, supporting non-blocking/buffered writes.

        XXX: buffering is completely untested
    """

    EOL = '\n'

    def __init__(self, sock, buffer=None):
        """
            sock    - socket to write to
            buffer  - initial outgoing buffer; None disables write buffering

            TODO: buffer - maximum outgoing buffer length
        """

        self.sock = sock
        self._buf = buffer

    def _write(self, buf):
        """
            Write given data to socket, returning the number of bytes written, or None if nothing could be written.
        """

        sent = nonblocking(self.sock.send, buf)

        # XXX: zero-length send? how do we handle this? What does it actually mean?
        #      handle as a wouldblock...
        return sent if sent else None

    def write(self, data):
        """
            Write given data to socket.

            TODO: buffer small chunks -> select writable -> write?

            Buffers if not able to write, or raises EOFError (hah!)
        """

        if not self._buf:
            # nothing queued ahead of us: write directly
            while data:
                sent = self._write(data)

                if not sent:
                    # cannot write more
                    break

                # remaining data
                data = data[sent:]

            if not data:
                # sent
                return

        if self._buf is None:
            # no write buffering, and socket buffer full!
            raise EOFError()

        # append to outgoing buffer
        self._buf += data

    def writeline(self, line, eol=EOL):
        """
            Write out line, followed by eol.
        """

        log.debug("%s: %s", self, line)

        # one write for line + eol, so that a datagram socket gets them in the same send()
        self.write(str(line) + eol)

    def __call__(self, *lines):
        """
            Write out each of the given lines.
        """

        for line in lines:
            self.writeline(line)

    # TODO: flush

    def __str__(self):
        return socket_str(self.sock)