# author Tero Marttila <>
# Sat, 06 Nov 2010 16:02:28 +0200
# changeset 67 00907acd732a
# parent 64 8574aeff9b36
# permissions -rw-r--r--
# Merge with api restructuring
"""
    Implementations of the various sources of log data
"""

from twisted.internet import protocol, reactor
from twisted.python import log

from fixbot.logwatch import fifo, message

class LogSource (object) :
    """
        Reads lines of log data from some file or other source.

        Subclasses feed raw data in through handleData() (or complete lines through
        handleLine()); each line is parsed into a SyslogMessage and passed through the
        filter chain, and matches are sent on to the attached module.
    """

    def __init__ (self, name, filters) :
        """
            name            - label lines read from this source
            filters         - LogFilter chain to pass lines through
        """

        # set later on
        self.module = None

        # what filters to apply
        self.filters = filters

        # name, for display purposes
        self.name = name

        # used to gather data together into lines
        self.buf = ""

    def setModule (self, module) :
        """
            Attach the module that matched events are sent to.
        """

        self.module = module

    def handleError (self, msg) :
        """
            Report an error condition concerning this source.
        """

        # NOTE(review): the body of this method was missing from the reviewed copy of
        # the file; logging the message is the minimal reconstruction. Confirm whether
        # the error should also be forwarded to self.module.
        log.err(msg)

    def connectionLost (self, reason) :
        """
            The transport we were connected to has dropped, possibly as a result of our handlers raising an error?
        """

        self.handleError("lost LogSource for %s: %s" % (self.name, reason.getErrorMessage()))

    def handleData (self, data) :
        """
            Feed binary data into the buffer, processing all lines via handleLine()

            Any trailing partial line is kept in self.buf until the rest of it arrives.
        """

        data = self.buf + data

        while "\n" in data :
            line, data = data.split("\n", 1)

            # full line
            self.handleLine(line)

        # whatever is left has no newline yet
        self.buf = data

    def handleLine (self, line) :
        """
            Parse given line into a SyslogMessage, and pass it to handleMessage
        """

        # parse
        try :
            msg = message.SyslogMessage(line)

        except Exception as e :
            # log and ignore
            log.err(e, "Invalid syslog message from source %s: %s" % (self.name, line))

        else :
            # handle the structured message
            log.callWithLogger(self, self.handleMessage, msg)

    def handleMessage (self, msg) :
        """
            Process the given SyslogMessage

            Each filter's match() is tried in order and must return one of:
                (label, msg)    - positive match; the event is sent and processing stops
                False           - negative match; processing stops
                None            - no match; the next filter is tried

            Raises ValueError for any other return value.
        """

        # Log incoming lines
        # NOTE(review): the logging statement was missing from the reviewed copy of the
        # file; the exact format is a reconstruction.
        log.msg(str(msg))

        for log_filter in self.filters :
            # let the filter process the line
            out = log_filter.match(msg)

            if out :
                # unpack
                label, event_msg = out

                # output tag/type
                event_type = "%s:%s" % (self.name, label)

                # positive match, send
                log.msg("\t%s: %s" % (event_type, event_msg))

                # drop until we have a module
                if self.module :
                    self.module.sendEvent(event_type, event_msg)

                # ok, first hit does it
                break

            elif out is False :
                # negative match, stop processing
                break

            elif out is None :
                # no match
                continue

            else :
                raise ValueError(out)

    def logPrefix (self) :
        """
            Prefix used for log lines emitted via log.callWithLogger().
        """

        return "LogSource(%s)" % (self.name, )

class File (LogSource, protocol.ProcessProtocol) :
    """
        Stream lines from a regular file using /usr/bin/tail -f

        The tail process is spawned immediately on construction; its stdout is fed into
        the LogSource line buffer.
    """

    def __init__ (self, name, path, filters) :
        """
            name            - label lines read from this source
            path            - path of the file to follow
            filters         - LogFilter chain to pass lines through
        """

        super(File, self).__init__(name, filters)

        self.path = path

        log.msg("spawning tail process for %s:%s" % (name, path))

        # -n0 skips existing content, --follow=name keeps following the path by name
        reactor.spawnProcess(self, "/usr/bin/tail", ["tail", "-n0", "--follow=name", path])

    def errReceived (self, data) :
        """
            Output on tail's stderr is reported as an error.
        """

        self.handleError("tail for %s: %s" % (self.name, data))

    def outReceived (self, data) :
        """
            Output on tail's stdout is log data.
        """

        self.handleData(data)

    def processEnded (self, reason) :
        """
            The tail process has quit.
        """

        self.handleError("tail process for %s quit: %s" % (self.name, reason.getErrorMessage()))

class Fifo (LogSource, fifo.Fifo) :
    """
        Stream lines from a fifo object.
    """

    def __init__ (self, name, path, filters) :
        """
            name            - label lines read from this source
            path            - path of the fifo to open
            filters         - LogFilter chain to pass lines through
        """

        LogSource.__init__(self, name, filters)

        log.msg("opening fifo for %s:%s" % (name, path))

        fifo.Fifo.__init__(self, path)

    def dataReceived (self, data) :
        """
            Data read from the fifo is log data.
        """

        self.handleData(data)

    def handleEOF (self) :
        """
            Report EOF on the fifo and re-open it.
        """

        self.handleError("!!! EOF on fifo %s, re-opening" % (self.name, ))

        # NOTE(review): the rest of this method was missing from the reviewed copy of
        # the file; this assumes fifo.Fifo provides reopen() - confirm against
        # fixbot.logwatch.fifo.
        self.reopen()

class UnixDatagramSocket (LogSource, protocol.DatagramProtocol) :
    """
        Stream messages from a UNIX datagram socket
    """

    # maximum size of the received messages
    # native syslog is 1024, but syslog-ng does better... so 4k
    MAX_PACKET_SIZE = 4096

    def __init__ (self, name, path, filters) :
        """
            name            - label lines read from this source
            path            - path of the UNIX socket to listen on
            filters         - LogFilter chain to pass lines through
        """

        LogSource.__init__(self, name, filters)

        log.msg("opening unix socket for %s at: %s" % (name, path))

        # open UNIX socket
        reactor.listenUNIXDatagram(path, self, self.MAX_PACKET_SIZE)

    def datagramReceived (self, data, addr) :
        """
            Got message from syslog-ng
        """

        # handle it as a line of data
        # NOTE(review): the statement was missing from the reviewed copy of the file;
        # each datagram is treated as one complete line, bypassing the line buffer.
        self.handleLine(data)