pvl/syslog/fifo.py
author Tero Marttila <terom@paivola.fi>
Thu, 10 Jan 2013 19:01:22 +0200
changeset 77 05d6cfa9efac
parent 43 9d13b101beab
child 122 f742c866c765
permissions -rw-r--r--
fix pvl.verkko-dhcp + pvl.syslog.fifo
"""
    Non-blocking fifo reads.
"""

import os
import errno
import select

import logging

# Module-level logger; the Fifo class below uses it for debug-level tracing
# (open, EOF/reopen, empty fifo, and each line read).
log = logging.getLogger('pvl.syslog.fifo')

class Fifo (object) :
    """
        A named pipe(7) on the filesystem, opened read-only and non-blocking.

        Supports reading lines in a non-blocking fashion: reads never block, use poll() (or hand the object
        to select(), which uses fileno()) to wait for input.

        Iterating over the object yields the lines that are currently available, see readlines().
    """

    # Encoding used to decode complete lines into text when os.read() returns bytes (Python 3).
    # On Python 2 os.read() already returns str, and lines are returned untouched.
    ENCODING = 'utf-8'

    def __init__ (self, path) :
        """
            Open the fifo at the given filesystem path for non-blocking reads.

            Raises OSError if the path cannot be opened.
        """

        self.path = path
        self._fd = None

        # raw bytes read so far that do not yet form a complete line
        self._buf = b''

        log.debug("open: %s", path)
        self._open()

    @property
    def fd (self) :
        """
            Fetch the internal fd, failing if we are not open.

            Raises ValueError if the fifo is closed.
        """

        if self._fd is None :
            raise ValueError("I/O operation on closed fifo: {0}".format(self.path))

        return self._fd

    def fileno (self) :
        """
            Return the internal fd, so that this object can be passed to select() and friends.

            Raises ValueError if the fifo is closed.
        """

        return self.fd

    def _open (self) :
        """
            Open the internal fd (nonblocking).

            Raises OSError if the open fails; the fifo then remains closed.
        """

        assert self._fd is None

        self._fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)

    def _close (self) :
        """
            Close the internal fd.

            The fd is forgotten before it is closed, so that a failing os.close() never leaves a stale fd
            behind to be closed a second time (e.g. from __del__).
        """

        assert self._fd is not None

        fd, self._fd = self._fd, None

        os.close(fd)

    def _reopen (self) :
        """
            Re-open the FIFO in case the writing end was closed, and read gave EOF.

            Buffered partial-line data is not discarded.
        """

        self._close()
        self._open()

    def poll (self, timeout=None) :
        """
            Poll for input, with given timeout in seconds (float).

            A timeout of None indicates to block forever, False indicates to never block.

            Returns True if the fd is readable (input waiting, or EOF), False on timeout with no input.
            Raises ValueError if the fifo is closed.
        """

        if timeout is False :
            timeout = 0.0

        # select on our fd only; an empty read-set means we hit the timeout
        readable, _, _ = select.select([self.fd], [], [], timeout)

        return bool(readable)

    def read (self, n=512) :
        """
            Read up to n bytes.

            Returns the raw data as returned by os.read() (bytes; str on Python 2).
            Returns None if we would block.
            Raises EOFError on EOF.
            Raises ValueError if the fifo is closed.
        """

        try :
            buf = os.read(self.fd, n)

        except OSError as ex :
            # would block? EAGAIN and EWOULDBLOCK are allowed to be distinct values
            if ex.errno in (errno.EAGAIN, errno.EWOULDBLOCK) :
                # empty
                return None

            raise

        # eof?
        if not buf :
            raise EOFError()

        # ok
        return buf

    def readline (self) :
        """
            Read and return next waiting line from input.

            Line is returned as text, without trailing '\\n'. Data is buffered as raw bytes and only
            complete lines are decoded (using ENCODING, replacing undecodable bytes), so a multi-byte
            character split across two reads is handled correctly.

            Returns None if there is no complete line available; any partial line stays buffered.
            Raises EOFError if the fifo write end was closed.
        """

        while b'\n' not in self._buf :
            # read chunk
            chunk = self.read()

            if chunk is None :
                return None

            self._buf += chunk

        # split out one line
        line, self._buf = self._buf.split(b'\n', 1)

        # Python 3: os.read() gave us bytes, hand out text. Python 2: bytes is str, nothing to do.
        if not isinstance(line, str) :
            line = line.decode(self.ENCODING, 'replace')

        return line

    def readlines (self) :
        """
            Read any available input, yielding lines.

            Re-opens the FIFO on EOF.

            The generator finishes once there is no more input available, or the fifo was re-opened
            after EOF; iterate again (e.g. after poll()) to pick up further lines.
        """

        while True :
            try :
                # pull line
                line = self.readline()

            except EOFError :
                log.debug("EOF/reopen: %s", self.path)

                # reopen and go back to waiting
                self._reopen()

                return

            if line is None :
                log.debug("fifo empty: %s", self.path)

                # wait
                return

            log.debug("%s", line)
            yield line

    __iter__ = readlines

    def close (self) :
        """
            Close the fifo.

            Raises ValueError if the fifo is already closed.
        """

        if self._fd is None :
            raise ValueError("Fifo already closed: {0}".format(self.path))

        self._close()

    def __del__ (self) :
        """
            Cleanup: close the fd if it is still open.
        """

        # getattr: _fd is missing if the object was torn down before __init__ got to set it
        # truthiness is deliberate: will not close stdin (fd 0)
        if getattr(self, '_fd', None) :
            self._close()