"""
Non-blocking fifo reads.
"""
import os
import errno
import fcntl
import logging
# Module-level logger used by the pipe classes in this file.
log = logging.getLogger('pvl.syslog.fifo')


class Pipe(object):
    """
    A pipe from a fd.

    Supports reading lines in a non-blocking fashion.

    Data is handled as the byte strings returned by os.read(), i.e. `str` on
    Python 2 and `bytes` on Python 3; no decoding is performed.
    """

    @classmethod
    def file(cls, file):
        """
        Create Pipe from file, e.g. sys.stdin.

        Sets O_NONBLOCK on the file's fd, so the given file object should not
        be used for ordinary reads afterwards.
        """
        fd = file.fileno()
        log.debug("%s: %s", file, fd)

        # add O_NONBLOCK to whatever status flags are already set
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        return cls(fd)

    def __init__(self, fd):
        """
        Wrap the given fd, starting with an empty line buffer.

        May pass fd=None to open as closed.
        """
        self._fd = fd
        # bytes literal: same type as os.read() returns on both Python 2 and 3
        self._buf = b''

        # %s rather than %d, since fd may be None
        log.debug("pipe: %s", fd)

    def open(self, fd):
        """
        Re-open closed pipe to use the given fd.

        Raises ValueError if already open.
        """
        if self._fd is not None:
            raise ValueError("%s: re-opening already open pipe: %s" % (self, fd))

        self._fd = fd

    # XXX: good idea?
    def __nonzero__(self):
        """
        Test if we are open.

        XXX: signal EOF as well?
        """
        return self._fd is not None

    # Python 3 looks up __bool__ instead of __nonzero__ for truth tests.
    __bool__ = __nonzero__

    def fileno(self):
        """
        Return the internal fd.

        Raises ValueError if we are closed.

        XXX: EOFError?
        """
        if self._fd is None:
            raise ValueError("I/O operation on closed pipe: %s" % (self, ))

        return self._fd

    # XXX: this is almost identical to pvl.socket.ReadStream
    def read(self, n=512):
        """
        Read up to n bytes.

        Returns the (non-empty) byte string read.
        Returns None if we would block.
        Raises EOFError on EOF.
        Raises ValueError if we are closed (from fileno()).
        Any other OSError from os.read() is propagated.
        """
        try:
            buf = os.read(self.fileno(), n)
        except OSError as ex:
            # EAGAIN/EWOULDBLOCK mean "no data right now"; anything else is a real error
            if ex.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
            buf = None

        log.debug("%s: %s", self, buf)

        if buf is None:
            return None

        if not buf:
            # a zero-length read signals EOF
            raise EOFError()

        return buf

    def readline(self):
        """
        Read and return next waiting line from input.

        Line is returned without trailing '\\n', as a byte string.
        Returns None if there is no complete line available; any partial
        line read so far stays buffered for the next call.
        Raises EOFError on EOF (e.g. the write end was closed).
        """
        while b'\n' not in self._buf:
            # pull in another chunk
            chunk = self.read()

            if chunk is None:
                return None

            self._buf += chunk

        # split out one line, keeping the remainder buffered
        line, self._buf = self._buf.split(b'\n', 1)

        log.debug("%s", line)

        return line

    def readlines(self):
        """
        Read any available input, yielding lines.

        The generator finishes once no further complete line is available
        without blocking.
        EOFError (and ValueError if closed) from readline() is propagated.
        """
        while True:
            line = self.readline()

            if line is None:
                return  # would block

            yield line

    __iter__ = readlines

    def close(self):
        """
        Close our fd, if open.

        May be open()'d again. Meanwhile, fileno() and read operations will
        raise ValueError.
        Logs a warning if already closed.
        """
        if self._fd is None:
            log.warning("%s: already closed", self)
            return

        log.debug("%s: %s", self, self._fd)

        # mark ourselves closed first, so a failing os.close() does not leave
        # a stale fd behind that might get closed a second time
        fd, self._fd = self._fd, None
        os.close(fd)

    def __str__(self):
        return "pipe({self._fd})".format(self=self)
class Fifo(Pipe):
    """
    A named pipe(7) on the filesystem.

    Supports reading lines in a non-blocking fashion, and re-opening on EOF.
    """

    def __init__(self, path):
        """
        Open the fifo at the given filesystem path for non-blocking reads.

        Raises OSError if the path cannot be opened.
        """
        # set path first: __str__ returns it, and _open() logs self
        self.path = path

        Pipe.__init__(self, self._open())

    def _open(self):
        """
        Open the internal fd (nonblocking) and return it.
        """
        fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)

        log.debug("%s: open: %s", self, fd)

        return fd

    def open(self):
        """
        Re-open the FIFO.

        Used when the writing end was closed, and read gave EOF. Opening the fifo again will clear the EOF condition,
        and resume nonblocking mode.

        Raises ValueError() if already open. close() first.
        """
        # check before opening, so that a new fd is not opened (and then
        # leaked) when we are going to refuse the re-open anyway
        if self._fd is not None:
            raise ValueError("%s: re-opening already open pipe" % (self, ))

        Pipe.open(self, self._open())

    def readlines(self):
        """
        Read any available input, yielding lines.

        Re-opens the FIFO on EOF.

        The generator finishes when no further complete line is available,
        or after the fifo was re-opened following EOF.
        """
        while True:
            try:
                # pull line
                line = self.readline()

            except EOFError:
                log.debug("%s: EOF: reopen", self)

                # reopen and go back to waiting
                self.close()
                self.open()

                return

            if line is None:
                log.debug("%s: EOF: wait", self)
                return  # wait

            yield line

    __iter__ = readlines

    def __str__(self):
        return self.path

    # XXX: we need to figure out what references we have lying around, and clean those out!
    def __del__(self):
        """
        Cleanup: close the fd if it is still open.
        """
        # _fd is missing entirely if __init__ failed inside _open()
        if getattr(self, '_fd', None) is not None:
            self.close()