pvl.irk: split off url/socket stuff into pvl.socket, clarify Irk __call__/__iter__ iterface
authorTero Marttila <terom@paivola.fi>
Sun, 13 Jan 2013 01:52:49 +0200
changeset 116 89b7385d19ba
parent 115 9772d43669fb
child 117 58aebcd35e1a
pvl.irk: split off url/socket stuff into pvl.socket, clarify Irk __call__/__iter__ iterface
pvl/irk.py
pvl/socket.py
--- a/pvl/irk.py	Sun Jan 13 01:52:00 2013 +0200
+++ b/pvl/irk.py	Sun Jan 13 01:52:49 2013 +0200
@@ -2,16 +2,15 @@
     Irker client.
 """
 
+import pvl.syslog.file # for stdin
+import pvl.socket # for tcp
+
 import optparse, sys
 
 import logging; log = logging.getLogger('pvl.irk')
 
-# proto
 import json
 
-# XXX: socket
-import socket, select
-
 def parser (parser, connect='tcp://localhost/', target=None) :
     """
         Optparse option group.
@@ -35,107 +34,56 @@
     # None -> stdout
     return Irker(options.irker, options.irker_notice)
 
-def connect (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
-    """
-        Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
-
-        TODO: timeout?
-    """
-
-    log.debug("%s:%s: %s/%s", host, port, family, socktype)
-    
-    if host :
-        flags = socket.AI_CANONNAME
-    else :
-        flags = 0
-
-    addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)
-
-    if not addrinfo :
-        raise Exception("getaddrinfo: %s:%s: no results" % (host, port))
-
-    for af, st, proto, name, addr in addrinfo :
-        try :
-            s = socket.socket(af, st, proto)
-
-        except socket.error as error :
-            log.warning("%s:%s: socket: %s", host, port, error)
-            continue
-        
-        try :
-            s.connect(addr)
-
-        except socket.error as error :
-            log.warning("%s:%s: connect: %s", host, port, error)
-            continue
-        
-        log.info("%s", name)
-        
-        return s
-
-    else :
-        raise Exception("Unable to connect: %s:%s: %s" % (host, port, error))
-
-import urlparse
-
 class Irk (object) :
     """
-        Irker JSON connection.
+        Irker JSON connection speaks JSON over a stream.
 
-        TODO: timeout?
+        TODO: timeouts?
     """
 
     PORT = 6659
 
-    SCHEME = {
-        'tcp':  (socket.AF_INET, socket.SOCK_STREAM),
-        'udp':  (socket.AF_INET, socket.SOCK_DGRAM),
-        'unix': (socket.AF_UNIX, socket.SOCK_DGRAM),
-    }
-
     @classmethod
     def connect (cls, url) :
         """
-            Connect to given urllib URL, or None -> stdout
+            Connect to given URL string, or None -> stdout
         """
 
         if not url :
-            return cls(sys.stdout, recv=False)
+            # no read
+            return cls(pvl.syslog.file.File(sys.stdout), recv=False)
 
-        family, socktype = cls.SCHEME[url.scheme]
-        
-        if family == socket.AF_UNIX :
-            raise Exception("unix:// is not supported")
         else :
-            # inet
-            sock = connect(url.hostname, url.port or cls.PORT, family=family, socktype=socktype)
+            sock = pvl.socket.connect(url, port=cls.PORT)
 
-        # XXX: just to make things a bit more exciting... and we really don't want to be blocking on our output..
-        sock.setblocking(False)
-        
-        return cls(sock.makefile('w'), recv=sock)
+            # just to make things a bit more exciting... and we really don't want to be blocking on our output..
+            sock.setblocking(False)
+            
+            return cls(
+                    pvl.socket.WriteStream(sock, buffer=None),
+                    pvl.socket.ReadStream(sock)
+            )
 
-    def __init__ (self, file, recv=None) :
+    def __init__ (self, send, recv=None) :
         """
             Use given file-like object (write, flush, fileno) for output.
         """
 
-        self.file = file
-
-        # XXX
+        self.send = send
         self.recv = recv
-        self._buf = ''
-
-        log.debug("%s", file)
+        
+        log.debug("%s <-> %s", send, recv)
 
     def fileno (self) :
         """
             Return fd. Useful for detecting error conditions (connection lost).
+
+            Only valid if self.recv is True.
         """
 
         return self.recv.fileno()
 
-    def send (self, **opts) :
+    def __call__ (self, **opts) :
         """
             Raises IOError on write errors.
         """
@@ -143,65 +91,18 @@
         log.debug("%s", opts)
         
         # write line + flush
-        json.dump(opts, self.file)
-        self.file.write('\n')
-        self.file.flush()
-
-    def read (self, size=512) :
-        """
-            Recieve data back from irkerd.
-            
-            Raise EOFError.
-
-            XXX: this is seriously crazy on a buffered file-like object..?
-        """
+        self.send(json.dumps(opts))
         
-        # poll
-        read, _, _ = select.select((self.recv, ), (), (), 0.0)
-        
-        if read :
-            read = self.recv.recv(size)
+        # XXX: self.send.flush()
 
-            if read :
-                return read
-            else :
-                raise EOFError()
-        else :
-            return None # block
-    
-    def readline (self) :
+    def __iter__ (self) :
         """
-            Yield line of input, or None.
-
-            Raise EOFError.
+            Yield JSON inputs from source.
         """
 
-        while '\n' not in self._buf :
-            read = self.read()
-
-            if not read :
-                return None # block
-            
-            self._buf += read
-
-        line, self._buf = self._buf.split('\n')
-
-        return line
-    
-    def readlines (self) :
-        """
-            Yield lines of input, until blocking.
-        """
-
-        while True :
-            line = self.readline()
-
-            if line :
-                yield line
-            else :
-                return
-    
-    __iter__ = readlines
+        for line in self.recv :
+            # XXX: error handling?
+            yield json.loads(line)
 
 class IrkerTarget (object) :
     """
@@ -216,17 +117,17 @@
         
     def join (self) :
         log.info("%s", self)
-        self.irker.send(to=str(self), privmsg='')
+        self.irker(to=str(self), privmsg='')
 
     def privmsg (self, *args) :
         for arg in args :
             log.info("%s: %s", self, arg)
-            self.irker.send(to=str(self), privmsg=arg)
+            self.irker(to=str(self), privmsg=arg)
 
     def notice (self, *args) :
         for arg in args :
             log.info("%s: %s", self, arg)
-            self.irker.send(to=str(self), notice=arg)
+            self.irker(to=str(self), notice=arg)
 
     def __call__ (self, *args) :
         # default msg policy
@@ -240,15 +141,11 @@
 
 class Irker (object) :
     """
-        Reconnecting irker.
+        Reconnecting Irk.
     """
 
     def __init__ (self, url=None, notice=False) :
-        if url :
-            self.url = urlparse.urlparse(url)
-        else :
-            self.url = None
-
+        self.url = url
         self.targets = {}
         self.notice = notice
         
@@ -265,16 +162,16 @@
         for target in self.targets.itervalues() :
             target.join()
     
-    def send (self, **opts) :
+    def __call__ (self, **opts) :
         """
             Send on current irker connection.
 
             TODO: handle errors and reconnect?
         """
 
-        self.irk.send(**opts)
+        self.irk(**opts)
 
-    def target (self, target) :
+    def __getitem__ (self, target) :
         """
             Bind to given target URL, returning an IrkerTarget for sending messages.
         """
@@ -284,5 +181,3 @@
             self.targets[target].join()
             
         return self.targets[target]
-
-    __getitem__ = target
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/socket.py	Sun Jan 13 01:52:49 2013 +0200
@@ -0,0 +1,320 @@
+"""
+    A simple TCP client in the kind of syslog.fifo/file.
+
+    Interface: fileno(), __iter__, __call__
+"""
+
+# XXX: absolute import plz
+socket = __import__('socket')
+
+import select
+
+import urlparse
+
+import logging; log = logging.getLogger('pvl.socket')
+
+URL = {
+    'tcp':  (0,                 socket.SOCK_STREAM  ), # AF_UNSPEC
+    'udp':  (0,                 socket.SOCK_DGRAM   ), # AF_UNSPEC
+    'unix': (socket.AF_UNIX,    None                ), # socktype is given
+}
+
+def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) :
+    """
+        Parse given string into (AF_*, SOCK_*, host, port).
+
+        For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value.
+    """
+   
+    family, socktype = URL[scheme]
+    url = urlparse.urlparse(str)
+    
+    # TODO: UNIX?
+    if url.scheme and url.netloc :
+        # proper url
+        family, socktype = URL[url.scheme]
+
+        return family, socktype, url.hostname, url.port or port
+
+    elif url.scheme and url.path :
+        # host:port
+        return family, socktype, url.scheme, int(url.path)
+
+    elif url.path :
+        # host
+        return family, socktype, url.path, port
+
+    else :
+        raise ValueError("unparseable connect URL: %s", str)
+
+def connect (str, *args, **kwargs) :
+    """
+        Returns a connected socket for given parse()'d string.
+    """
+
+    family, socktype, host, port = parse(str, *args, **kwargs)
+
+    if family == socket.AF_UNIX :
+        raise ValueError("XXX: AF_UNIX is not yet supported", str)
+    else : # AF_UNSPEC
+        return connect_inet(host, port, family=family, socktype=socktype)
+ 
+def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
+    """
+        Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
+
+        TODO: timeout?
+    """
+
+    log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)
+    
+    if host :
+        flags = socket.AI_CANONNAME
+    else :
+        flags = 0
+
+    addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)
+
+    if not addrinfo :
+        raise Exception("getaddrinfo: %s:%s: no results" % (host, port))
+
+    for af, st, proto, name, addr in addrinfo :
+        try :
+            sock = socket.socket(af, st, proto)
+
+        except socket.error as error :
+            log.warning("%s:%s: socket: %s", host, port, error)
+            continue
+        
+        log.debug("%s:%s: socket: %s", host, port, sock)
+
+        try :
+            sock.connect(addr)
+
+        except socket.error as error :
+            log.warning("%s:%s: connect: %s", host, port, error)
+            continue
+
+        log.debug("%s:%s: connect", host, port)
+        log.info("%s", name)
+        
+        return sock
+
+    else :
+        raise Exception("Unable to connect: %s:%s: %s" % (host, port, error))
+
+def reverse (self, sockaddr, numeric_host=False, numeric_port=True) :
+    """
+        Resolve given sockaddr, returning (host, port).
+    """
+
+    flags = 0
+
+    if numeric_host :
+        flags |= socket.NI_NUMERICHOST
+    
+    if numeric_port :
+        flags |= socket.NI_NUMERICSERV
+
+    return socket.getnameinfo(sockaddr, flags)
+
+def nonblocking (call, *args, **kwargs) :
+    """
+        Call the given function, which read/writes on a nonblocking file, and return None if it would have blocked.
+    """
+
+    try :
+        return call(*args, **kwargs)
+
+    except socket.error as ex :
+        # block?
+        if ex.errno == errno.EAGAIN or ex.errno == errno.EWOULDBLOCK:
+            # empty
+            return None
+
+        else :
+            raise
+
+class ReadStream (object) :
+    """
+        Buffered stream, supporting non-blocking/line-based reads.
+    """
+
+    BLOCK=512
+
+    def __init__ (self, sock, buffer=None) :
+        """
+            TODO: buffer    - maximum line length
+        """
+
+        self.sock = sock
+        self._buf = ''
+
+    def fileno (self) :
+        return self.sock.fileno()
+
+    def _read (self, block=BLOCK) :
+        """
+            Read up to n bytes from socket.
+            
+            Returns None if we would block.
+            Raises EOFError on EOF.
+        """
+        
+        buf = nonblocking(self.sock.recv, block)
+
+        # eof?
+        if not buf :
+            raise EOFError()
+
+        # ok
+        return buf
+
+    def peek (self) :
+        """
+            Peek at data in buffer.
+        """
+
+        return self._buf
+
+    def read (self) :
+        """
+            Read and return any available input.
+
+            Returns None if blocking.
+        """
+
+        if self._buf :
+            buf, self._buf = self._buf, ''
+            
+        else :
+            buf = self._read()
+
+        return buf
+
+    def readline (self) :
+        """
+            Read and return next waiting line from input.
+
+            Line is returned without trailing '\r\n' or '\n'.
+
+            Returns None if there is no line available.
+        """
+
+        while '\n' not in self._buf :
+            # read chunk
+            read = self._read()
+
+            if read is None :
+                return None
+            
+            self._buf += read
+        
+        # split out one line
+        line, self._buf = self._buf.split('\n', 1)
+        
+        # in case we had \r\n
+        line = line.rstrip('\r')
+
+        return line
+    
+    def readlines (self) :
+        """
+            Read any available input, yielding lines.
+
+            Returns None if thre is no more input available.
+
+            Raises EOFError in the socket was closed.
+        """
+
+        while True :
+            line = self.readline()
+
+            if line is None :
+                # no more
+                return
+            else :
+                yield line
+
+    __iter__ = readlines
+
+class WriteStream (object) :
+    """
+        Writable stream, supporting non-blocking/buffered writes.
+
+        XXX: buffering is completely untested
+    """
+    
+    EOL = '\n'
+
+    def __init__ (self, sock, buffer=None) :
+        """
+            TODO:   buffer  - maximum outgoing buffer length
+        """
+
+        self.sock = sock
+        self._buf = buffer
+
+    def _write (self, buf) :
+        """
+            Write given data to socket, returning the number of bytes written, or None, if buffering is enabled.
+        """
+        
+        send = nonblocking(self.sock.send, buf)
+        
+        # eof on write?
+        if send :
+            # ok, message (partially) written
+            return send
+
+        else :
+            # XXX: how do we handle this? What does it actually mean?
+            # handle as a wouldblock...
+            return None
+
+
+    def write (self, data) :
+        """
+            Write given data to socket.
+
+            TODO: buffer small chunks -> select writable -> write?
+
+            Buffers if not able to write, or raises EOFError (hah!)
+        """
+
+        if not self._buf :
+            # write directly
+            while data :
+                write = self._write(data)
+                
+                if write :
+                    # remaining data
+                    data = data[write:]
+
+                else :
+                    # cannot write more
+                    break
+
+        if not data :
+            # sent
+            return
+
+        if self._buf is None :
+            # no write buffering, and socket buffer full!
+            raise EOFError()
+
+        # append to outgoing buffer
+        self._buf += data
+
+    def writeline (self, line, eol=EOL) :
+        """
+            Write out line.
+        """
+
+        self.write(str(line) + eol)
+    
+    def __call__ (self, *lines) :
+        for line in lines :
+            self.writeline(line)
+        
+        # TODO: flush