add support for fifos in logwatch
authorTero Marttila <terom@paivola.fi>
Wed, 26 Mar 2008 02:30:34 +0200
changeset 17 24dc72473ff9
parent 16 521fec9bb663
child 18 6348bf9750bc
add support for fifos in logwatch

committer: Tero Marttila <terom@paivola.fi>
fifo.py
logwatch_sources.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/fifo.py	Wed Mar 26 02:30:34 2008 +0200
@@ -0,0 +1,100 @@
+# read a stream from a fifo
+
+from twisted.internet import reactor, interfaces
+from twisted.python import log
+from zope.interface import implements
+
+import os, fcntl, errno
+
+class EOF (Exception) : pass
+
+BUF_SIZE = 2048
+
+class Fifo (object) :
+    implements(interfaces.IReadDescriptor)
+
+    def __init__ (self, path) :
+        self.path = path
+        self._open()
+
+    def _open (self) :
+        self.fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
+
+        reactor.addReader(self)
+        
+        log.msg("opened fifo %s as %d" % (self.path, self.fd))
+
+    def _close (self) :
+        if self.fd :
+            reactor.removeReader(self)
+            os.close(self.fd)
+
+            log.msg("closed fifo %d at %s" % (self.fd, self.path))
+            
+            self.fd = None
+    close = _close
+
+    def reopen (self) :
+        """
+            Close and re-open the fifo. This is useful for handling EOF
+        """
+        self._close()
+        self._open()
+
+    def _read (self, length) :
+
+        log.msg("(read %d bytes from %d:%s)" % (length, self.fd, self.path))
+        try :
+            data = os.read(self.fd, length)
+
+        except OSError, e :
+            if e.errno == errno.EAGAIN :
+                log.msg("\tEAGAIN")
+                return None
+            else :
+                log.msg("\tERROR: %s" % e)
+                raise
+
+        if not data :
+            log.msg("\tEOF")
+            raise EOF()
+        
+        log.msg("\tDATA: %d: %r" % (len(data), data))
+        return data
+    
+    def fileno (self) :
+        return self.fd
+
+    def doRead (self) :
+        while True :
+            log.msg("fifo doRead loop")
+
+            try :
+                data = self._read(BUF_SIZE)
+            except EOF :
+                self.handleEOF()
+                return
+
+            if data :
+                self.dataReceived(data)
+            else :
+                break
+        
+    def dataReceived (self, data) :
+        pass
+    
+    def handleEOF (self) :
+        pass
+    
+    def connectionLost (self, reason) :
+        self.close()
+    
+    def logPrefix (self) :
+        return "FIFO:%d:%s" % (self.fd, self.path)
+
+    def __del__ (self) :
+        """
+            !!! this is important
+        """
+        self.close()
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/logwatch_sources.py	Wed Mar 26 02:30:34 2008 +0200
@@ -0,0 +1,85 @@
+from twisted.internet import reactor, protocol
+from twisted.python import log
+
+import fifo
+
+class LogSource (object) :
+    def __init__ (self, name, filters) :
+        # set later on
+        self.module = None
+        
+        # what filters to apply
+        self.filters = filters
+        
+        # name, for display purposes
+        self.name = name
+
+        # used to gather data together into lines
+        self.buf = ""
+
+    def setModule (self, module) :
+        self.module = module
+
+    def handleError (self, msg) :
+        log.err(msg)
+        self.module.error(msg)
+
+    def handleData (self, data) :
+        data = self.buf + data
+        
+        while "\n" in data :
+            line, data = data.split("\n", 1)
+
+            self.handleLine(line)
+
+        self.buf = data
+
+    def handleLine (self, line) :
+        log.msg("Matching line `%s'..." % line)
+
+        for filter in self.filters :
+            out = filter.test(line)
+
+            if out :
+                log.msg("\t%s: %s" % (filter.event_type, out))
+                self.module.sendEvent(filter.event_type, out)
+
+class File (LogSource, protocol.ProcessProtocol) :
+    def __init__ (self, name, path, filters) :
+        super(File, self).__init__(name, filters)
+
+        self.path = path
+
+        log.msg("spawning tail process for %s:%s" % (name, path))
+
+        reactor.spawnProcess(self, "/usr/bin/tail", ["tail", "-n0", "--follow=name", path])
+
+    def errReceived (self, data) :
+        self.handleError("tail for %s: %s" % (self.name, data))
+
+    def outReceived (self, data) :
+        self.handleData(data)
+
+    def processEnded (self, reason) :
+        self.handleError("tail process for %s quit: %s" % (self.name, reason.getErrorMessage()))
+
+class Fifo (LogSource, fifo.Fifo) :
+    def __init__ (self, name, path, filters) :
+        LogSource.__init__(self, name, filters)
+
+        log.msg("opening fifo for %s:%s" % (name, path))
+
+        fifo.Fifo.__init__(self, path)
+    
+    def dataReceived (self, data) :
+        self.handleData(data)
+
+    def handleEOF (self) :
+        self.handleError("!!! EOF on fifo %s, re-opening" % self.name)
+        self.reopen()
+    
+    def connectionLost (self, reason) :
+        super(Fifo, self).connectionLost(reason)
+        self.handleError("lost fifo for %s: %s" % (self.name, reason.getErrorMessage()))
+
+