--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/args.py Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,101 @@
+import optparse, sys
+
+from pvl.syslog.parser import SyslogParser
+from pvl.syslog.filter import SyslogFilter
+from pvl.syslog.syslog import SyslogSource
+from pvl.syslog import fifo, tail, file
+
+# XXX: use optparse parser.error()?
+import logging; log = logging.getLogger('pvl.syslog.args')
+
def parser (parser, prog=None) :
    """
    Optparse option group for syslog input options.

    parser  - the optparse.OptionParser to attach the group's options to
    prog    - default for --syslog-prog: filter to only process lines from given process
    """

    syslog = optparse.OptionGroup(parser, 'Syslog collector')

    syslog.add_option('--syslog-fifo', metavar='PATH',
            help="Read syslog messages from given fifo")

    syslog.add_option('--syslog-file', metavar='FILE',
            help="Read syslog messages from given file")

    syslog.add_option('--syslog-tail', metavar='FILE',
            help="Continuously poll syslog messages given file")

    syslog.add_option('--syslog-stdin', action='store_true',
            help="Read syslog messages from stdin")

    syslog.add_option('--syslog-raw', action='store_true',
            help="Parse raw syslog lines without timestamp/etc")

    syslog.add_option('--syslog-facility', metavar='FACILITY',
            help="Set/filter by given facility")

    # bugfix: help text said "facility" for the severity option
    syslog.add_option('--syslog-severity', metavar='SEVERITY',
            help="Set given severity")

    syslog.add_option('--syslog-prog', metavar='PROG', default=prog,
            help="Filter by given prog: %default")

    return syslog
+
def apply (options, optional=False) :
    """
    Handle options, returning a SyslogSource, if any.

    May log.error/sys.exit
    """

    # XXX: this belongs in pvl.syslog.source
    if options.syslog_fifo :
        # named pipe; poll by select()ing on it
        lines = fifo.Fifo(options.syslog_fifo)
        poll = True

    elif options.syslog_tail :
        # follow a logfile; poll on a timeout
        lines = tail.Tail(options.syslog_tail, skip=True)
        poll = tail.Tail.POLL

    elif options.syslog_file :
        # plain file; read up to EOF, no polling
        lines = file.File(open(options.syslog_file))
        poll = False

    elif options.syslog_stdin :
        # pipe on stdin; select()able - puts stdin into non-blocking mode
        lines = fifo.Pipe.file(sys.stdin)
        poll = True

    elif optional :
        # no syslog source given, and the caller can live without one
        return None

    else :
        # fall back to blocking reads from stdin
        if sys.stdin.isatty() :
            log.warning("Reading syslog messages from TTY?")

        lines = file.File(sys.stdin)
        poll = False # XXX: tty vs pipe vs file? False -> just block

    # options
    syslog_parser = SyslogParser(
            raw         = options.syslog_raw,
            facility    = options.syslog_facility,
            severity    = options.syslog_severity,
    )

    # TODO: filter optional
    syslog_filter = SyslogFilter.build(
            # glob pattern
            prog        = options.syslog_prog,
            facility    = options.syslog_facility,
            #severity = options.sylog_severity, # XXX: match as greater-than?
    )

    # polling
    return SyslogSource(lines, syslog_parser, syslog_filter, poll)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/fifo.py Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,256 @@
+"""
+ Non-blocking fifo reads.
+"""
+
+import os
+import errno
+import fcntl
+
+import logging
+
+log = logging.getLogger('pvl.syslog.fifo')
+
class Pipe (object) :
    """
    A pipe from a fd.

    Supports reading lines in a non-blocking fashion: read()/readline() return None
    instead of blocking when no complete input is available.

    NOTE: written for Python 2 — buffers str chunks in self._buf, and uses __nonzero__
    for truthiness.
    """

    @classmethod
    def file (cls, file) :
        """
        Create Pipe from file, e.g. sys.stdin.

        Puts fd into nonblocking mode, which means that the given file will stop working!
        (the file object shares the fd, so its own blocking reads will start failing)
        """

        fd = file.fileno()

        log.debug("%s: %s", file, fd)

        # set O_NONBLOCK while preserving any other status flags
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fl |= os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, fl)

        return cls(fd)

    def __init__ (self, fd) :
        """
        May pass fd=None to open as closed.
        """

        self._fd = fd
        self._buf = ''      # carries any partial line between read()s

        # NOTE(review): %d fails to format when fd=None (logging reports the formatting
        # error rather than raising) — should be %s?
        log.debug("pipe: %d", fd)

    def open (self, fd) :
        """
        re-open closed pipe to use the given fd.

        Raises ValueError if already open.
        """

        if self._fd is None :
            self._fd = fd
        else :
            raise ValueError("%s: re-opening already open pipe: %s" % (self, fd))

    # XXX: good idea?
    def __nonzero__ (self) :
        """
        Test if we are open (Python 2 truthiness).

        XXX: signal EOF as well?
        """

        return self._fd is not None

    def fileno (self) :
        """
        Return the internal fd, for select()ing.

        Raises ValueError if we are closed.
        XXX: EOFError?
        """

        if self._fd is None :
            raise ValueError("I/O operation on closed pipe: %s" % (self, ))
        else :
            return self._fd

    # XXX: this is almost identical to pvl.socket.ReadStream
    def read (self, n=512) :
        """
        Read up to n bytes.

        Returns None if we would block.
        Raises EOFError on EOF (os.read returned ''), or ValueError if closed.
        """

        try :
            buf = os.read(self.fileno(), n)

        except OSError as ex :
            # nonblocking read with no input available?
            if ex.errno == errno.EAGAIN :
                # empty
                buf = None

            else :
                raise

        log.debug("%s: %s", self, buf)

        if buf is None :
            return None
        elif buf :
            return buf
        else :
            # empty read from a nonblocking fd means the write end was closed
            raise EOFError()

    def readline (self) :
        """
        Read and return next waiting line from input.

        Line is returned without trailing '\n'.

        Returns None if there is no complete line available; any partial line stays
        buffered for the next call.
        Raises EOFError if the fifo write end was closed.
        """

        while '\n' not in self._buf :
            # read chunk
            read = self.read()

            if read is None :
                return None

            self._buf += read

        # split out one line
        line, self._buf = self._buf.split('\n', 1)

        log.debug("%s", line)

        return line

    def readlines (self) :
        """
        Read and yield all complete lines currently waiting on the pipe.

        Stops once no complete line is available. Unlike Fifo, a plain Pipe does not
        handle EOF — readline()'s EOFError propagates to the caller.

        NOTE(review): an empty line ('') also stops iteration and is dropped — intended?
        """

        while True :
            # pull line
            line = self.readline()

            if line :
                yield line
            else :
                return # no more complete lines; go back to select()ing

    __iter__ = readlines

    def close (self) :
        """
        Close our fd, if open.

        May be open()'d again. Meanwhile, operations will raise ValueError (from fileno()).

        log.warn's if already closed.
        """

        if self._fd is None :
            log.warn("%s: already closed", self)

        else :
            log.debug("%s: %s", self, self._fd)

            os.close(self._fd)
            self._fd = None

    def __str__ (self) :
        return "pipe({self._fd})".format(self=self)
+
class Fifo (Pipe) :
    """
    A named pipe(7) on the filesystem.

    Supports reading lines in a non-blocking fashion, and re-opening on EOF (a fifo
    hits EOF whenever all writers close their end; re-opening clears that condition).
    """

    def __init__ (self, path) :
        # path must be set before Pipe.__init__, since __str__ uses it
        self.path = path
        Pipe.__init__(self, self._open())

    def _open (self) :
        """
        Open the internal fd (nonblocking).

        O_NONBLOCK also means the open() itself does not block waiting for a writer.
        """

        fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)

        log.debug("%s: open: %s", self, fd)

        return fd

    # NOTE: signature differs from Pipe.open(fd) — we always open our own path
    def open (self) :
        """
        Re-open the FIFO.

        Used when the writing end was closed, and read gave EOF. Opening the fifo again will clear the EOF condition,
        and resume nonblocking mode.

        Raises ValueError() if already open. close() first.
        """

        Pipe.open(self, self._open())

    def readlines (self) :
        """
        Read any available input, yielding lines.

        Re-opens the FIFO on EOF, then stops iteration so the caller goes back to
        select()/polling on the fresh fd.

        Stops yielding once there is no more complete line available.
        """

        while True :
            try :
                # pull line
                line = self.readline()

            except EOFError :
                log.debug("%s: EOF: reopen", self)

                # reopen and go back to waiting
                self.close()
                self.open()

                return

            if line is None :
                log.debug("%s: EOF: wait", self)
                return # wait
            else :
                yield line

    __iter__ = readlines

    def __str__ (self) :
        return self.path

    # XXX: we need to figure out what references we have lying around, and clean those out!
    def __del__ (self) :
        """
        Cleanup: close our fd on garbage collection, if still open.
        """

        if self._fd is not None :
            self.close()
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/file.py Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,98 @@
+"""
+ Iterate over lines in file-like objects (without buffering lines!), write (flushing output).
+"""
+
+import logging; log = logging.getLogger('pvl.syslog.file')
+
class File (object) :
    """
    Follow a file-like object, reading lines until no more are available. Never raises EOFError.

    Works with python file objects that buffer readlines() when using e.g. `tail -f ... | python -u ...`.

    readline() may block once there is no more input available, or may return None for evermore.

    There is no fileno(), this is not pollable. At all. Don't even iterate on this with a timeout.
    """

    @classmethod
    def open (cls, path, mode='r', **opts) :
        """
        Open the file at the given path, wrapping it.
        """

        log.debug("%s", path)

        return cls(open(path, mode), **opts)

    EOL = '\n'  # line terminator used for writes

    def __init__ (self, file) :
        """
        file - the underlying file-like object (readline/write/flush are used).
        """

        log.debug("%s", file)

        self.file = file

    def readline (self) :
        """
        Read one line from the file, without the trailing newline.

        Returns None on EOF.
        """

        raw = self.file.readline()

        # '' means EOF; otherwise drop the line terminator
        line = raw.rstrip('\r\n') if raw else None

        log.debug("%s", line)

        return line

    def readlines (self) :
        """
        Yield lines from the file until EOF.
        """

        while True :
            line = self.readline()

            if line is None :
                log.debug("%s: eof", self)
                break

            yield line

    __iter__ = readlines

    def writeline (self, line, eol=EOL) :
        """
        Write out a single line followed by eol; does not flush.
        """

        log.debug("%s", line)

        self.file.write(str(line))
        self.file.write(eol)

    def __call__ (self, *lines) :
        """
        Write out all given lines, then flush.
        """

        for out in lines :
            self.writeline(out)

        self.file.flush()

    def close (self) :
        """
        Close our file. Further operations raise ValueError.
        """

        log.debug("%s", self)
        self.file.close()

    def __str__ (self) :
        # XXX: assumes the underlying file has a .name (StringIO does not)
        return self.file.name
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/filter.py Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,157 @@
+import logging; log = logging.getLogger('pvl.syslog.filter')
+
+import re # XXX
+import os.path, fnmatch
+
class SyslogFilter (object) :
    """
    Match syslog messages fields against given patterns.

    Filters are a { attr: pattern } dict, where a pattern is either a glob string or a
    compiled regex; a match_<attr> method, if defined, overrides the generic matching.
    """

    @classmethod
    def build (cls, **filters) :
        """
        Match using given non-None fields.
        """

        # drop None's; .items() so this also works on Python 3 (was .iteritems())
        return cls(dict((attr, pattern) for attr, pattern in filters.items() if pattern is not None))

    def __init__ (self, filters) :
        """
        Match using given { field: glob/regex }.
        """

        self.filters = filters

    def match_glob (self, attr, glob, value=None) :
        """
        Match given value as a glob (fnmatch) pattern.

        Returns { attr: value } on match, False on mismatch.
        An empty glob matches anything; a missing/empty value matches only an empty glob.
        """

        if not glob :
            return { attr: value }

        if not value :
            # require
            return False

        # normalize
        value = value.strip()

        # match
        if fnmatch.fnmatch(value, glob) :
            return { attr: value }
        else :
            return False

    match_facility = match_glob

    def match_prog (self, attr, glob, prog=None) :
        """
        Match prog as glob, normalizing case and stripping any leading path.
        """

        if prog :
            # normalize
            prog = prog.strip().lower()

            if prog.startswith('/') :
                # leaves postfix/* intact, but fixes /usr/bin/cron
                _, prog = os.path.split(prog)

        # match
        return self.match_glob(attr, glob, prog)

    REGEX_TYPE = type(re.compile(''))

    def match_regex (self, attr, regex, value=None) :
        """
        Match given value against given compiled regex.

        Returns { attr: <whole match> } plus any named capture groups, or False.
        """

        if not regex :
            return { attr: value }

        if not value :
            # XXX: optional = match empty string?
            value = ''
        else :
            # normalize; XXX: unicode?
            value = str(value).strip()

        # match
        match = regex.match(value)

        if not match :
            return False

        # as match-values
        matches = { attr: match.group(0) } # whole match
        matches.update(match.groupdict())

        # TODO match.expand?

        return matches

    def filter (self, item) :
        """
        Match given item against all filters.

        Returns:
            True    - we have no filters at all
            dict    - matched values (including regexp capture groups) across all fields
            None    - some filter rejected the item
        """

        match = None
        matches = {}

        for attr, pattern in self.filters.items() :
            # lookup per-attr match-func, falling back on the pattern's type
            match_func = getattr(self, 'match_{attr}'.format(attr=attr), None)

            if match_func is None :
                if isinstance(pattern, self.REGEX_TYPE) :
                    match_func = self.match_regex
                else :
                    match_func = self.match_glob

            # apply match, omitting the value if the item lacks the field
            if attr in item :
                match = match_func(attr, pattern, item[attr])
            else :
                match = match_func(attr, pattern)

            log.debug("%s: %s", attr, match)

            if not match :
                # reject
                return None

            # match
            matches.update(match)

        # test last match
        if match is None :
            # empty filter -> trivially true
            return True
        else :
            return matches

    def process (self, items) :
        """
        Filter given items, yielding those that match.
        """

        for item in items:
            match = self.filter(item)

            if match :
                yield item

    __call__ = process

    def __nonzero__ (self) :
        """
        True if we have any filters (Python 2 truthiness).
        """

        return bool(self.filters)

    __bool__ = __nonzero__ # Python 3

    def __repr__ (self) :
        return repr(self.filters)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/parser.py Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,234 @@
+import datetime, time
+import re
+
+import logging; log = logging.getLogger('pvl.syslog.parser')
+
RFC3339_RE = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(\.\d+)?(Z|[+-]\d{2}:\d{2})?')
RFC3339_FMT = '%Y-%m-%dT%H:%M:%S'

def rfc3339 (timestamp) :
    """
    RFC3339 timestamps as used in some syslog implementations.

    Returns a naive datetime: UTC if the timestamp carried a zone ('Z' or an offset,
    which is applied), otherwise whatever zone the timestamp was in (possibly localtime).

    Returns None if the timestamp does not parse.
    """

    match = RFC3339_RE.match(timestamp)

    if not match :
        return None

    # parts
    dt = datetime.datetime.strptime(match.group(1), RFC3339_FMT)
    fraction = match.group(2)

    # bugfix: the zone is group 3; group 2 is the fractional seconds
    tz = match.group(3)

    if fraction :
        # fractional seconds -> microseconds
        dt = dt.replace(microsecond=int(round(float(fraction) * 10**6)))

    if not tz :
        # XXX: naive localtime
        return dt

    elif tz == 'Z' :
        # already UTC
        return dt

    else :
        # the regex guarantees tz is [+-]HH:MM here; normalize to UTC
        hours, minutes = tz[1:].split(':')
        td = datetime.timedelta(hours=int(hours), minutes=int(minutes))

        if tz[0] == '-' :
            return dt + td
        else :
            return dt - td
+
RFC3164_RE = re.compile(r'\w{3} [0-9 ][0-9] \d{2}:\d{2}:\d{2}')
RFC3164_FMT = '%b %d %H:%M:%S'
RFC3164_PRE = '%Y ' # add missing year, assuming current

def rfc3164 (timestamp) :
    """
    Traditional BSD Syslog timestamps, e.g. 'Feb 19 20:10:21'.

    Returns a naive datetime assumed to be in localtime, with the year taken from
    the current time (the wire format carries no year). Returns None if the
    timestamp does not look like an RFC 3164 date.
    """

    if RFC3164_RE.match(timestamp) is None :
        return None

    # prefix the current year so strptime yields a complete date
    prefixed = time.strftime(RFC3164_PRE) + timestamp

    return datetime.datetime.strptime(prefixed, RFC3164_PRE + RFC3164_FMT)
+
class SyslogParser (object) :
    """
    Parse syslog lines in text format, as used in logfiles/fifos.
    """

    # numeric severity code -> conventional name
    SEVERITIES = dict(enumerate((
        'emerg',
        'alert',
        'crit',
        'err',
        'warning',
        'notice',
        'info',
        'debug',
    )))

    # numeric facility code -> conventional name
    FACILITIES = dict(enumerate((
        'kern',         # 0
        'user',         # 1
        'mail',         # 2
        'daemon',       # 3
        'auth',         # 4
        'syslog',       # 5
        'lpr',          # 6
        'news',         # 7
        'uucp',         # 8
        'cron',         # 9
        'authpriv',     # 10
        'ftp',          # 11
        'ntp',          # 12
        'audit',        # 13
        'alert',        # 14
        'clock',        # 15
        'local0',       # 16
        'local1',       # 17
        'local2',       # 18
        'local3',       # 19
        'local4',       # 20
        'local5',       # 21
        'local6',       # 22
        'local7',       # 23
    )))

    # default syslogd format
    SYSLOG_RE = re.compile(
            # the timestamp+hostname header
            #  XXX: hostname may be missing
            #   at least in Ubuntu 11.10 syslogd 'last message repeated 2 times'...
            r'(?:<(?P<pri>\d+|(?P<facility>\w+)\.(?P<severity>\w+))>)?'
        +   r'(?P<timestamp>\w{3} [0-9 ][0-9] \d{2}:\d{2}:\d{2}|.+?) '
        +   r'(?P<hostname>\S+)? '

            # the message, including possible tag/pid
        +   r"(?P<message>(?P<tag>(?P<program>[^:\]]+)(?:\[(?P<pid>\d+)\])?: )?(?P<text>.*))\n?"
    )

    def __init__ (self, raw=False, facility=None, severity=None) :
        """
        raw         - do not parse the syslog header at all; the line is the bare message
        facility    - default facility for messages that do not carry one
        severity    - default severity for messages that do not carry one
        """

        self.raw = raw
        self.facility = facility
        self.severity = severity

    def parse_pri (self, match) :
        """
        Parse pri/facility/severity from the optional <pri> prefix.

        A numeric pri encodes facility * 8 + severity; a symbolic one gives
        facility.severity names directly. Missing parts fall back to our defaults.
        """

        pri = match.group('pri')
        facility = match.group('facility') or self.facility
        severity = match.group('severity') or self.severity

        if pri and pri.isdigit() :
            pri = int(pri)

            # pri = facility * 8 + severity
            facility, severity = divmod(pri, 8)

        return dict(
            pri         = pri,

            # map numeric codes to names; unknown values pass through unchanged
            severity    = self.SEVERITIES.get(severity, severity),
            facility    = self.FACILITIES.get(facility, facility)
        )

    def parse_timestamp (self, match) :
        """
        Parse timestamp from line into datetime, or None if unparseable.
        """

        timestamp = match.group('timestamp')

        # timestamp, in various formats
        try :
            return rfc3164(timestamp) or rfc3339(timestamp)

        except ValueError as ex:
            # skip it
            log.warning("timestamp: %s:", timestamp, exc_info=ex)
            return None

    def parse_prog (self, match) :
        """
        Parse prog (the program name from the tag) from line, or None if no tag.
        """

        prog = match.group('program')

        if prog :
            return prog
        else :
            # no tag
            return None

    def parse (self, line) :
        """
        Parse given input line into a message dict, or None if unparseable.
        """

        # ignore whitespace
        line = line.strip()

        # timestamp?
        if self.raw :
            # no header to parse; fill in from defaults
            return dict(
                timestamp   = datetime.datetime.now(), # XXX: None?
                host        = None,
                prog        = None,
                pid         = None,
                msg         = line,

                # bugfix: raw items previously lacked these keys entirely, so any
                # configured facility filter rejected every raw message
                facility    = self.facility,
                severity    = self.severity,
            )

        else :
            # parse
            match = self.SYSLOG_RE.match(line)

            if not match :
                # log.warn is deprecated -> log.warning
                log.warning("Unparseable syslog message: %r", line)
                return None

            # parse
            item = dict(
                timestamp   = self.parse_timestamp(match),
                host        = match.group('hostname'),
                prog        = self.parse_prog(match),
                pid         = match.group('pid'),
                msg         = match.group('text'),
            )

            # facility/severity prefix?
            item.update(self.parse_pri(match))

            return item

    def process (self, lines) :
        """
        Yield message dicts parsed from given series of lines, skipping unparseable ones.
        """

        for line in lines :
            item = self.parse(line)

            log.debug("%s", item)

            if item :
                yield item

    __call__ = process
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/rule.py Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,216 @@
+from pvl.syslog.filter import SyslogFilter
+
+import re
+
+import optparse, sys
+import configobj
+
+import logging; log = logging.getLogger('pvl.syslog.rule')
+
def parser (parser) :
    """
    Optparse option group for syslog rule options.
    """

    group = optparse.OptionGroup(parser, "Syslog rules")

    group.add_option('--syslog-rules', metavar='FILE',
            help="Load syslog rules from file")

    return group
+
def apply (options) :
    """
    Build SyslogRules from options.

    Loads the rules file if one was given; otherwise, falls back to a default rule
    that just passes the message text through.
    """

    path = options.syslog_rules

    if not path :
        return SyslogRule('default', formats={ 'text': '{msg}' })

    return SyslogRule.load(open(path))
+
def merge (*dicts, **kwargs) :
    """
    Merge the given dicts plus any keyword arguments into a new dict.

    Later dicts win on duplicate keys; kwargs win over all dicts.
    (was: dict-comprehension over .iteritems(), which breaks on Python 3)
    """

    merged = {}

    for d in dicts + (kwargs, ) :
        merged.update(d)

    return merged
+
+# TODO: combine SyslogRule/Rules into one heirarchial SyslogRule -type?
class SyslogRule (object) :
    """
    A named SyslogFilter with sub-rules.
    """

    @classmethod
    def load (cls, file) :
        """
        Load SyslogRule from a configobj-format file.
        """

        config = configobj.ConfigObj(file)

        return cls.config_section(file.name, config)

    @classmethod
    def config_section (cls, name, section) :
        """
        Recursively load SyslogRules from config section.
        """

        rules = [cls.config_section(subsection, section[subsection]) for subsection in section.sections]
        attrs = dict((name, section[name]) for name in section.scalars)

        try :
            return cls.config(name, rules, **attrs)

        except ValueError as ex :
            # prefix the section name for context
            raise ValueError("[%s] %s" % (name, ex))

    @classmethod
    def config_filters (cls, program=None, facility=None, pattern=None, **filters) :
        """
        Yield (attr, filter) expressions from given attr/value in config.

        program/facility match as globs; everything else is compiled as a regex.
        """

        # XXX: get rid of these special cases
        if facility :
            yield 'facility', facility # glob

        if program :
            yield 'prog', program # glob

        if pattern :
            filters['msg'] = pattern

        # generic regexes; .items() so this also works on Python 3 (was .iteritems())
        for attr, value in filters.items() :
            try :
                # regex
                yield attr, re.compile(value)

            except re.error as ex :
                raise ValueError("%s: %s" % (attr, ex))

    @classmethod
    def config (cls, name, rules=None, format=None, irk=None, **filters) :
        """
        Build SyslogRule from config options.
        """

        if format is not None :
            format = { 'text': format }
        else :
            format = { }

        if irk :
            format['irk'] = irk

        filters = dict(cls.config_filters(**filters))

        filter = SyslogFilter(filters)

        log.debug("%s: %s %s", name, rules, filter)

        return cls(name, rules, filter, format)

    def __init__ (self, name, rules=None, filter=None, formats=None) :
        self.name = name                # label, used by __str__
        self.rules = rules or []        # sub-rules
        self.filter = filter            # SyslogFilter
        self.formats = formats or {}    # { attr: format-string }

    def match (self, item) :
        """
        Match item against our filter, returning match-dict (may be empty) or None.
        """

        if self.filter :
            # filter
            matches = self.filter.filter(item)

        else :
            # match all, we probably have sub-rules that we're interested in
            return { }

        log.debug("%s: %s", self, matches)

        if matches :
            return matches
        else :
            # no match
            return None

    def format (self, item) :
        """
        Apply our output formats to the given item, yielding (attr, value) tuples.
        """

        # .items() so this also works on Python 3 (was .iteritems())
        for attr, fmt in self.formats.items() :
            value = fmt.format(**item)

            log.debug("%s: %s: %s", self, attr, value)

            yield attr, value

    def apply (self, item) :
        """
        Recursively match item against ourself and sub-rules.

        Returns a (match, rules, apply) tuple, or (None, None, None) if rejected.
        Matches are passed down the tree, and applies are passed up.
        """

        log.debug("%s", self)

        # match rule -> matches
        matches = self.match(item)

        if matches is None :
            # skip
            return None, None, None

        # merge matches down
        item = merge(item, matches)

        # recursive sub-rules -> apply
        for rule in self.rules :
            try :
                # pass matches down
                match, rules, apply = rule.apply(item)

            except Exception as ex :
                log.exception("%s -> %s: %r", self, rule, item)
                continue # XXX: skip?

            if apply :
                # pass apply up
                break
        else :
            # for-else: no sub-rule applied -> self-match
            match, rules, apply = item, [], { }

        rules.append(self)

        # formats?
        if self.formats :
            # merge apply up
            apply = merge(dict(self.format(item)), apply)

        log.debug("%s: %s", '/'.join(str(rule) for rule in rules), apply)

        return match, rules, apply

    def process (self, items) :
        """
        Apply items against our rules, yielding any (non-empty) applies.
        """

        for item in items :
            match, rules, apply = self.apply(item)

            if apply :
                yield apply

    # XXX: misnamed — takes an argument, so this is not the iterator protocol; kept
    # under the old name for backwards compatibility
    __iter__ = process

    def __str__ (self) :
        return self.name

    def __repr__ (self) :
        return 'SyslogRule({self.name}, ...)'.format(self=self)
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/syslog.py Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,126 @@
+"""
+ Syslog handling.
+
+ XXX: this belongs in pvl.syslog.source (apart from __iter__?)
+"""
+
+import select
+
+import logging; log = logging.getLogger('pvl.syslog.source')
+
+class SyslogSource (object) :
+ """
+ Process syslog input from a given source.
+
+ Implements an iterable mainloop doing continuous polling on the source, using either a timeout or
+ select():able source.
+ """
+
+ def __init__ (self, source, parser, filter, poll=None) :
+ """
+ Using given underlying line source.
+
+ source - source to select() if poll=True
+ poll - polling behaviour for source
+ """
+
+ self.source = source
+ self.parser = parser
+ self.filter = filter
+
+ self.poll = poll
+
+ def __iter__ (self) :
+ """
+ Yield available input.
+
+ Raises EOFError if source has been closed.
+ """
+
+ return self.filter(self.parser(self.source))
+
+ def fileno (self) :
+ return self.source.fileno()
+
+ def select (self, poll=None, reading=(), writing=()) :
+ """
+ Poll our source for input, with given polling behaviour:
+ True - select() on source
+ False - peek on source
+ float - timeout in seconds
+
+ Returns None on unknown, empty sequence on timeout, list of readables on select.
+ """
+
+ if poll is True :
+ timeout = None # block
+ reading += (self, ) # source.fileno()
+
+ elif not poll :
+ timeout = 0.0 # do not block
+
+ else :
+ timeout = float(poll)
+
+ log.debug("%s (%s)", reading, timeout)
+
+ # select
+ readable, writeable, ex = select.select(reading, writing, [], timeout)
+
+ log.debug("select: %s", readable)
+
+ if readable :
+ return readable
+
+ elif reading :
+ # timeout
+ # XXX: this is the same as readable
+ return ()
+
+ else :
+ # unknown
+ return None
+
+ def main (self, poll=None) :
+ """
+ Yield active syslog sources, polling as given.
+
+ Returns once no more lines are available.
+
+ XXX: reconnect? or source takes care of that..
+ TODO: SIGINT -> finish iteration and return?
+ """
+
+ # from __init__
+ # note that we must interpret poll here, since False -> never poll
+ if poll is None :
+ poll = self.poll
+
+ # mainloop
+ while True :
+ # caller is responsible for reading them!
+ yield self
+
+ # poll
+ if poll :
+ # wait
+ self.select(poll)
+
+ else :
+ # done
+ break
+
+ log.debug("exit")
+
+ def close (self) :
+ """
+ Close the syslog source, if possible.
+
+ """
+
+ # XXX: do all sources support close?
+ self.source.close()
+
+ def __str__ (self) :
+ return ' | '.join((str(self.source), str(self.parser), str(self.filter)))
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/tail.py Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,150 @@
+"""
+ Iterate over input lines in filesystem files.
+"""
+
+import os
+
+import logging; log = logging.getLogger('pvl.syslog.tail')
+
class Tail (object) :
    """
    Follow a file on the filesystem, reading lines until EOF, and re-opening if replaced
    (e.g. by log rotation — detected via device/inode change).

    Never blocks, no fileno() to poll. Just poll(timeout=POLL).

    Not writable.
    """

    POLL = 2.0  # suggested polling interval (seconds)

    def __init__ (self, path, skip=None, **opts) :
        """
        path    - path of the file to follow
        skip    - skip past any lines already in the file, i.e. start tailing at the end
        opts    - XXX: ignored
        """

        log.debug("%s", path)

        self.path = path
        self.file = self.stat = None # closed

        self.open()

        if skip :
            self.skip()

    def _stat (self) :
        """
        Return a key identifying the file at our path (device + inode).

        Raises OSError if the path does not exist.
        """

        st = os.stat(self.path)

        return st.st_dev, st.st_ino

    def _open (self) :
        """
        Return the opened file.
        """

        return open(self.path, 'r')

    def open (self) :
        """
        Re-opens our file when closed.

        Raises ValueError if already open.
        """

        if self.file is not None :
            raise ValueError("%s: open already open tail" % (self, ))

        # XXX: use fstat for "atomic" open+stat?
        self.file = self._open()
        self.stat = self._stat()

        log.debug("%s: %s: %s", self, self.file, self.stat)

    def changed (self) :
        """
        Has the underlying file been replaced since we opened it?

        bugfix: a transiently missing path (mid-rotation, before the new file is
        created) used to raise OSError out of readlines(); treat it as unchanged,
        so we just keep waiting and re-stat on the next poll.
        """

        try :
            return self._stat() != self.stat

        except OSError :
            # path missing right now; keep waiting on the old file
            return False

    def readline (self) :
        """
        Reads a line from the file, without the trailing newline.

        Returns None on EOF.
        """

        line = self.file.readline()

        if not line :
            line = None
        else :
            line = line.rstrip('\r\n')

        log.debug("%s", line)

        return line

    def readlines (self, eof_mark=False) :
        """
        Reads any available lines from the file.

        Reopens the file on EOF if the underlying file changed.

        eof_mark: yields a special None line when this happens.
        """

        while True :
            line = self.readline()

            if line is not None :
                yield line

            elif self.changed() :
                log.debug("EOF: reopen")

                self.close()
                self.open()

                if eof_mark :
                    yield None # special token

                # keep going
                continue

            else :
                log.debug("EOF: wait")
                break

    __iter__ = readlines

    def skip (self) :
        """
        Skip any available lines.
        """

        log.debug("%s", self)

        for line in self.readlines() :
            pass

    def close (self) :
        """
        Close our file, if open. Further operations raise ValueError.

        log.warning's if already closed.
        """

        if self.file :
            log.debug("%s", self)
            self.file.close()
            self.file = None
        else :
            # log.warn is deprecated -> log.warning
            log.warning("%s: close on already closed tail", self)

    def __str__ (self) :
        return self.path
+
--- a/setup.py Tue Feb 19 19:43:47 2013 +0200
+++ b/setup.py Tue Feb 19 20:10:21 2013 +0200
@@ -18,6 +18,7 @@
# code
packages = [
'pvl',
+ 'pvl.syslog',
],
# binaries