pvl/socket.py
author Tero Marttila <terom@paivola.fi>
Sun, 20 Jan 2013 15:39:44 +0200
changeset 144 9966d35a63df
parent 139 515b74c6b456
permissions -rw-r--r--
pvl.socket: Raise EOFError on write() EPIPE, fix socket_str when not connected
"""
    A simple TCP client in the kind of syslog.fifo/file.

    Interface: fileno(), __iter__, __call__
"""

# XXX: absolute import plz
socket = __import__('socket')

import select
import errno

import urlparse

import logging; log = logging.getLogger('pvl.socket')

# order matters!
URL = (
    # scheme    family              socktype
    ( 'unix',   (socket.AF_UNIX,    None                )   ), # socktype is given
    ( 'tcp',    (0,                 socket.SOCK_STREAM  )   ), # AF_UNSPEC
    ( 'udp',    (0,                 socket.SOCK_DGRAM   )   ), # AF_UNSPEC
)

URL_SCHEMES = dict(URL)

def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) :
    """
        Parse given string into (AF_*, SOCK_*, host, port).

        For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value.
    """
   
    family, socktype = URL_SCHEMES[scheme]
    url = urlparse.urlparse(str)
    
    # TODO: UNIX?
    if url.scheme and url.netloc :
        # proper url
        family, socktype = URL_SCHEMES[url.scheme]

        return family, socktype, url.hostname, url.port or port

    elif url.scheme and url.path :
        # host:port
        return family, socktype, url.scheme, int(url.path)

    elif url.path :
        # host
        return family, socktype, url.path, port

    else :
        raise ValueError("unparseable connect URL: %s", str)

def connect (str, *args, **kwargs) :
    """
        Returns a connected socket for given parse()'d string.
    """

    family, socktype, host, port = parse(str, *args, **kwargs)

    if family == socket.AF_UNIX :
        raise ValueError("XXX: AF_UNIX is not yet supported", str)

    else : # AF_UNSPEC
        return connect_inet(host, port, family=family, socktype=socktype)
 
def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
    """
        Return a TCP/UDP socket connected to the given host/port using getaddrinfo.

        TODO: timeout?
    """

    log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)
    
    if host :
        flags = socket.AI_CANONNAME
    else :
        flags = 0

    addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)

    if not addrinfo :
        raise Exception("getaddrinfo: %s:%s: no results" % (host, port))

    for af, st, proto, name, addr in addrinfo :
        try :
            sock = socket.socket(af, st, proto)

        except socket.error as error :
            log.warning("%s:%s: socket: %s", host, port, error)
            continue
        
        log.debug("%s:%s: socket: %s", host, port, sock)

        try :
            sock.connect(addr)

        except socket.error as error :
            log.warning("%s:%s: connect: %s", host, port, error)
            continue

        log.debug("%s:%s: connect", host, port)
        log.info("%s", name)
        
        return sock

    else :
        raise Exception("Unable to connect: %s:%s: %s" % (host, port, error))

def reverse (sockaddr, numeric_host=False, numeric_port=True) :
    """
        Resolve given sockaddr, returning (host, port).
    """

    flags = 0

    if numeric_host :
        flags |= socket.NI_NUMERICHOST
    
    if numeric_port :
        flags |= socket.NI_NUMERICSERV

    return socket.getnameinfo(sockaddr, flags)

def socket_str (sock) :
    # get connected peer
    try :
        peer = sock.getpeername()

    except socket.error as ex :
        # fails if socket is not connected XXX: even after EOF on read..?
        return str(ex)
    
    # lookup scheme
    for scheme, (family, socktype) in URL :
        if family and family != sock.family :
            continue
        elif socktype and socktype != sock.type :
            continue
        else :
            break
    else :
        scheme = None

    host, port = reverse(peer)
    
    if scheme :
        return "{scheme}://{host}:{port}".format(scheme=scheme, host=host, port=port)
    else :
        return "{host}:{port}".format(host=host, port=port)

def nonblocking (call, *args, **kwargs) :
    """
        Call the given function, which read/writes on a nonblocking file, and return None if it would have blocked.

        Raises EOFError on SIGPIPE/EPIPE.

        # XXX: does python handle SIGPIPE for us?
    """

    try :
        return call(*args, **kwargs)

    except socket.error as ex :
        # block?
        if ex.errno == errno.EAGAIN or ex.errno == errno.EWOULDBLOCK:
            # empty
            return None
        
        elif ex.errno == errno.EPIPE :
            # XXX: write-eof?
            raise EOFError()

        else :
            raise

class ReadStream (object) :
    """
        Buffered stream, supporting non-blocking/line-based reads.
    """

    BLOCK=512

    def __init__ (self, sock, buffer=None) :
        """
            TODO: buffer    - maximum line length
        """

        self.sock = sock
        self._buf = ''

    def fileno (self) :
        return self.sock.fileno()

    def _read (self, block=BLOCK) :
        """
            Read up to n bytes from socket.
            
            Returns None if we would block.
            Raises EOFError on EOF.
        """
        
        buf = nonblocking(self.sock.recv, block)

        log.debug("%s: %s", self, buf)

        if buf is None :
            return None
        elif buf :
            return buf
        else :
            raise EOFError()

    def peek (self) :
        """
            Peek at data in buffer.
        """

        return self._buf

    def read (self) :
        """
            Read and return any available input.

            Returns None if blocking.
        """

        if self._buf :
            buf, self._buf = self._buf, ''
            
        else :
            buf = self._read()

        return buf

    def readline (self) :
        """
            Read and return next waiting line from input.

            Line is returned without trailing '\r\n' or '\n'.

            Returns None if there is no line available.

            XXX: trailing data in buf when _read() raises EOFError?
        """

        while '\n' not in self._buf :
            # read chunk
            read = self._read()

            if read is None :
                return None
            
            self._buf += read
        
        # split out one line
        line, self._buf = self._buf.split('\n', 1)
        
        # in case we had \r\n
        line = line.rstrip('\r')

        log.debug("%s: %s", self, line)

        return line
    
    def readlines (self) :
        """
            Read any available input, yielding lines.

            Returns None if thre is no more input available.

            Raises EOFError in the socket was closed.
        """

        while True :
            line = self.readline()

            if line is None :
                return
            else :
                yield line

    __iter__ = readlines

    def __str__ (self) :
        return socket_str(self.sock)

class WriteStream (object) :
    """
        Writable stream, supporting non-blocking/buffered writes.

        XXX: buffering is completely untested
    """
    
    EOL = '\n'

    def __init__ (self, sock, buffer=None) :
        """
            TODO:   buffer  - maximum outgoing buffer length
        """

        self.sock = sock
        self._buf = buffer

    def _write (self, buf) :
        """
            Write given data to socket, returning the number of bytes written, or None, if buffering is enabled.
        """
        
        send = nonblocking(self.sock.send, buf)
        
        # eof on write?
        if send is None :
            return None

        elif send :
            # ok, message (partially) written
            return send

        else :
            # XXX: zero-length send? how do we handle this? What does it actually mean?
            # handle as a wouldblock...
            return None

    def write (self, data) :
        """
            Write given data to socket.

            TODO: buffer small chunks -> select writable -> write?

            Buffers if not able to write, or raises EOFError (hah!)
        """

        if not self._buf :
            # write directly
            while data :
                write = self._write(data)
                
                if write :
                    # remaining data
                    data = data[write:]

                else :
                    # cannot write more
                    break

        if not data :
            # sent
            return

        if self._buf is None :
            # no write buffering, and socket buffer full!
            raise EOFError()

        # append to outgoing buffer
        self._buf += data

    def writeline (self, line, eol=EOL) :
        """
            Write out line.
        """

        log.debug("%s: %s", self, line)

        self.write(str(line))
        self.write(eol)
    
    def __call__ (self, *lines) :
        for line in lines :
            self.writeline(line)
        
        # TODO: flush

    def __str__ (self) :
        return socket_str(self.sock)