"""
Non-blocking fifo reads.
"""
import os
import errno
import fcntl
import logging
# Module-level logger shared by the classes in this file; debug messages are
# emitted under the 'pvl.syslog.fifo' logger name.
log = logging.getLogger('pvl.syslog.fifo')
class Pipe(object):
    """
    A pipe from a fd.

    Supports reading lines in a non-blocking fashion. The fd is expected to
    be in non-blocking mode already (see Pipe.file() for wrapping an existing
    file object).

    Data is handled as byte strings, exactly as returned by os.read(): that
    is `str` on Python 2 and `bytes` on Python 3.
    """

    @classmethod
    def file(cls, file):
        """
        Create Pipe from file, e.g. sys.stdin.

        Puts the underlying fd into nonblocking mode, so ordinary blocking
        use of the given file object is no longer reliable afterwards.
        """
        fd = file.fileno()

        log.debug("%s: %s", file, fd)

        # add O_NONBLOCK to whatever status flags the fd already has
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fl |= os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, fl)

        return cls(fd)

    def __init__(self, fd):
        """
        Wrap the given (non-blocking) file descriptor.
        """
        self._fd = fd
        # bytes received so far that do not yet form a complete line
        self._buf = b''

        log.debug("pipe: %d", fd)

    def fileno(self):
        """
        Fetch the internal fd.

        Raises ValueError if the pipe is not open.
        """
        if self._fd is None:
            raise ValueError("I/O operation on closed pipe: %s" % (self, ))

        return self._fd

    def read(self, n=512):
        """
        Read up to n bytes.

        Returns the bytes read, or None if the read would block.
        Raises EOFError on EOF, and re-raises any other OSError.
        """
        try:
            buf = os.read(self.fileno(), n)
        except OSError as ex:
            # nothing available right now on a non-blocking fd
            if ex.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                return None
            raise

        # a successful zero-length read means EOF
        if not buf:
            raise EOFError()

        return buf

    def readline(self):
        """
        Read and return the next waiting line from input.

        The line is returned without its trailing newline.
        Returns None if there is no complete line available yet; any partial
        line stays buffered until the rest of it arrives.
        Raises EOFError if the write end was closed.
        """
        while b'\n' not in self._buf:
            # read chunk
            chunk = self.read()

            if chunk is None:
                return None

            self._buf += chunk

        # split out one line, keeping the remainder buffered
        line, self._buf = self._buf.split(b'\n', 1)

        log.debug("%s", line)

        return line

    def readlines(self):
        """
        Read any available input, yielding lines.

        Stops once no further complete line is available. Empty lines are
        yielded like any other line. EOFError from readline() is not handled
        here and propagates to the caller.
        """
        while True:
            # pull line
            line = self.readline()

            # None means "would block"; an empty line is still a valid line
            if line is None:
                return

            yield line

    __iter__ = readlines

    def __str__(self):
        return "pipe:{self._fd}".format(self=self)
class Fifo(Pipe):
    """
    A named pipe(7) on the filesystem.

    Supports reading lines in a non-blocking fashion, and re-opening on EOF.
    """

    def __init__(self, path):
        """
        Open the fifo at the given filesystem path for non-blocking reads.
        """
        self.path = path

        # Define _fd before opening: _open() logs `self`, whose __str__ reads
        # _fd, and __del__ must also work if os.open() fails.
        self._fd = None

        Pipe.__init__(self, self._open())

    def _open(self):
        """
        Open the fifo (read-only, nonblocking) and return the new fd.
        """
        fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)

        log.debug("%s: open: %s", self, fd)

        return fd

    def _close(self, fd):
        """
        Close the given fd.
        """
        os.close(fd)

    def _reopen(self):
        """
        Re-open the FIFO in case the writing end was closed, and read gave EOF.
        """
        # forget the old fd before closing it, so a failing close does not
        # leave a stale descriptor behind to be closed a second time
        fd, self._fd = self._fd, None

        if fd is not None:
            self._close(fd)

        self._fd = self._open()

    def readlines(self):
        """
        Read any available input, yielding lines.

        Stops once no further complete line is available. On EOF the fifo is
        re-opened and iteration ends as well, so the caller can go back to
        waiting for input on the new fd.
        """
        while True:
            try:
                # pull line
                line = self.readline()

            except EOFError:
                log.debug("%s: EOF: reopen", self)

                # reopen and go back to waiting
                self._reopen()

                return

            if line is None:
                log.debug("%s: EOF: wait", self)

                return  # wait

            yield line

    __iter__ = readlines

    def close(self):
        """
        Close the fifo.

        Raises ValueError if the fifo is already closed.
        """
        fd = self.fileno()

        # mark as closed first, so the fd is never closed twice
        self._fd = None
        self._close(fd)

    def __del__(self):
        """
        Cleanup: close the fd if it is still open.
        """
        # _fd may be missing entirely if __init__ failed early
        fd = getattr(self, '_fd', None)

        if fd is not None:
            self._fd = None
            self._close(fd)