pvl.syslog.fifo: separate Pipe/Fifo
authorTero Marttila <terom@paivola.fi>
Sun, 13 Jan 2013 02:43:15 +0200
changeset 122 f742c866c765
parent 121 4f16bf6365f1
child 123 f35b2940b7fc
pvl.syslog.fifo: separate Pipe/Fifo
pvl/syslog/fifo.py
pvl/syslog/tail.py
--- a/pvl/syslog/fifo.py	Sun Jan 13 02:16:36 2013 +0200
+++ b/pvl/syslog/fifo.py	Sun Jan 13 02:43:15 2013 +0200
@@ -4,90 +4,52 @@
 
 import os
 import errno
-import select
+import fcntl
 
 import logging
 
 log = logging.getLogger('pvl.syslog.fifo')
 
-class Fifo (object) :
+class Pipe (object) :
     """
-        A named pipe(7) on the filesystem.
+        A pipe from a fd.
 
         Supports reading lines in a non-blocking fashion.
     """
 
-    def __init__ (self, path) :
-        self.path = path
-        self._fd = None
-        self._buf = ''
+    @classmethod
+    def file (cls, file) :
+        """
+            Create Pipe from file, e.g. sys.stdin.
 
-        log.debug("open: %s", path)
-        self._open()
+            Puts fd into nonblocking mode, which means that the given file will stop working!
+        """
+        
+        fd = file.fileno()
 
-    @property
-    def fd (self) :
+        log.debug("%s: %s", file, fd)
+
+        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
+        fl |= os.O_NONBLOCK
+        fcntl.fcntl(fd, fcntl.F_SETFL, fl)
+        
+        return cls(fd)
+
+    def __init__ (self, fd) :
+        self._fd = fd
+        self._buf = ''
+        
+        log.debug("pipe: %d", fd)
+
+    def fileno (self) :
         """
             Fetch the internal fd, failing if we are not open..
         """
 
         if self._fd is None :
-            raise ValueError("I/O operation on closed fifo: {0}".format(self.path))
-
-        return self._fd
-
-    def fileno (self) :
-        return self.fd
-
-    def _open (self) :
-        """
-            Open the internal fd (nonblocking).
-        """
-
-        assert self._fd is None
-
-        self._fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
-
-    def _close (self) :
-        """
-            Close.
-        """
-
-        assert self._fd is not None
-
-        os.close(self._fd)
-
-        self._fd = None
-    
-    def _reopen (self) :
-        """
-            Re-open the FIFO in case the writing end was closed, and read gave EOF.
-        """
-
-        self._close()
-        self._open()
-
-    def poll (self, timeout=None) :
-        """
-            Poll for input, with given timeout in seconds (float).
-
-            A timeout of None indicates to block forever, False indicates to never block.
-
-            Returns True if we have input waiting, False on timeout with no input. None on indeterminate.
-        """
-
-        if timeout is False :
-            timeout = 0.0
-    
-        # select
-        read, write, ex = select.select([self.fd], [], [], timeout)
-
-        if read :
-            return True
-
+            raise ValueError("I/O operation on closed pipe: %s" % (self, ))
         else :
-            # timeout
-            return False
+            return self._fd
 
     def read (self, n=512) :
         """
@@ -98,7 +60,7 @@
         """
 
         try :
-            buf = os.read(self.fd, n)
+            buf = os.read(self.fileno(), n)
 
         except OSError as ex :
             # block?
@@ -110,12 +72,11 @@
                 raise
 
         # eof?
-        if not buf :
+        if buf :
+            return buf
+        else :
             raise EOFError()
 
-        # ok
-        return buf
-
     def readline (self) :
         """
             Read and return next waiting line from input.
@@ -138,6 +99,8 @@
         # split out one line
         line, self._buf = self._buf.split('\n', 1)
 
+        log.debug("%s", line)
+
         return line
 
     def readlines (self) :
@@ -150,12 +113,73 @@
         """
 
         while True :
+            # pull line
+            line = self.readline()
+
+            if line :
+                yield line
+            else :
+                return # block
+
+    __iter__ = readlines
+
+    def __str__ (self) :
+        return "pipe:{self._fd}".format(self=self)
+
+class Fifo (Pipe) :
+    """
+        A named pipe(7) on the filesystem.
+
+        Supports reading lines in a non-blocking fashion, and re-opening on EOF.
+    """
+
+    def __init__ (self, path) :
+        self.path = path
+        Pipe.__init__(self, self._open())
+
+    def _open (self) :
+        """
+            Open the internal fd (nonblocking).
+        """
+        
+        fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
+
+        log.debug("%s: open: %s", self, fd)
+
+        return fd
+
+    def _close (self, fd) :
+        """
+            Close.
+        """
+
+        os.close(fd)
+    
+    def _reopen (self) :
+        """
+            Re-open the FIFO in case the writing end was closed, and read gave EOF.
+        """
+
+        self._close(self._fd)
+        self._fd = None
+        self._fd = self._open()
+
+    def readlines (self) :
+        """
+            Read any available input, yielding lines.
+            
+            Re-opens the FIFO on EOF.
+
+            Returns None if there was no more input available, or the fifo was re-opened after EOF.
+        """
+
+        while True :
             try :
                 # pull line
                 line = self.readline()
 
             except EOFError :
-                log.debug("EOF/reopen: %s", self.path)
+                log.debug("%s: EOF: reopen", self)
 
                 # reopen and go back to waiting
                 self._reopen()
@@ -163,14 +187,11 @@
                 return
             
             if line is None :
-                log.debug("fifo empty: %s", self.path)
-
-                # wait
-                return
-
-            log.debug("%s", line)
-            yield line
-
+                log.debug("%s: EOF: wait", self)
+                return # wait
+            else :
+                yield line
+    
     __iter__ = readlines
 
     def close (self) :
@@ -178,16 +199,15 @@
             Close the fifo.
         """
 
-        if self._fd is None :
-            raise ValueError("Fifo already closed: {0}".format(self.path))
-
-        self._close()
+        self._close(self.fileno())
+        self._fd = None
 
     def __del__ (self) :
         """
             Cleanup
         """
 
-        # will not close stdin
-        if self._fd :
-            self._close()
+        if self._fd is not None :
+            self._close(self._fd)
+            self._fd = None
+
--- a/pvl/syslog/tail.py	Sun Jan 13 02:16:36 2013 +0200
+++ b/pvl/syslog/tail.py	Sun Jan 13 02:43:15 2013 +0200
@@ -59,13 +59,6 @@
 
         return self.stat() != self._stat
 
-    def raedline (self) :
-        """
-            Reads any available line from file.
-
-            Returns None if at EOF.
-        """
-
     def readline (self) :
         """
             Reads a line from the file, without trailing \n.