--- a/pvl/syslog/fifo.py Sun Jan 13 02:16:36 2013 +0200
+++ b/pvl/syslog/fifo.py Sun Jan 13 02:43:15 2013 +0200
@@ -4,90 +4,52 @@
import os
import errno
-import select
+import fcntl
import logging
log = logging.getLogger('pvl.syslog.fifo')
-class Fifo (object) :
+class Pipe (object) :
"""
- A named pipe(7) on the filesystem.
+ A pipe from a fd.
Supports reading lines in a non-blocking fashion.
"""
- def __init__ (self, path) :
- self.path = path
- self._fd = None
- self._buf = ''
+ @classmethod
+ def file (cls, file) :
+ """
+ Create Pipe from file, e.g. sys.stdin.
- log.debug("open: %s", path)
- self._open()
+ Puts fd into nonblocking mode, which means that the given file will stop working!
+ """
+
+ fd = file.fileno()
- @property
- def fd (self) :
+ log.debug("%s: %s", file, fd)
+
+ fl = fcntl.fcntl(fd, fcntl.F_GETFL)
+ fl |= os.O_NONBLOCK
+ fcntl.fcntl(fd, fcntl.F_SETFL, fl)
+
+ return cls(fd)
+
+ def __init__ (self, fd) :
+ self._fd = fd
+ self._buf = ''
+
+ log.debug("pipe: %d", fd)
+
+ def fileno (self) :
"""
Fetch the internal fd, failing if we are not open..
"""
if self._fd is None :
- raise ValueError("I/O operation on closed fifo: {0}".format(self.path))
-
- return self._fd
-
- def fileno (self) :
- return self.fd
-
- def _open (self) :
- """
- Open the internal fd (nonblocking).
- """
-
- assert self._fd is None
-
- self._fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
-
- def _close (self) :
- """
- Close.
- """
-
- assert self._fd is not None
-
- os.close(self._fd)
-
- self._fd = None
-
- def _reopen (self) :
- """
- Re-open the FIFO in case the writing end was closed, and read gave EOF.
- """
-
- self._close()
- self._open()
-
- def poll (self, timeout=None) :
- """
- Poll for input, with given timeout in seconds (float).
-
- A timeout of None indicates to block forever, False indicates to never block.
-
- Returns True if we have input waiting, False on timeout with no input. None on indeterminate.
- """
-
- if timeout is False :
- timeout = 0.0
-
- # select
- read, write, ex = select.select([self.fd], [], [], timeout)
-
- if read :
- return True
-
+ raise ValueError("I/O operation on closed pipe: %s" % (self, ))
else :
- # timeout
- return False
+ return self._fd
def read (self, n=512) :
"""
@@ -98,7 +60,7 @@
"""
try :
- buf = os.read(self.fd, n)
+ buf = os.read(self.fileno(), n)
except OSError as ex :
# block?
@@ -110,12 +72,11 @@
raise
# eof?
- if not buf :
+ if buf :
+ return buf
+ else :
raise EOFError()
- # ok
- return buf
-
def readline (self) :
"""
Read and return next waiting line from input.
@@ -138,6 +99,8 @@
# split out one line
line, self._buf = self._buf.split('\n', 1)
+ log.debug("%s", line)
+
return line
def readlines (self) :
@@ -150,12 +113,73 @@
"""
while True :
+ # pull line
+ line = self.readline()
+
+ if line :
+ yield line
+ else :
+ return # block
+
+ __iter__ = readlines
+
+ def __str__ (self) :
+ return "pipe:{self._fd}".format(self=self)
+
+class Fifo (Pipe) :
+ """
+ A named pipe(7) on the filesystem.
+
+ Supports reading lines in a non-blocking fashion, and re-opening on EOF.
+ """
+
+ def __init__ (self, path) :
+ self.path = path
+ Pipe.__init__(self, self._open())
+
+ def _open (self) :
+ """
+ Open the internal fd (nonblocking).
+ """
+
+ fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
+
+ log.debug("%s: open: %s", self, fd)
+
+ return fd
+
+ def _close (self, fd) :
+ """
+ Close.
+ """
+
+ os.close(fd)
+
+ def _reopen (self) :
+ """
+ Re-open the FIFO in case the writing end was closed, and read gave EOF.
+ """
+
+ self._close(self._fd)
+ self._fd = None
+ self._fd = self._open()
+
+ def readlines (self) :
+ """
+ Read any available input, yielding lines.
+
+ Re-opens the FIFO on EOF.
+
+ Returns None if there was no more input available, or the fifo was re-opened after EOF.
+ """
+
+ while True :
try :
# pull line
line = self.readline()
except EOFError :
- log.debug("EOF/reopen: %s", self.path)
+ log.debug("%s: EOF: reopen", self)
# reopen and go back to waiting
self._reopen()
@@ -163,14 +187,11 @@
return
if line is None :
- log.debug("fifo empty: %s", self.path)
-
- # wait
- return
-
- log.debug("%s", line)
- yield line
-
+ log.debug("%s: EOF: wait", self)
+ return # wait
+ else :
+ yield line
+
__iter__ = readlines
def close (self) :
@@ -178,16 +199,15 @@
Close the fifo.
"""
- if self._fd is None :
- raise ValueError("Fifo already closed: {0}".format(self.path))
-
- self._close()
+ self._close(self.fileno())
+ self._fd = None
def __del__ (self) :
"""
Cleanup
"""
- # will not close stdin
- if self._fd :
- self._close()
+ if self._fd is not None :
+ self._close(self._fd)
+ self._fd = None
+