pvl/syslog/fifo.py
author Tero Marttila <tero.marttila@aalto.fi>
Mon, 28 Jul 2014 13:32:41 +0300
changeset 35 4c7905e1cad7
parent 18 48d94f45b242
permissions -rw-r--r--
version 0.5.1: bugfix for working around conflicting -c/--config options
"""
    Non-blocking fifo reads.
"""

import os
import errno
import fcntl

import logging

# module-level logger; the Pipe and Fifo classes below emit their debug/warning messages through it
log = logging.getLogger('pvl.syslog.fifo')

class Pipe (object) :
    """
        A pipe from a fd.

        Supports reading lines in a non-blocking fashion.

        Data is handled as raw bytes, exactly as returned by os.read() (i.e. plain `str` on Python 2).
    """

    @classmethod
    def file (cls, file) :
        """
            Create Pipe from file, e.g. sys.stdin.

            file    - any object with a fileno() method

            Puts fd into nonblocking mode, which means that the given file will stop working!
        """

        fd = file.fileno()

        log.debug("%s: %s", file, fd)

        # add O_NONBLOCK on top of whatever flags the fd already has
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        return cls(fd)

    def __init__ (self, fd) :
        """
            fd      - file descriptor to read from, or None

            May pass fd=None to open as closed.
        """

        self._fd = fd

        # bytes read so far that do not yet form a complete line
        self._buf = b''

        # %s, not %d: fd=None is a valid argument and must not break log formatting
        log.debug("pipe: %s", fd)

    def open (self, fd) :
        """
            re-open closed pipe to use the given fd.

            Raises ValueError if already open.
        """

        if self._fd is not None :
            raise ValueError("%s: re-opening already open pipe: %s" % (self, fd))

        self._fd = fd

    # XXX: good idea?
    def __nonzero__ (self) :
        """
            Test if we are open.

            XXX: signal EOF as well?
        """

        return self._fd is not None

    # Python 3 spells the truth-test hook __bool__; without this a closed pipe would still be truthy there
    __bool__ = __nonzero__

    def fileno (self) :
        """
            Return the internal fd.

            Raises ValueError if we are closed.
            XXX: EOFError?
        """

        if self._fd is None :
            raise ValueError("I/O operation on closed pipe: %s" % (self, ))

        return self._fd

    # XXX: this is almost identical to pvl.socket.ReadStream
    def read (self, n=512) :
        """
            Read up to n bytes.

            Returns the bytes read, or None if we would block.
            Raises EOFError on EOF.
            Raises ValueError (via fileno()) if we are closed.
            Any OSError other than EAGAIN/EWOULDBLOCK is propagated.
        """

        try :
            buf = os.read(self.fileno(), n)

        except OSError as ex :
            # EAGAIN and EWOULDBLOCK both mean "nothing to read right now"
            if ex.errno not in (errno.EAGAIN, errno.EWOULDBLOCK) :
                raise

            # empty
            buf = None

        log.debug("%s: %s", self, buf)

        if buf is None :
            return None

        if not buf :
            # a zero-length read means the writing end is gone
            raise EOFError()

        return buf

    def readline (self) :
        """
            Read and return next waiting line from input.

            Line is returned without the trailing newline.

            Returns None if there is no complete line available yet; any partial line stays buffered for the next call.
            Raises EOFError if the write end was closed.
            Raises ValueError if we are closed and no complete line is buffered.
        """

        # keep reading until the buffer holds at least one complete line
        while b'\n' not in self._buf :
            chunk = self.read()

            if chunk is None :
                # would block
                return None

            self._buf += chunk

        # split out one line, keep the remainder buffered
        line, self._buf = self._buf.split(b'\n', 1)

        log.debug("%s", line)

        return line

    def readlines (self) :
        """
            Read any available input, yielding lines.

            Stops iterating once there is no more complete line available without blocking.
            EOFError from readline() is not handled here, and propagates to the caller.
        """

        while True :
            # pull line
            line = self.readline()

            if line is None :
                return # block

            yield line

    __iter__ = readlines

    def close (self) :
        """
            Close our fd, if open.

            May be open()'d again. Meanwhile, fileno() and read() will raise ValueError.

            Logs a warning if already closed.
        """

        if self._fd is None :
            log.warning("%s: already closed", self)
            return

        log.debug("%s: %s", self, self._fd)

        os.close(self._fd)
        self._fd = None

    def __str__ (self) :
        return "pipe({self._fd})".format(self=self)

class Fifo (Pipe) :
    """
        A named pipe(7) on the filesystem.

        Supports reading lines in a non-blocking fashion, and re-opening on EOF.
    """

    def __init__ (self, path) :
        """
            path    - filesystem path of the fifo to read from

            Opens the fifo immediately; any OSError from os.open() propagates.
        """

        self.path = path
        Pipe.__init__(self, self._open())

    def _open (self) :
        """
            Open the internal fd (nonblocking).

            Returns the new fd; does not store it.
        """

        fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)

        log.debug("%s: open: %s", self, fd)

        return fd

    def open (self) :
        """
            Re-open the FIFO.

            Used when the writing end was closed, and read gave EOF. Opening the fifo again will clear the EOF condition,
            and resume nonblocking mode.

            Raises ValueError() if already open. close() first.
        """

        Pipe.open(self, self._open())

    def readlines (self) :
        """
            Read any available input, yielding lines.

            Re-opens the FIFO on EOF.

            Stops iterating when there is no more input available, or after the fifo was re-opened following EOF.
        """

        while True :
            try :
                # pull line
                line = self.readline()

            except EOFError :
                log.debug("%s: EOF: reopen", self)

                # reopen and go back to waiting
                self.close()
                self.open()

                return

            if line is None :
                log.debug("%s: EOF: wait", self)
                return # wait

            yield line

    __iter__ = readlines

    def __str__ (self) :
        return self.path

    # XXX: we need to figure out what references we have lying around, and clean those out!
    def __del__ (self) :
        """
            Cleanup: close the fd if it is still open.
        """

        # _fd is missing entirely if os.open() failed inside __init__; treat that the same as closed
        if getattr(self, '_fd', None) is not None :
            self.close()