"""
    Implementations of the various sources of log data
"""

from twisted.internet import protocol, reactor
from twisted.python import log

from fixbot.logwatch import fifo, message


class LogSource(object):
    """
        Reads lines of log data from some file or other source.

        Raw data is fed in via handleData() (stream sources) or handleLine()
        (message-oriented sources); each complete line is parsed into a
        message.SyslogMessage and run through the filter chain.
    """

    def __init__(self, name, filters):
        """
            name        - label lines read from this source
            filters     - LogFilter chain to pass lines through
        """

        # set later on, via setModule()
        self.module = None

        # what filters to apply
        self.filters = filters

        # name, for display purposes
        self.name = name

        # used to gather data together into lines
        self.buf = ""

    def setModule(self, module):
        """
            Attach the module that receives our events and errors.
        """

        self.module = module

    def handleError(self, msg):
        """
            Log the given error message, and report it to our module.

            Errors can arrive before setModule() has been called; in that
            case they are only logged, the same way handleMessage() drops
            events until a module is attached.
        """

        log.err(msg)

        if self.module:
            self.module.error(msg)

    def connectionLost(self, reason):
        """
            The transport we were connected to has dropped, possibly as a result of our handlers raising an error?
        """

        self.handleError("lost LogSource for %s: %s" % (self.name, reason.getErrorMessage()))

    def handleData(self, data):
        """
            Feed binary data into the buffer, processing all lines via handleLine()

            Any trailing partial line is kept in self.buf until the rest of
            it arrives in a later call.
        """

        # a single split yields every complete line plus the trailing
        # remainder (an empty string if the data ended on a newline)
        lines = (self.buf + data).split("\n")

        # stash the remainder before dispatching, so that the buffer is
        # consistent even if a handler raises
        self.buf = lines.pop()

        for line in lines:
            # full line
            self.handleLine(line)

    def handleLine(self, line):
        """
            Parse given line into a SyslogMessage, and pass it to handleMessage
        """

        # parse
        try:
            msg = message.SyslogMessage(line)

        except Exception as e:
            # log and ignore
            log.err(e, "Invalid syslog message from source %s: %s" % (self.name, line))

        else:
            # handle the structured message, using our logPrefix() as the log context
            log.callWithLogger(self, self.handleMessage, msg)

    def handleMessage(self, msg):
        """
            Process the given SyslogMessage

            Each filter's match() result decides what happens next:
                truthy      - a (label, message) pair; the event is sent to the
                              module (if any) and processing stops
                False       - negative match, processing stops
                None        - no match, the next filter is tried

            Raises ValueError for any other (falsy) result.
        """

        # Log incoming lines
        log.msg(repr(msg))

        for log_filter in self.filters:
            # let the filter process the line
            out = log_filter.match(msg)

            if out:
                # unpack
                label, event_msg = out

                # output tag/type
                event_type = "%s:%s" % (self.name, label)

                # positive match, send
                log.msg("\t%s: %s" % (event_type, event_msg))

                # drop until we have a module
                if self.module:
                    self.module.sendEvent(event_type, event_msg)

                # ok, first hit does it
                break

            elif out is False:
                # negative match, stop processing
                return

            elif out is None:
                # no match
                continue

            else:
                # falsy, but neither False nor None
                raise ValueError(out)

    def logPrefix(self):
        """
            Prefix used for log lines emitted while handling our messages.
        """

        return "LogSource(%s)" % (self.name, )


class File(LogSource, protocol.ProcessProtocol):
    """
        Stream lines from a regular file using /usr/bin/tail --follow=name
    """

    def __init__(self, name, path, filters):
        """
            name        - label lines read from this source
            path        - path of the file to follow
            filters     - LogFilter chain to pass lines through

            Spawns the tail process right away.
        """

        super(File, self).__init__(name, filters)

        self.path = path

        log.msg("spawning tail process for %s:%s" % (name, path))

        # -n0: do not output any of the existing lines, only what is appended from now on
        reactor.spawnProcess(self, "/usr/bin/tail", ["tail", "-n0", "--follow=name", path])

    def errReceived(self, data):
        """
            Anything tail writes to stderr is reported as an error.
        """

        self.handleError("tail for %s: %s" % (self.name, data))

    def outReceived(self, data):
        """
            Output from tail is log data.
        """

        self.handleData(data)

    def processEnded(self, reason):
        """
            The tail process has quit; report it as an error.
        """

        self.handleError("tail process for %s quit: %s" % (self.name, reason.getErrorMessage()))


class Fifo(LogSource, fifo.Fifo):
    """
        Stream lines from a fifo object.
    """

    def __init__(self, name, path, filters):
        """
            name        - label lines read from this source
            path        - path passed on to fifo.Fifo
            filters     - LogFilter chain to pass lines through
        """

        LogSource.__init__(self, name, filters)

        log.msg("opening fifo for %s:%s" % (name, path))

        fifo.Fifo.__init__(self, path)

    def dataReceived(self, data):
        """
            Data read from the fifo is log data.
        """

        self.handleData(data)

    def handleEOF(self):
        """
            Report the EOF as an error, and reopen() the fifo.
        """

        self.handleError("!!! EOF on fifo %s, re-opening" % self.name)
        self.reopen()


class UnixDatagramSocket(LogSource, protocol.DatagramProtocol):
    """
        Stream messages from a UNIX datagram socket
    """

    # maximum size of the received messages
    # native syslog is 1024, but syslog-ng does better... so 4k
    MAX_PACKET_SIZE = 4096

    def __init__(self, name, path, filters):
        """
            name        - label lines read from this source
            path        - path of the UNIX socket to listen on
            filters     - LogFilter chain to pass lines through
        """

        LogSource.__init__(self, name, filters)

        log.msg("opening unix socket for %s at: %s" % (name, path))

        # open UNIX socket
        reactor.listenUNIXDatagram(path, self, self.MAX_PACKET_SIZE)

    def datagramReceived(self, data, addr):
        """
            Got message from syslog-ng
        """

        # each datagram is one message; handle it as a line of data, bypassing the line buffer
        self.handleLine(data)