"""
Implementations of the various sources of log data
"""
from twisted.internet import protocol, reactor
from twisted.python import log
from fixbot.logwatch import fifo, message
class LogSource(object):
    """
    Reads lines of log data from some file or other source.

    Raw data is buffered and split into lines, each line is parsed into a
    message.SyslogMessage, and the message is run through the filter chain.
    The first positive filter match is forwarded to the attached module as
    an event.
    """

    def __init__(self, name, filters):
        """
            name        - label lines read from this source
            filters     - LogFilter chain to pass lines through
        """
        # set later on via setModule()
        self.module = None

        # what filters to apply
        self.filters = filters

        # name, for display purposes
        self.name = name

        # holds the trailing partial line between handleData() calls
        self.buf = ""

    def setModule(self, module):
        """
            Attach the module that receives events and error reports.
        """
        self.module = module

    def handleError(self, msg):
        """
            Log the given error message, and report it to the module if one
            has been attached.
        """
        log.err(msg)

        # errors can show up before setModule() has been called; the message
        # is still logged above, just not forwarded (same policy as events
        # in handleMessage)
        if self.module:
            self.module.error(msg)

    def connectionLost(self, reason):
        """
            The transport we were connected to has dropped, possibly as a
            result of our handlers raising an error?
        """
        self.handleError("lost LogSource for %s: %s" % (self.name, reason.getErrorMessage()))

    def handleData(self, data):
        """
            Feed binary data into the buffer, processing all complete lines
            via handleLine(). Whatever follows the last newline is kept in
            the buffer until more data arrives.
        """
        # split once instead of re-splitting the remainder for every line
        pieces = (self.buf + data).split("\n")

        # the last piece has no terminating newline yet (it is "" when the
        # data ended exactly on a newline)
        remainder = pieces.pop()

        for line in pieces:
            # full line
            self.handleLine(line)

        # only updated once every line has been handled, as before
        self.buf = remainder

    def handleLine(self, line):
        """
            Parse given line into a SyslogMessage, and pass it to
            handleMessage. Lines that fail to parse are logged and dropped.
        """
        try:
            msg = message.SyslogMessage(line)

        except Exception as e:
            # log and ignore
            log.err(e, "Invalid syslog message from source %s: %s" % (self.name, line))

        else:
            # handle the structured message, with this object as the logger
            log.callWithLogger(self, self.handleMessage, msg)

    def handleMessage(self, msg):
        """
            Process the given SyslogMessage by offering it to each filter in
            turn. A filter's match() result decides what happens next:

                (label, msg)    - positive match: send the event, stop
                False           - negative match: stop without sending
                None            - no match: try the next filter

            Raises ValueError for any other result.
        """
        # Log incoming lines
        log.msg(repr(msg))

        for logfilter in self.filters:
            # let the filter process the line
            out = logfilter.match(msg)

            if out:
                # unpack
                label, text = out

                # output tag/type
                event = "%s:%s" % (self.name, label)

                # positive match, send
                log.msg("\t%s: %s" % (event, text))

                # drop until we have a module
                if self.module:
                    self.module.sendEvent(event, text)

                # ok, first hit does it
                break

            elif out is False:
                # negative match, stop processing
                return

            elif out is None:
                # no match
                continue

            else:
                # some other falsy value; the filter is misbehaving
                raise ValueError(out)

    def logPrefix(self):
        """
            Prefix identifying this source; handleLine() passes self as the
            logger to log.callWithLogger().
        """
        return "LogSource(%s)" % (self.name, )
class File(LogSource, protocol.ProcessProtocol):
    """
    Log source backed by a regular file, followed by a spawned
    /usr/bin/tail process whose stdout is fed into the line buffer.
    """

    def __init__(self, name, path, filters):
        """
            name        - label lines read from this source
            path        - file handed to tail
            filters     - LogFilter chain to pass lines through
        """
        super(File, self).__init__(name, filters)

        self.path = path

        log.msg("spawning tail process for %s:%s" % (name, path))

        # argv for the child; this object acts as its process protocol
        tail_argv = ["tail", "-n0", "--follow=name", path]
        reactor.spawnProcess(self, "/usr/bin/tail", tail_argv)

    def errReceived(self, data):
        """
            Anything tail writes to stderr is reported as an error.
        """
        report = "tail for %s: %s" % (self.name, data)
        self.handleError(report)

    def outReceived(self, data):
        """
            tail's stdout carries the log data itself.
        """
        self.handleData(data)

    def processEnded(self, reason):
        """
            The tail process has exited; report it as an error.
        """
        why = reason.getErrorMessage()
        self.handleError("tail process for %s quit: %s" % (self.name, why))
class Fifo(LogSource, fifo.Fifo):
    """
    Log source that streams lines out of a fifo object.
    """

    def __init__(self, name, path, filters):
        """
            name        - label lines read from this source
            path        - passed on to fifo.Fifo
            filters     - LogFilter chain to pass lines through
        """
        # line buffer / filter state first
        LogSource.__init__(self, name, filters)

        log.msg("opening fifo for %s:%s" % (name, path))

        # then the fifo itself
        fifo.Fifo.__init__(self, path)

    def dataReceived(self, data):
        """
            Data read from the fifo goes straight into the line buffer.
        """
        self.handleData(data)

    def handleEOF(self):
        """
            On EOF, report the condition as an error and then reopen.
        """
        notice = "!!! EOF on fifo %s, re-opening" % self.name
        self.handleError(notice)

        self.reopen()
class UnixDatagramSocket(LogSource, protocol.DatagramProtocol):
    """
    Log source that receives messages on a UNIX datagram socket.
    """

    # maximum size of the received messages
    # native syslog is 1024, but syslog-ng does better... so 4k
    MAX_PACKET_SIZE = 4096

    def __init__(self, name, path, filters):
        """
            name        - label lines read from this source
            path        - filesystem path of the socket to listen on
            filters     - LogFilter chain to pass lines through
        """
        LogSource.__init__(self, name, filters)

        log.msg("opening unix socket for %s at: %s" % (name, path))

        # open UNIX socket, with this object as the datagram protocol
        max_size = self.MAX_PACKET_SIZE
        reactor.listenUNIXDatagram(path, self, max_size)

    def datagramReceived(self, data, addr):
        """
            Got message from syslog-ng
        """
        # each datagram is handled as one line, bypassing the line buffer
        self.handleLine(data)