pvl/syslog/fifo.py
author Tero Marttila <terom@paivola.fi>
Sun, 13 Jan 2013 02:43:15 +0200
changeset 122 f742c866c765
parent 77 05d6cfa9efac
child 125 9925ef5258f1
permissions -rw-r--r--
pvl.syslog.fifo: separate Pipe/Fifo
"""
    Non-blocking fifo reads.
"""

import os
import errno
import fcntl

import logging

log = logging.getLogger('pvl.syslog.fifo')

class Pipe (object) :
    """
        A pipe from a fd.

        Supports reading lines in a non-blocking fashion.
    """

    @classmethod
    def file (cls, file) :
        """
            Create Pipe from file, e.g. sys.stdin.

            Puts fd into nonblocking mode, which means that the given file will stop working!
        """
        
        fd = file.fileno()

        log.debug("%s: %s", file, fd)

        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fl |= os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, fl)
        
        return cls(fd)

    def __init__ (self, fd) :
        self._fd = fd
        self._buf = ''
        
        log.debug("pipe: %d", fd)

    def fileno (self) :
        """
            Fetch the internal fd, failing if we are not open..
        """

        if self._fd is None :
            raise ValueError("I/O operation on closed pipe: %s" % (self, ))
        else :
            return self._fd

    def read (self, n=512) :
        """
            Read up to n bytes.
            
            Returns None if we would block.
            Raises EOFError on EOF.
        """

        try :
            buf = os.read(self.fileno(), n)

        except OSError as ex :
            # block?
            if ex.errno == errno.EAGAIN :
                # empty
                return None

            else :
                raise

        # eof?
        if buf :
            return buf
        else :
            raise EOFError()

    def readline (self) :
        """
            Read and return next waiting line from input.

            Line is returned without trailing '\n'.

            Returns None if there is no line available.
            Raises EOFError if the fifo write end was closed.
        """

        while '\n' not in self._buf :
            # read chunk
            read = self.read()

            if read is None :
                return None
            
            self._buf += read
        
        # split out one line
        line, self._buf = self._buf.split('\n', 1)

        log.debug("%s", line)

        return line

    def readlines (self) :
        """
            Read any available input, yielding lines.
            
            Re-opens the FIFO on EOF.

            Returns None if there was no more input available, or the fifo was re-opened after EOF.
        """

        while True :
            # pull line
            line = self.readline()

            if line :
                yield line
            else :
                return # block

    __iter__ = readlines

    def __str__ (self) :
        return "pipe:{self._fd}".format(self=self)

class Fifo (Pipe) :
    """
        A named pipe(7) on the filesystem.

        Supports reading lines in a non-blocking fashion, and re-opening on EOF.
    """

    def __init__ (self, path) :
        self.path = path
        Pipe.__init__(self, self._open())

    def _open (self) :
        """
            Open the internal fd (nonblocking).
        """
        
        fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)

        log.debug("%s: open: %s", self, fd)

        return fd

    def _close (self, fd) :
        """
            Close.
        """

        os.close(fd)
    
    def _reopen (self) :
        """
            Re-open the FIFO in case the writing end was closed, and read gave EOF.
        """

        self._close(self._fd)
        self._fd = None
        self._fd = self._open()

    def readlines (self) :
        """
            Read any available input, yielding lines.
            
            Re-opens the FIFO on EOF.

            Returns None if there was no more input available, or the fifo was re-opened after EOF.
        """

        while True :
            try :
                # pull line
                line = self.readline()

            except EOFError :
                log.debug("%s: EOF: reopen", self)

                # reopen and go back to waiting
                self._reopen()

                return
            
            if line is None :
                log.debug("%s: EOF: wait", self)
                return # wait
            else :
                yield line
    
    __iter__ = readlines

    def close (self) :
        """
            Close the fifo.
        """

        self._close(self.fileno())
        self._fd = None

    def __del__ (self) :
        """
            Cleanup
        """

        if self._fd is not None :
            self._close(self._fd)
            self._fd = None