pvl/socket.py
author Tero Marttila <tero.marttila@aalto.fi>
Mon, 28 Jul 2014 13:32:41 +0300
changeset 35 4c7905e1cad7
parent 17 9bd35c091f05
permissions -rw-r--r--
version 0.5.1: bugfix for working around conflicting -c/--config options
"""
    A simple TCP client, in the same style as syslog.fifo/file.

    Interface: fileno(), __iter__, __call__
"""

from __future__ import absolute_import

import errno
import select
import socket
import urlparse
import logging; log = logging.getLogger('pvl.socket')

from socket import SOCK_STREAM

def parse (url, port=None) :
    """
        Parse given string into host, port, path parts.

        url     - string of the form [host][:port][/path]
        port    - default port, returned as-is if the url does not specify one

        Returns a (host, port, path) tuple:
            host    - str, or None if empty
            port    - int for a numeric port, str for a service name, or the given default
            path    - everything after the first '/' (the '/' itself is dropped), or None if there is no '/'

        An empty port, as in 'foo:', falls back to the given default port.

        Raises ValueError if the host:port part contains more than one ':'.

        >>> parse('')
        (None, None, None)
        >>> parse('foo')
        ('foo', None, None)
        >>> parse('foo:80')
        ('foo', 80, None)
        >>> parse('foo:http')
        ('foo', 'http', None)
        >>> parse('foo:', port=80)
        ('foo', 80, None)
        >>> parse('/run/foo.sock')
        (None, None, 'run/foo.sock')
    """

    # remember the caller's default, the port name is reused for the parsed value below
    default_port = port

    if '/' in url :
        url, path = url.split('/', 1)
    else :
        path = None

    if ':' in url :
        url, port = url.split(':')

        if port.isdigit() :
            port = int(port)

        elif not port :
            # 'host:' does not actually name any port
            port = default_port

    # empty host -> None
    host = url or None

    return host, port, path

def connect (socktype, url, port=None, family=None) :
    """
        Parse the given url string using parse(), and return a socket connected to it.

        socktype    - socket type, passed through to connect_inet()
        url         - string accepted by parse()
        port        - default port for parse()
        family      - address family, or None to choose based on the url

        An url with a path part selects AF_UNIX when no family is given.

        Raises ValueError for AF_UNIX, which is not yet supported.
    """

    host, port, path = parse(url, port=port)

    if not family and path :
        # a path part implies a unix socket
        family = socket.AF_UNIX

    if family != socket.AF_UNIX :
        # AF_UNSPEC or AF_INET/AF_INET6
        return connect_inet(socktype, host, port, family=family)

    raise ValueError("TODO: AF_UNIX is not yet supported: %s" % (url, ))

def connect_inet (socktype, host, port, family=None) :
    """
        Return a TCP/UDP socket connected to the given host/port using getaddrinfo.

        socktype: SOCK_STREAM or SOCK_DGRAM
        host: IP address, hostname, or None for localhost.
        port: integer port, or string service.
        family: AF_UNSPEC for IP/DNS dependent IPv6/4, or AF_INET or AF_INET6.
                None is treated as AF_UNSPEC.

        Each getaddrinfo() result is tried in order, and the first socket that connects is returned.
        Sockets that fail to connect are closed before trying the next address.

        Raises Exception if getaddrinfo() returns no results, or if none of the addresses could be connected;
        the latter includes the last socket error in its message.
        Errors raised by getaddrinfo() itself are not caught here.

        TODO: timeout?
    """

    log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)

    if family is None :
        family = socket.AF_UNSPEC

    if host :
        flags = socket.AI_CANONNAME
    else :
        flags = 0

    addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)

    if not addrinfo :
        raise Exception("getaddrinfo: %s:%s: no results" % (host, port))

    # kept separately, as the `except ... as` name is not reliably available after the handler (python3 unbinds it)
    last_error = None

    for af, st, proto, name, addr in addrinfo :
        try :
            sock = socket.socket(af, st, proto)

        except socket.error as error :
            log.warning("%s:%s: socket: %s", host, port, error)
            last_error = error
            continue

        log.debug("%s:%s: socket: %s", host, port, sock)

        try :
            sock.connect(addr)

        except socket.error as error :
            log.warning("%s:%s: connect: %s", host, port, error)
            last_error = error

            # release the fd of the failed socket before trying the next address
            sock.close()
            continue

        log.debug("%s:%s: connect", host, port)

        return sock

    # every address failed
    raise Exception("Unable to connect: %s:%s: %s" % (host, port, last_error))

def reverse (sockaddr, numeric_host=False, numeric_port=True) :
    """
        Look up the names for the given sockaddr using getnameinfo.

        sockaddr        - socket address, as accepted by socket.getnameinfo()
        numeric_host    - set NI_NUMERICHOST
        numeric_port    - set NI_NUMERICSERV

        Returns the (host, port) result of socket.getnameinfo().
    """

    host_flag = socket.NI_NUMERICHOST if numeric_host else 0
    port_flag = socket.NI_NUMERICSERV if numeric_port else 0

    return socket.getnameinfo(sockaddr, host_flag | port_flag)

def socket_str (sock) :
    """
        Format the peer address of the given socket as a "host:port" str, using reverse().

        If getpeername() fails with a socket.error, the str of that error is returned instead.
    """

    try :
        peername = sock.getpeername()

    except socket.error as error :
        # fails if socket is not connected XXX: even after EOF on read..?
        return str(error)

    name, service = reverse(peername)

    return "{host}:{port}".format(host=name, port=service)

def nonblocking (call, *args, **kwargs) :
    """
        Invoke call(*args, **kwargs), for a function that reads/writes on a nonblocking file.

        Returns the result of the call, or None if it failed with EAGAIN/EWOULDBLOCK, i.e. it would have blocked.

        Raises EOFError if the call failed with EPIPE.
        Any other socket.error is re-raised as-is.

        # XXX: does python handle SIGPIPE for us?
    """

    try :
        return call(*args, **kwargs)

    except socket.error as error :
        if error.errno in (errno.EAGAIN, errno.EWOULDBLOCK) :
            # would block: nothing available
            return None

        if error.errno != errno.EPIPE :
            raise

        # XXX: write-eof?
        raise EOFError()

class ReadStream (object) :
    """
        Buffering reader for a socket, supporting non-blocking and line-based reads.

        Interface: fileno(), peek(), read() / __call__, readline(), readlines() / __iter__
    """

    # default recv size for _read()
    BLOCK = 512

    def __init__ (self, sock, buffer=None) :
        """
            sock        - socket to read from
            buffer      - TODO: maximum line length; currently unused
        """

        self.sock = sock

        # data received, but not yet returned
        self._buf = ''

    def fileno (self) :
        """
            The fileno() of the underlying socket.
        """

        return self.sock.fileno()

    def _read (self, block=BLOCK) :
        """
            Receive up to block bytes from the socket.

            Returns None if we would block.
            Raises EOFError on EOF, i.e. when recv returns an empty result.
        """

        data = nonblocking(self.sock.recv, block)

        log.debug("%s: %s", self, data)

        if data is None :
            return None

        if not data :
            raise EOFError()

        return data

    def peek (self) :
        """
            Return the buffered data, without consuming it.
        """

        return self._buf

    def read (self) :
        """
            Return any buffered data, emptying the buffer, or otherwise read from the socket.

            Returns None if the read would block.
            Raises EOFError on EOF.
        """

        if not self._buf :
            return self._read()

        pending, self._buf = self._buf, ''

        return pending

    __call__ = read

    def readline (self) :
        """
            Return the next complete line of input, reading from the socket as required.

            Line is returned without trailing '\r\n' or '\n'.

            Returns None if a full line is not yet available; any partial line remains buffered.

            XXX: trailing data in buf when _read() raises EOFError?
        """

        while '\n' not in self._buf :
            chunk = self._read()

            if chunk is None :
                # would block before a full line arrived
                return None

            self._buf += chunk

        # take the first line, keep the rest buffered
        line, _, self._buf = self._buf.partition('\n')

        # in case we had \r\n
        line = line.rstrip('\r')

        log.debug("%s: %s", self, line)

        return line

    def readlines (self) :
        """
            Yield each line of input that is available.

            Stops once there is no more complete line available.

            Raises EOFError if the socket was closed.
        """

        line = self.readline()

        while line is not None :
            yield line

            line = self.readline()

    __iter__ = readlines

    def __str__ (self) :
        return socket_str(self.sock)

class WriteStream (object) :
    """
        Writable stream, supporting non-blocking/buffered writes.

        Buffering is enabled by giving a (possibly empty) initial buffer string. Data that cannot be sent
        right away is then appended to the buffer, and sent by later write() or flush() calls, ahead of
        any newer data.

        Without buffering, a write that cannot be completed raises EOFError.

        XXX: buffering is only lightly tested
    """

    EOL = '\n'

    def __init__ (self, sock, eol=None, buffer=None) :
        """
            sock    - socket to write to
            eol     - line terminator used by writeline(), defaults to EOL
            buffer  - initial outgoing buffer contents, or None to disable buffering

            TODO:   buffer  - maximum outgoing buffer length
        """

        self.sock = sock
        self._buf = buffer
        self.eol = eol or self.EOL

    def _write (self, buf) :
        """
            Send given data on socket.

            Returns the number of bytes sent, or None if nothing could be sent.

            Raises EOFError on EPIPE, via nonblocking().
        """

        sent = nonblocking(self.sock.send, buf)

        # None means we would block
        # XXX: zero-length send? how do we handle this? What does it actually mean?
        # handle as a wouldblock...
        return sent or None

    def flush (self) :
        """
            Attempt to send any buffered data.

            Returns True if there is no buffered data left (also when buffering is disabled),
            or False if the socket would not accept all of it.
        """

        while self._buf :
            sent = self._write(self._buf)

            if not sent :
                # cannot write more right now
                return False

            self._buf = self._buf[sent:]

        return True

    def write (self, data) :
        """
            Write given data to socket.

            Any previously buffered data is sent first, to keep the output in order.

            TODO: buffer small chunks -> select writable -> write?

            Buffers if not able to write, or raises EOFError if buffering is disabled (hah!)
        """

        if self.flush() :
            # nothing pending, write directly
            while data :
                sent = self._write(data)

                if not sent :
                    # cannot write more
                    break

                # remaining data
                data = data[sent:]

        if not data :
            # sent
            return

        if self._buf is None :
            # no write buffering, and socket buffer full!
            raise EOFError()

        # append to outgoing buffer
        self._buf += data

    def writeline (self, line, eol=None) :
        """
            Write out the str of line, followed by eol, or the stream's default eol.
        """

        log.debug("%s: %s", self, line)

        self.write(str(line))
        self.write(eol or self.eol)

    def __call__ (self, *lines) :
        """
            Write out each of the given lines using writeline().
        """

        for line in lines :
            self.writeline(line)

        # TODO: flush

    def __str__ (self) :
        return socket_str(self.sock)

class SockStream (object) :
    """
        A (TCP) socket connection.
        
        Supports nonblocking reads and line-oriented protoocls.
    """

    PORT = None
    EOL = None
    
    @classmethod
    def connect (cls, host, port=None, family=None, **opts) :
        """
            Blocking TCP client resolve/connect.

            Raises ??? if connect fails.
        """

        sock = connect(socket.SOCK_STREAM, host, port or cls.PORT,
                family=family,
        )

        tcp = cls(sock, **opts)
        log.info("%s", tcp)
        return tcp

    def __init__ (self, sock, nonblocking=None) :
        # store
        self.sock = sock
        
        # nonblocking mode?
        if nonblocking :
            self.sock.setblocking(not nonblocking)
        
        # read/write buffer
        self.read = ReadStream(sock)
        self.write = WriteStream(sock, eol=self.EOL)

        # cached endpoint names
        self._local_name = None
        self._remote_name = None
   
    def fileno (self) :
        return self.sock.fileno()

    def __iter__ (self) :
        return iter(self.read)

    def __call__ (self, *args, **kwargs) :
        return self.write(*args, **kwargs)

    @property
    def local (self) :
        """
            Resolve the local endpoint (host, port)
        """

        if not self._local_name :
            try :
                self._local_name, port = reverse(self.sock.getsockname())
            except socket.error as ex:
                return None

        return self._local_name
    
    @property
    def remote (self) :
        """
            Resolve the remote endpoint (host, port).
        """

        if not self._remote_name :
            try :
                self._remote_name, port = reverse(self.sock.getpeername())
            except socket.error as ex:
                return None

        return self._remote_name

    def __str__ (self) :
        return "%s" % self.remote