"""
Non-blocking fifo reads.
"""
import os
import errno
import select
import logging
log = logging.getLogger('pvl.syslog.fifo')
class Fifo(object):
    """
    A named pipe(7) on the filesystem, opened read-only and non-blocking.

    Supports reading lines in a non-blocking fashion: raw bytes read from the
    pipe are buffered until a complete newline-terminated line is available.

    Iterating over the object is the same as calling readlines().
    """

    # Encoding used to turn a completed line of raw bytes into text when the
    # native str type is not bytes (Python 3). Undecodable bytes are replaced
    # rather than raising, so malformed input cannot break the read loop.
    encoding = 'utf-8'

    def __init__(self, path):
        """
        Open the fifo at the given filesystem path for non-blocking reads.

        Raises OSError if the path cannot be opened.
        """
        self.path = path
        self._fd = None
        # Raw bytes read from the fifo that do not yet form a complete line.
        # Kept as bytes because os.read() returns bytes.
        self._buf = b''

        log.debug("open: %s", path)
        self._open()

    @property
    def fd(self):
        """
        Fetch the internal fd, failing if we are not open.

        Raises ValueError if the fifo is closed.
        """
        if self._fd is None:
            raise ValueError("I/O operation on closed fifo: {0}".format(self.path))

        return self._fd

    def fileno(self):
        """
        Return the underlying fd, so the object can be passed to select() etc.

        Raises ValueError if the fifo is closed.
        """
        return self.fd

    def _open(self):
        """
        Open the internal fd (nonblocking).
        """
        assert self._fd is None

        self._fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)

    def _close(self):
        """
        Close the internal fd and mark the fifo as closed.
        """
        assert self._fd is not None

        # Forget the fd before closing it: if os.close() raises, the fd must
        # still not be closed a second time (e.g. from __del__).
        fd, self._fd = self._fd, None
        os.close(fd)

    def _reopen(self):
        """
        Re-open the FIFO in case the writing end was closed, and read gave EOF.
        """
        self._close()
        self._open()

    def poll(self, timeout=None):
        """
        Poll for input, with given timeout in seconds (float).

        A timeout of None indicates to block forever, False indicates to
        never block.

        Returns True if select() reports the fd as readable, False on timeout.
        Raises ValueError if the fifo is closed.
        """
        if timeout is False:
            timeout = 0.0

        readable, _, _ = select.select([self.fd], [], [], timeout)

        if readable:
            return True
        else:
            # timeout
            return False

    def read(self, n=512):
        """
        Read up to n bytes, as returned by os.read().

        Returns None if we would block.
        Raises EOFError on EOF.
        Raises ValueError if the fifo is closed; other OSErrors propagate.
        """
        try:
            buf = os.read(self.fd, n)

        except OSError as ex:
            # would block: nothing to read right now
            if ex.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                return None
            else:
                raise

        # a zero-length read means EOF
        if not buf:
            raise EOFError()

        return buf

    def readline(self):
        """
        Read and return next waiting line from input.

        Line is returned as a native str, without the trailing newline.
        Returns None if there is no complete line available yet; any partial
        line read so far stays buffered for the next call.
        Raises EOFError if read() hit EOF; buffered partial data is kept.
        """
        while b'\n' not in self._buf:
            chunk = self.read()

            if chunk is None:
                return None

            self._buf += chunk

        # split out one line, keeping the remainder buffered
        line, self._buf = self._buf.split(b'\n', 1)

        # Where bytes is not str (Python 3), hand out text like the rest of
        # the interface documents; where bytes is str this is a no-op.
        if not isinstance(line, str):
            line = line.decode(self.encoding, 'replace')

        return line

    def readlines(self):
        """
        Read any available input, yielding lines.

        This is a generator: it stops once there is no more complete line
        available, or after the fifo was re-opened following EOF. Call it
        again (e.g. after poll()) to continue reading.
        """
        while True:
            try:
                # pull line
                line = self.readline()

            except EOFError:
                log.debug("EOF/reopen: %s", self.path)

                # reopen and go back to waiting
                self._reopen()
                return

            if line is None:
                log.debug("fifo empty: %s", self.path)

                # wait
                return

            log.debug("%s", line)
            yield line

    __iter__ = readlines

    def close(self):
        """
        Close the fifo.

        Raises ValueError if the fifo is already closed.
        """
        if self._fd is None:
            raise ValueError("Fifo already closed: {0}".format(self.path))

        self._close()

    def __del__(self):
        """
        Cleanup: close the fd if it is still open.
        """
        # getattr: the instance may be only partially constructed.
        # The truthiness test skips None and, deliberately, fd 0 (stdin).
        if getattr(self, '_fd', None):
            self._close()