pvl/syslog/fifo.py
changeset 2 5a8a32cbc944
child 18 48d94f45b242
equal deleted inserted replaced
1:ce931075b69e 2:5a8a32cbc944
       
     1 """
       
     2     Non-blocking fifo reads.
       
     3 """
       
     4 
       
     5 import os
       
     6 import errno
       
     7 import fcntl
       
     8 
       
     9 import logging
       
    10 
       
    11 log = logging.getLogger('pvl.syslog.fifo')
       
    12 
       
    13 class Pipe (object) :
       
    14     """
       
    15         A pipe from a fd.
       
    16 
       
    17         Supports reading lines in a non-blocking fashion.
       
    18     """
       
    19 
       
    20     @classmethod
       
    21     def file (cls, file) :
       
    22         """
       
    23             Create Pipe from file, e.g. sys.stdin.
       
    24 
       
    25             Puts fd into nonblocking mode, which means that the given file will stop working!
       
    26         """
       
    27         
       
    28         fd = file.fileno()
       
    29 
       
    30         log.debug("%s: %s", file, fd)
       
    31 
       
    32         fl = fcntl.fcntl(fd, fcntl.F_GETFL)
       
    33         fl |= os.O_NONBLOCK
       
    34         fcntl.fcntl(fd, fcntl.F_SETFL, fl)
       
    35         
       
    36         return cls(fd)
       
    37 
       
    38     def __init__ (self, fd) :
       
    39         """
       
    40             May pass fd=None to open as closed.
       
    41         """
       
    42 
       
    43         self._fd = fd
       
    44         self._buf = ''
       
    45         
       
    46         log.debug("pipe: %d", fd)
       
    47 
       
    48     def open (self, fd) :
       
    49         """
       
    50             re-open closed pipe to use the given fd.
       
    51 
       
    52             Raises ValueError if already open.
       
    53         """
       
    54 
       
    55         if self._fd is None :
       
    56             self._fd = fd
       
    57         else :
       
    58             raise ValueError("%s: re-opening already open pipe: %s" % (self, fd))
       
    59     
       
    60     # XXX: good idea?
       
    61     def __nonzero__ (self) :
       
    62         """
       
    63             Test if we are open.
       
    64 
       
    65             XXX: signal EOF as well?
       
    66         """
       
    67 
       
    68         return self._fd is not None
       
    69 
       
    70     def fileno (self) :
       
    71         """
       
    72             Return the internal fd.
       
    73 
       
    74             Raises ValueError if we are closed.
       
    75             XXX: EOFError?
       
    76         """
       
    77 
       
    78         if self._fd is None :
       
    79             raise ValueError("I/O operation on closed pipe: %s" % (self, ))
       
    80         else :
       
    81             return self._fd
       
    82     
       
    83     # XXX: this is almost identical to pvl.socket.ReadStream
       
    84     def read (self, n=512) :
       
    85         """
       
    86             Read up to n bytes.
       
    87             
       
    88             Returns None if we would block.
       
    89             Raises EOFError on EOF, or closed.
       
    90         """
       
    91 
       
    92         try :
       
    93             buf = os.read(self.fileno(), n)
       
    94 
       
    95         except OSError as ex :
       
    96             # block?
       
    97             if ex.errno == errno.EAGAIN :
       
    98                 # empty
       
    99                 buf = None
       
   100 
       
   101             else :
       
   102                 raise
       
   103 
       
   104         log.debug("%s: %s", self, buf)
       
   105 
       
   106         if buf is None :
       
   107             return None
       
   108         elif buf :
       
   109             return buf
       
   110         else :
       
   111             raise EOFError()
       
   112 
       
   113     def readline (self) :
       
   114         """
       
   115             Read and return next waiting line from input.
       
   116 
       
   117             Line is returned without trailing '\n'.
       
   118 
       
   119             Returns None if there is no line available.
       
   120             Raises EOFError if the fifo write end was closed.
       
   121         """
       
   122 
       
   123         while '\n' not in self._buf :
       
   124             # read chunk
       
   125             read = self.read()
       
   126 
       
   127             if read is None :
       
   128                 return None
       
   129             
       
   130             self._buf += read
       
   131         
       
   132         # split out one line
       
   133         line, self._buf = self._buf.split('\n', 1)
       
   134 
       
   135         log.debug("%s", line)
       
   136 
       
   137         return line
       
   138 
       
   139     def readlines (self) :
       
   140         """
       
   141             Read any available input, yielding lines.
       
   142             
       
   143             Re-opens the FIFO on EOF.
       
   144 
       
   145             Returns None if there was no more input available, or the fifo was re-opened after EOF.
       
   146         """
       
   147 
       
   148         while True :
       
   149             # pull line
       
   150             line = self.readline()
       
   151 
       
   152             if line :
       
   153                 yield line
       
   154             else :
       
   155                 return # block
       
   156 
       
   157     __iter__ = readlines
       
   158 
       
   159     def close (self) :
       
   160         """
       
   161             Close our fd, if open.
       
   162 
       
   163             May be open()'d again. Meanwhile, all operatations will raise EOFError.
       
   164 
       
   165             log.warn's if already closed.
       
   166         """
       
   167         
       
   168         if self._fd is None :
       
   169             log.warn("%s: already closed", self)
       
   170 
       
   171         else :
       
   172             log.debug("%s: %s", self, self._fd)
       
   173 
       
   174             os.close(self._fd)
       
   175             self._fd = None
       
   176 
       
   177     def __str__ (self) :
       
   178         return "pipe({self._fd})".format(self=self)
       
   179 
       
   180 class Fifo (Pipe) :
       
   181     """
       
   182         A named pipe(7) on the filesystem.
       
   183 
       
   184         Supports reading lines in a non-blocking fashion, and re-opening on EOF.
       
   185     """
       
   186 
       
   187     def __init__ (self, path) :
       
   188         self.path = path
       
   189         Pipe.__init__(self, self._open())
       
   190 
       
   191     def _open (self) :
       
   192         """
       
   193             Open the internal fd (nonblocking).
       
   194         """
       
   195         
       
   196         fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
       
   197 
       
   198         log.debug("%s: open: %s", self, fd)
       
   199 
       
   200         return fd
       
   201    
       
   202     def open (self) :
       
   203         """
       
   204             Re-open the FIFO.
       
   205             
       
   206             Used when the writing end was closed, and read gave EOF. Opening the fifo again will clear the EOF condition,
       
   207             and resume nonblocking mode.
       
   208             
       
   209             Raises ValueError() if already open. close() first.
       
   210         """
       
   211 
       
   212         Pipe.open(self, self._open())
       
   213 
       
   214     def readlines (self) :
       
   215         """
       
   216             Read any available input, yielding lines.
       
   217             
       
   218             Re-opens the FIFO on EOF.
       
   219 
       
   220             Returns None if there was no more input available, or the fifo was re-opened after EOF.
       
   221         """
       
   222 
       
   223         while True :
       
   224             try :
       
   225                 # pull line
       
   226                 line = self.readline()
       
   227 
       
   228             except EOFError :
       
   229                 log.debug("%s: EOF: reopen", self)
       
   230 
       
   231                 # reopen and go back to waiting
       
   232                 self.close()
       
   233                 self.open()
       
   234 
       
   235                 return
       
   236             
       
   237             if line is None :
       
   238                 log.debug("%s: EOF: wait", self)
       
   239                 return # wait
       
   240             else :
       
   241                 yield line
       
   242     
       
   243     __iter__ = readlines
       
   244 
       
   245     def __str__ (self) :
       
   246         return self.path
       
   247 
       
   248     # XXX: we need to figure out what references we have lying around, and clean those out!
       
   249     def __del__ (self) :
       
   250         """
       
   251             Cleanup
       
   252         """
       
   253 
       
   254         if self._fd is not None :
       
   255             self.close()
       
   256