pvl.irk: split off url/socket stuff into pvl.socket, clarify Irk __call__/__iter__ iterface
--- a/pvl/irk.py Sun Jan 13 01:52:00 2013 +0200
+++ b/pvl/irk.py Sun Jan 13 01:52:49 2013 +0200
@@ -2,16 +2,15 @@
Irker client.
"""
+import pvl.syslog.file # for stdin
+import pvl.socket # for tcp
+
import optparse, sys
import logging; log = logging.getLogger('pvl.irk')
-# proto
import json
-# XXX: socket
-import socket, select
-
def parser (parser, connect='tcp://localhost/', target=None) :
"""
Optparse option group.
@@ -35,107 +34,56 @@
# None -> stdout
return Irker(options.irker, options.irker_notice)
-def connect (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
- """
- Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
-
- TODO: timeout?
- """
-
- log.debug("%s:%s: %s/%s", host, port, family, socktype)
-
- if host :
- flags = socket.AI_CANONNAME
- else :
- flags = 0
-
- addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)
-
- if not addrinfo :
- raise Exception("getaddrinfo: %s:%s: no results" % (host, port))
-
- for af, st, proto, name, addr in addrinfo :
- try :
- s = socket.socket(af, st, proto)
-
- except socket.error as error :
- log.warning("%s:%s: socket: %s", host, port, error)
- continue
-
- try :
- s.connect(addr)
-
- except socket.error as error :
- log.warning("%s:%s: connect: %s", host, port, error)
- continue
-
- log.info("%s", name)
-
- return s
-
- else :
- raise Exception("Unable to connect: %s:%s: %s" % (host, port, error))
-
-import urlparse
-
class Irk (object) :
"""
- Irker JSON connection.
+ Irker JSON connection speaks JSON over a stream.
- TODO: timeout?
+ TODO: timeouts?
"""
PORT = 6659
- SCHEME = {
- 'tcp': (socket.AF_INET, socket.SOCK_STREAM),
- 'udp': (socket.AF_INET, socket.SOCK_DGRAM),
- 'unix': (socket.AF_UNIX, socket.SOCK_DGRAM),
- }
-
@classmethod
def connect (cls, url) :
"""
- Connect to given urllib URL, or None -> stdout
+ Connect to given URL string, or None -> stdout
"""
if not url :
- return cls(sys.stdout, recv=False)
+ # no read
+ return cls(pvl.syslog.file.File(sys.stdout), recv=False)
- family, socktype = cls.SCHEME[url.scheme]
-
- if family == socket.AF_UNIX :
- raise Exception("unix:// is not supported")
else :
- # inet
- sock = connect(url.hostname, url.port or cls.PORT, family=family, socktype=socktype)
+ sock = pvl.socket.connect(url, port=cls.PORT)
- # XXX: just to make things a bit more exciting... and we really don't want to be blocking on our output..
- sock.setblocking(False)
-
- return cls(sock.makefile('w'), recv=sock)
+ # just to make things a bit more exciting... and we really don't want to be blocking on our output..
+ sock.setblocking(False)
+
+ return cls(
+ pvl.socket.WriteStream(sock, buffer=None),
+ pvl.socket.ReadStream(sock)
+ )
- def __init__ (self, file, recv=None) :
+ def __init__ (self, send, recv=None) :
"""
Use given file-like object (write, flush, fileno) for output.
"""
- self.file = file
-
- # XXX
+ self.send = send
self.recv = recv
- self._buf = ''
-
- log.debug("%s", file)
+
+ log.debug("%s <-> %s", send, recv)
def fileno (self) :
"""
Return fd. Useful for detecting error conditions (connection lost).
+
+ Only valid if self.recv is True.
"""
return self.recv.fileno()
- def send (self, **opts) :
+ def __call__ (self, **opts) :
"""
Raises IOError on write errors.
"""
@@ -143,65 +91,18 @@
log.debug("%s", opts)
# write line + flush
- json.dump(opts, self.file)
- self.file.write('\n')
- self.file.flush()
-
- def read (self, size=512) :
- """
- Recieve data back from irkerd.
-
- Raise EOFError.
-
- XXX: this is seriously crazy on a buffered file-like object..?
- """
+ self.send(json.dumps(opts))
- # poll
- read, _, _ = select.select((self.recv, ), (), (), 0.0)
-
- if read :
- read = self.recv.recv(size)
+ # XXX: self.send.flush()
- if read :
- return read
- else :
- raise EOFError()
- else :
- return None # block
-
- def readline (self) :
+ def __iter__ (self) :
"""
- Yield line of input, or None.
-
- Raise EOFError.
+ Yield JSON inputs from source.
"""
- while '\n' not in self._buf :
- read = self.read()
-
- if not read :
- return None # block
-
- self._buf += read
-
- line, self._buf = self._buf.split('\n')
-
- return line
-
- def readlines (self) :
- """
- Yield lines of input, until blocking.
- """
-
- while True :
- line = self.readline()
-
- if line :
- yield line
- else :
- return
-
- __iter__ = readlines
+ for line in self.recv :
+ # XXX: error handling?
+ yield json.loads(line)
class IrkerTarget (object) :
"""
@@ -216,17 +117,17 @@
def join (self) :
log.info("%s", self)
- self.irker.send(to=str(self), privmsg='')
+ self.irker(to=str(self), privmsg='')
def privmsg (self, *args) :
for arg in args :
log.info("%s: %s", self, arg)
- self.irker.send(to=str(self), privmsg=arg)
+ self.irker(to=str(self), privmsg=arg)
def notice (self, *args) :
for arg in args :
log.info("%s: %s", self, arg)
- self.irker.send(to=str(self), notice=arg)
+ self.irker(to=str(self), notice=arg)
def __call__ (self, *args) :
# default msg policy
@@ -240,15 +141,11 @@
class Irker (object) :
"""
- Reconnecting irker.
+ Reconnecting Irk.
"""
def __init__ (self, url=None, notice=False) :
- if url :
- self.url = urlparse.urlparse(url)
- else :
- self.url = None
-
+ self.url = url
self.targets = {}
self.notice = notice
@@ -265,16 +162,16 @@
for target in self.targets.itervalues() :
target.join()
- def send (self, **opts) :
+ def __call__ (self, **opts) :
"""
Send on current irker connection.
TODO: handle errors and reconnect?
"""
- self.irk.send(**opts)
+ self.irk(**opts)
- def target (self, target) :
+ def __getitem__ (self, target) :
"""
Bind to given target URL, returning an IrkerTarget for sending messages.
"""
@@ -284,5 +181,3 @@
self.targets[target].join()
return self.targets[target]
-
- __getitem__ = target
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/socket.py Sun Jan 13 01:52:49 2013 +0200
@@ -0,0 +1,320 @@
+"""
+ A simple TCP client in the kind of syslog.fifo/file.
+
+ Interface: fileno(), __iter__, __call__
+"""
+
+# XXX: absolute import plz
+socket = __import__('socket')
+
+import select
+
+import urlparse
+
+import logging; log = logging.getLogger('pvl.socket')
+
+URL = {
+ 'tcp': (0, socket.SOCK_STREAM ), # AF_UNSPEC
+ 'udp': (0, socket.SOCK_DGRAM ), # AF_UNSPEC
+ 'unix': (socket.AF_UNIX, None ), # socktype is given
+}
+
+def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) :
+ """
+ Parse given string into (AF_*, SOCK_*, host, port).
+
+ For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value.
+ """
+
+ family, socktype = URL[scheme]
+ url = urlparse.urlparse(str)
+
+ # TODO: UNIX?
+ if url.scheme and url.netloc :
+ # proper url
+ family, socktype = URL[url.scheme]
+
+ return family, socktype, url.hostname, url.port or port
+
+ elif url.scheme and url.path :
+ # host:port
+ return family, socktype, url.scheme, int(url.path)
+
+ elif url.path :
+ # host
+ return family, socktype, url.path, port
+
+ else :
+ raise ValueError("unparseable connect URL: %s", str)
+
+def connect (str, *args, **kwargs) :
+ """
+ Returns a connected socket for given parse()'d string.
+ """
+
+ family, socktype, host, port = parse(str, *args, **kwargs)
+
+ if family == socket.AF_UNIX :
+ raise ValueError("XXX: AF_UNIX is not yet supported", str)
+ else : # AF_UNSPEC
+ return connect_inet(host, port, family=family, socktype=socktype)
+
+def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
+ """
+ Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
+
+ TODO: timeout?
+ """
+
+ log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)
+
+ if host :
+ flags = socket.AI_CANONNAME
+ else :
+ flags = 0
+
+ addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)
+
+ if not addrinfo :
+ raise Exception("getaddrinfo: %s:%s: no results" % (host, port))
+
+ for af, st, proto, name, addr in addrinfo :
+ try :
+ sock = socket.socket(af, st, proto)
+
+ except socket.error as error :
+ log.warning("%s:%s: socket: %s", host, port, error)
+ continue
+
+ log.debug("%s:%s: socket: %s", host, port, sock)
+
+ try :
+ sock.connect(addr)
+
+ except socket.error as error :
+ log.warning("%s:%s: connect: %s", host, port, error)
+ continue
+
+ log.debug("%s:%s: connect", host, port)
+ log.info("%s", name)
+
+ return sock
+
+ else :
+ raise Exception("Unable to connect: %s:%s: %s" % (host, port, error))
+
+def reverse (self, sockaddr, numeric_host=False, numeric_port=True) :
+ """
+ Resolve given sockaddr, returning (host, port).
+ """
+
+ flags = 0
+
+ if numeric_host :
+ flags |= socket.NI_NUMERICHOST
+
+ if numeric_port :
+ flags |= socket.NI_NUMERICSERV
+
+ return socket.getnameinfo(sockaddr, flags)
+
+def nonblocking (call, *args, **kwargs) :
+ """
+ Call the given function, which read/writes on a nonblocking file, and return None if it would have blocked.
+ """
+
+ try :
+ return call(*args, **kwargs)
+
+ except socket.error as ex :
+ # block?
+ if ex.errno == errno.EAGAIN or ex.errno == errno.EWOULDBLOCK:
+ # empty
+ return None
+
+ else :
+ raise
+
+class ReadStream (object) :
+ """
+ Buffered stream, supporting non-blocking/line-based reads.
+ """
+
+ BLOCK=512
+
+ def __init__ (self, sock, buffer=None) :
+ """
+ TODO: buffer - maximum line length
+ """
+
+ self.sock = sock
+ self._buf = ''
+
+ def fileno (self) :
+ return self.sock.fileno()
+
+ def _read (self, block=BLOCK) :
+ """
+ Read up to n bytes from socket.
+
+ Returns None if we would block.
+ Raises EOFError on EOF.
+ """
+
+ buf = nonblocking(self.sock.recv, block)
+
+ # eof?
+ if not buf :
+ raise EOFError()
+
+ # ok
+ return buf
+
+ def peek (self) :
+ """
+ Peek at data in buffer.
+ """
+
+ return self._buf
+
+ def read (self) :
+ """
+ Read and return any available input.
+
+ Returns None if blocking.
+ """
+
+ if self._buf :
+ buf, self._buf = self._buf, ''
+
+ else :
+ buf = self._read()
+
+ return buf
+
+ def readline (self) :
+ """
+ Read and return next waiting line from input.
+
+ Line is returned without trailing '\r\n' or '\n'.
+
+ Returns None if there is no line available.
+ """
+
+ while '\n' not in self._buf :
+ # read chunk
+ read = self._read()
+
+ if read is None :
+ return None
+
+ self._buf += read
+
+ # split out one line
+ line, self._buf = self._buf.split('\n', 1)
+
+ # in case we had \r\n
+ line = line.rstrip('\r')
+
+ return line
+
+ def readlines (self) :
+ """
+ Read any available input, yielding lines.
+
+ Returns None if thre is no more input available.
+
+ Raises EOFError in the socket was closed.
+ """
+
+ while True :
+ line = self.readline()
+
+ if line is None :
+ # no more
+ return
+ else :
+ yield line
+
+ __iter__ = readlines
+
+class WriteStream (object) :
+ """
+ Writable stream, supporting non-blocking/buffered writes.
+
+ XXX: buffering is completely untested
+ """
+
+ EOL = '\n'
+
+ def __init__ (self, sock, buffer=None) :
+ """
+ TODO: buffer - maximum outgoing buffer length
+ """
+
+ self.sock = sock
+ self._buf = buffer
+
+ def _write (self, buf) :
+ """
+ Write given data to socket, returning the number of bytes written, or None, if buffering is enabled.
+ """
+
+ send = nonblocking(self.sock.send, buf)
+
+ # eof on write?
+ if send :
+ # ok, message (partially) written
+ return send
+
+ else :
+ # XXX: how do we handle this? What does it actually mean?
+ # handle as a wouldblock...
+ return None
+
+
+ def write (self, data) :
+ """
+ Write given data to socket.
+
+ TODO: buffer small chunks -> select writable -> write?
+
+ Buffers if not able to write, or raises EOFError (hah!)
+ """
+
+ if not self._buf :
+ # write directly
+ while data :
+ write = self._write(data)
+
+ if write :
+ # remaining data
+ data = data[write:]
+
+ else :
+ # cannot write more
+ break
+
+ if not data :
+ # sent
+ return
+
+ if self._buf is None :
+ # no write buffering, and socket buffer full!
+ raise EOFError()
+
+ # append to outgoing buffer
+ self._buf += data
+
+ def writeline (self, line, eol=EOL) :
+ """
+ Write out line.
+ """
+
+ self.write(str(line) + eol)
+
+ def __call__ (self, *lines) :
+ for line in lines :
+ self.writeline(line)
+
+ # TODO: flush