"""
    Non-blocking fifo reads.
"""

import os
import errno
import select

import logging

log = logging.getLogger('pvl.syslog.fifo')


class Fifo(object):
    """
        A named pipe(7) on the filesystem.

        Supports reading lines in a non-blocking fashion.

        Data is handled as bytes end-to-end: os.read() returns bytes, so the
        internal buffer and the returned lines are bytes as well (which is the
        plain `str` type on Python 2).
    """

    def __init__(self, path):
        """
            Open the fifo at the given filesystem path for non-blocking reads.
        """

        self.path = path
        self._fd = None

        # must be bytes to match what os.read() returns
        self._buf = b''

        log.debug("open: %s", path)
        self._open()

    @property
    def fd(self):
        """
            Fetch the internal fd, failing if we are not open.

            Raises ValueError if the fifo is closed.
        """

        if self._fd is None:
            raise ValueError("I/O operation on closed fifo: {0}".format(self.path))

        return self._fd

    def fileno(self):
        """
            Return the internal fd, so that the object itself can be passed to select() and friends.

            Raises ValueError if the fifo is closed.
        """

        return self.fd

    def _open(self):
        """
            Open the internal fd (nonblocking).
        """

        assert self._fd is None

        self._fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)

    def _close(self):
        """
            Close the internal fd, and mark us as closed.
        """

        assert self._fd is not None

        os.close(self._fd)

        self._fd = None

    def _reopen(self):
        """
            Re-open the FIFO in case the writing end was closed, and read gave EOF.

            Any partial line still held in the internal buffer is kept.
        """

        self._close()
        self._open()

    def poll(self, timeout=None):
        """
            Poll for input, with given timeout in seconds (float).

            A timeout of None indicates to block forever, False indicates to never block.

            Returns True if we have input waiting, False on timeout with no input.
            Raises ValueError if the fifo is closed.
        """

        if timeout is False:
            timeout = 0.0

        # select for readability only
        readable, _, _ = select.select([self.fd], [], [], timeout)

        return bool(readable)

    def read(self, n=512):
        """
            Read up to n bytes.

            Returns the bytes read.
            Returns None if we would block.
            Raises EOFError on EOF.
            Raises OSError on any other read error.
        """

        try:
            buf = os.read(self.fd, n)

        except OSError as ex:
            # would block? EWOULDBLOCK is an alias of EAGAIN on most platforms, but not all
            if ex.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                return None

            raise

        # a zero-length read means EOF, i.e. no writer has the fifo open
        if not buf:
            raise EOFError()

        return buf

    def readline(self):
        """
            Read and return next waiting line from input.

            Line is returned as bytes, without trailing '\\n'.

            Returns None if there is no complete line available; any partial
            line read so far stays buffered for the next call.
            Raises EOFError if the fifo write end was closed.
        """

        while b'\n' not in self._buf:
            # read chunk
            chunk = self.read()

            if chunk is None:
                return None

            self._buf += chunk

        # split out one line, keeping the remainder buffered
        line, self._buf = self._buf.split(b'\n', 1)

        return line

    def readlines(self):
        """
            Read any available input, yielding lines.

            Re-opens the FIFO on EOF.

            The generator ends once there is no more input available, or the fifo was re-opened after EOF.
        """

        while True:
            try:
                # pull line
                line = self.readline()

            except EOFError:
                log.debug("EOF/reopen: %s", self.path)

                # reopen and go back to waiting
                self._reopen()

                return

            if line is None:
                log.debug("fifo empty: %s", self.path)

                # wait
                return

            log.debug("%s", line)
            yield line

    __iter__ = readlines

    def close(self):
        """
            Close the fifo.

            Raises ValueError if the fifo is already closed.
        """

        if self._fd is None:
            raise ValueError("Fifo already closed: {0}".format(self.path))

        self._close()

    def __del__(self):
        """
            Cleanup: close the fd if we still hold one.
        """

        # getattr: the attribute may be missing if construction never got as far as setting it.
        # Truthiness test is deliberate: skips both None (closed) and fd 0, so we will not close stdin.
        if getattr(self, '_fd', None):
            self._close()