import pvl.syslog from pvl-verkko
author      Tero Marttila <terom@paivola.fi>
date        Tue, 19 Feb 2013 20:10:21 +0200
changeset 2 5a8a32cbc944
parent 1 ce931075b69e
child 3 cfe1b58f5d80
import pvl.syslog from pvl-verkko
pvl/syslog/__init__.py
pvl/syslog/args.py
pvl/syslog/fifo.py
pvl/syslog/file.py
pvl/syslog/filter.py
pvl/syslog/parser.py
pvl/syslog/rule.py
pvl/syslog/syslog.py
pvl/syslog/tail.py
setup.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/args.py	Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,101 @@
+import optparse, sys
+
+from pvl.syslog.parser import SyslogParser
+from pvl.syslog.filter import SyslogFilter
+from pvl.syslog.syslog import SyslogSource
+from pvl.syslog import fifo, tail, file
+
+# XXX: use optparse parser.error()?
+import logging; log = logging.getLogger('pvl.syslog.args')
+
+def parser (parser, prog=None) :
+    """
+        Optparse option group
+
+            prog        - filter to only process lines from given process
+    """
+    
+    syslog = optparse.OptionGroup(parser, 'Syslog collector')
+
+    syslog.add_option('--syslog-fifo',          metavar='PATH',
+            help="Read syslog messages from given fifo")
+
+    syslog.add_option('--syslog-file',          metavar='FILE',
+            help="Read syslog messages from given file")
+
+    syslog.add_option('--syslog-tail',          metavar='FILE',
+            help="Continuously poll syslog messages from given file")
+
+    syslog.add_option('--syslog-stdin',         action='store_true',
+            help="Read syslog messages from stdin")
+
+    syslog.add_option('--syslog-raw',           action='store_true',
+            help="Parse raw syslog lines without timestamp/etc")
+
+    syslog.add_option('--syslog-facility',      metavar='FACILITY',
+            help="Set/filter by given facility")
+
+    syslog.add_option('--syslog-severity',      metavar='SEVERITY',
+            help="Set/filter by given severity")
+
+    syslog.add_option('--syslog-prog',          metavar='PROG',     default=prog,
+            help="Filter by given prog: %default")
+
+    return syslog
+
+def apply (options, optional=False) :
+    """
+        Handle options, returning a SyslogSource, if any.
+
+        May log.error/sys.exit
+    """
+    
+    # XXX: this belongs in pvl.syslog.source
+    if options.syslog_fifo :
+        # fifo pipe
+        source = fifo.Fifo(options.syslog_fifo)
+        poll = True # select(source)
+
+    elif options.syslog_tail :
+        # tail file
+        source = tail.Tail(options.syslog_tail, skip=True)
+        poll = tail.Tail.POLL # select(float)
+
+    elif options.syslog_file :
+        # read file
+        source = file.File(open(options.syslog_file))
+        poll = False # do not loop, just read up to EOF
+
+    elif options.syslog_stdin :
+        # read pipe
+        source = fifo.Pipe.file(sys.stdin) # puts stdin into non-blocking mode
+        poll = True # select(source)
+
+    elif optional :
+        return None
+
+    else :
+        # from stdin
+        if sys.stdin.isatty() :
+            log.warning("Reading syslog messages from TTY?")
+        
+        source = file.File(sys.stdin)
+        poll = False # XXX: tty vs pipe vs file? False -> just block
+    
+    # options
+    parser = SyslogParser(
+        raw         = options.syslog_raw,
+        facility    = options.syslog_facility,
+        severity    = options.syslog_severity,
+    )
+    
+    # TODO: filter optional
+    filter = SyslogFilter.build(
+        # glob pattern
+        prog        = options.syslog_prog,
+        facility    = options.syslog_facility,
+        #severity   = options.syslog_severity,  # XXX: match as greater-than?
+    )
+
+    # polling
+    return SyslogSource(source, parser, filter, poll)
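
A minimal usage sketch for the options above (assumed wiring, not part of this changeset): build an optparse parser, attach the option group, and turn the parsed options into a SyslogSource.

    import optparse, sys

    import pvl.syslog.args

    def main (argv) :
        parser = optparse.OptionParser()
        parser.add_option_group(pvl.syslog.args.parser(parser))

        options, args = parser.parse_args(argv[1:])

        # e.g. --syslog-file=/var/log/syslog or --syslog-tail=/var/log/syslog;
        # falls back to reading stdin if nothing was given
        syslog = pvl.syslog.args.apply(options)

        # iterating reads whatever is currently available as parsed+filtered items
        for item in syslog :
            print item['msg']

    if __name__ == '__main__' :
        main(sys.argv)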
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/fifo.py	Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,256 @@
+"""
+    Non-blocking fifo reads.
+"""
+
+import os
+import errno
+import fcntl
+
+import logging
+
+log = logging.getLogger('pvl.syslog.fifo')
+
+class Pipe (object) :
+    """
+        A pipe from a fd.
+
+        Supports reading lines in a non-blocking fashion.
+    """
+
+    @classmethod
+    def file (cls, file) :
+        """
+            Create Pipe from file, e.g. sys.stdin.
+
+            Puts fd into nonblocking mode, which means that the given file will stop working!
+        """
+        
+        fd = file.fileno()
+
+        log.debug("%s: %s", file, fd)
+
+        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
+        fl |= os.O_NONBLOCK
+        fcntl.fcntl(fd, fcntl.F_SETFL, fl)
+        
+        return cls(fd)
+
+    def __init__ (self, fd) :
+        """
+            May pass fd=None to open as closed.
+        """
+
+        self._fd = fd
+        self._buf = ''
+        
+        log.debug("pipe: %d", fd)
+
+    def open (self, fd) :
+        """
+            re-open closed pipe to use the given fd.
+
+            Raises ValueError if already open.
+        """
+
+        if self._fd is None :
+            self._fd = fd
+        else :
+            raise ValueError("%s: re-opening already open pipe: %s" % (self, fd))
+    
+    # XXX: good idea?
+    def __nonzero__ (self) :
+        """
+            Test if we are open.
+
+            XXX: signal EOF as well?
+        """
+
+        return self._fd is not None
+
+    def fileno (self) :
+        """
+            Return the internal fd.
+
+            Raises ValueError if we are closed.
+            XXX: EOFError?
+        """
+
+        if self._fd is None :
+            raise ValueError("I/O operation on closed pipe: %s" % (self, ))
+        else :
+            return self._fd
+    
+    # XXX: this is almost identical to pvl.socket.ReadStream
+    def read (self, n=512) :
+        """
+            Read up to n bytes.
+            
+            Returns None if we would block.
+            Raises EOFError on EOF, or closed.
+        """
+
+        try :
+            buf = os.read(self.fileno(), n)
+
+        except OSError as ex :
+            # block?
+            if ex.errno == errno.EAGAIN :
+                # empty
+                buf = None
+
+            else :
+                raise
+
+        log.debug("%s: %s", self, buf)
+
+        if buf is None :
+            return None
+        elif buf :
+            return buf
+        else :
+            raise EOFError()
+
+    def readline (self) :
+        """
+            Read and return next waiting line from input.
+
+            Line is returned without trailing '\n'.
+
+            Returns None if there is no line available.
+            Raises EOFError if the fifo write end was closed.
+        """
+
+        while '\n' not in self._buf :
+            # read chunk
+            read = self.read()
+
+            if read is None :
+                return None
+            
+            self._buf += read
+        
+        # split out one line
+        line, self._buf = self._buf.split('\n', 1)
+
+        log.debug("%s", line)
+
+        return line
+
+    def readlines (self) :
+        """
+            Read any available input, yielding complete lines.
+
+            Stops yielding once no more input is available.
+            Raises EOFError if the write end was closed (the Fifo subclass re-opens on EOF instead).
+        """
+
+        while True :
+            # pull line
+            line = self.readline()
+
+            if line :
+                yield line
+            else :
+                return # block
+
+    __iter__ = readlines
+
+    def close (self) :
+        """
+            Close our fd, if open.
+
+            May be open()'d again. Meanwhile, all operations will raise ValueError.
+
+            log.warn's if already closed.
+        """
+        
+        if self._fd is None :
+            log.warn("%s: already closed", self)
+
+        else :
+            log.debug("%s: %s", self, self._fd)
+
+            os.close(self._fd)
+            self._fd = None
+
+    def __str__ (self) :
+        return "pipe({self._fd})".format(self=self)
+
+class Fifo (Pipe) :
+    """
+        A named pipe(7) on the filesystem.
+
+        Supports reading lines in a non-blocking fashion, and re-opening on EOF.
+    """
+
+    def __init__ (self, path) :
+        self.path = path
+        Pipe.__init__(self, self._open())
+
+    def _open (self) :
+        """
+            Open the internal fd (nonblocking).
+        """
+        
+        fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
+
+        log.debug("%s: open: %s", self, fd)
+
+        return fd
+   
+    def open (self) :
+        """
+            Re-open the FIFO.
+            
+            Used when the writing end was closed, and read gave EOF. Opening the fifo again will clear the EOF condition,
+            and resume nonblocking mode.
+            
+            Raises ValueError() if already open. close() first.
+        """
+
+        Pipe.open(self, self._open())
+
+    def readlines (self) :
+        """
+            Read any available input, yielding lines.
+            
+            Re-opens the FIFO on EOF.
+
+            Stops yielding once no more input is available, or once the fifo has been re-opened after EOF.
+        """
+
+        while True :
+            try :
+                # pull line
+                line = self.readline()
+
+            except EOFError :
+                log.debug("%s: EOF: reopen", self)
+
+                # reopen and go back to waiting
+                self.close()
+                self.open()
+
+                return
+            
+            if line is None :
+                log.debug("%s: EOF: wait", self)
+                return # wait
+            else :
+                yield line
+    
+    __iter__ = readlines
+
+    def __str__ (self) :
+        return self.path
+
+    # XXX: we need to figure out what references we have lying around, and clean those out!
+    def __del__ (self) :
+        """
+            Cleanup
+        """
+
+        if self._fd is not None :
+            self.close()
+    
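
A small sketch of how Fifo is presumably meant to be driven (the path is hypothetical): drain any complete lines, then select() on the object, since Pipe.fileno() makes it selectable.

    import select

    from pvl.syslog.fifo import Fifo

    fifo = Fifo('/run/syslog.fifo')     # hypothetical named pipe

    while True :
        # yields complete lines; returns once a read would block,
        # or re-opens the fifo if the writer closed its end
        for line in fifo :
            print line

        # block until the fifo becomes readable again
        select.select([fifo], [], [])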
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/file.py	Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,98 @@
+"""
+    Iterate over lines in file-like objects (without buffering lines), and write lines (flushing output).
+"""
+
+import logging; log = logging.getLogger('pvl.syslog.file')
+
+class File (object) :
+    """
+        Follow a file-like object, reading lines until no more are available. Never raises EOFError.
+
+        Works around Python file objects buffering readlines() when reading from a pipe, e.g. `tail -f ... | python -u ...`.
+
+        readline() may block once there is no more input available, or it may keep returning None forever.
+
+        There is no fileno(), this is not pollable. At all. Don't even iterate on this with a timeout.
+
+        TODO:   it would be nice if this raised EOFError (to avoid bugs with polling this infinitely), but at least
+                the first readlines() must complete normally
+    """
+
+    @classmethod
+    def open (cls, path, mode='r', **opts) :
+        log.debug("%s", path)
+
+        return cls(open(path, mode), **opts)
+
+    EOL = '\n'
+
+    def __init__ (self, file) :
+        log.debug("%s", file)
+
+        self.file = file
+        
+    def readline (self) :
+        """
+            Reads a line from the file, without trailing \n.
+
+            Returns None on EOF.
+        """
+
+        line = self.file.readline()
+
+        if not line :
+            line = None
+        else : 
+            line = line.rstrip('\r\n')
+
+        log.debug("%s", line)
+
+        return line
+
+    def readlines (self) :
+        """
+            Reads any available lines from the file.
+        """
+
+        while True :
+            line = self.readline()
+            
+            if line is None :
+                log.debug("%s: eof", self)
+                return
+            else :
+                yield line
+
+    __iter__ = readlines
+
+    def writeline (self, line, eol=EOL) :
+        """
+            Write out line.
+        """
+
+        log.debug("%s", line)
+
+        self.file.write(str(line))
+        self.file.write(eol)
+
+    def __call__ (self, *lines) :
+        """
+            Write out lines, and flush.
+        """
+
+        for line in lines :
+            self.writeline(line)
+
+        self.file.flush()
+
+    def close (self) :
+        """
+            Close our file. Further operations raise ValueError.
+        """
+        
+        log.debug("%s", self)
+        self.file.close()
+
+    def __str__ (self) :
+        # XXX: optional attr?
+        return self.file.name
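
A brief sketch of File as both a line source and a flushing line sink (paths are hypothetical, not taken from this changeset):

    import sys

    from pvl.syslog.file import File

    # read: iterate over the lines currently in the file, stopping at EOF
    for line in File.open('/var/log/syslog') :
        print line

    # write: writeline() each argument, then flush
    out = File(sys.stdout)
    out("one line", "another line")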
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/filter.py	Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,157 @@
+import logging; log = logging.getLogger('pvl.syslog.filter')
+
+import re # XXX
+import os.path, fnmatch
+
+class SyslogFilter (object) :
+    """
+        Match syslog messages fields against given patterns.
+    """
+    
+    @classmethod
+    def build (cls, **filters) :
+        """
+            Match using given non-None fields.
+        """
+
+        # drop None's
+        return cls(dict((attr, regex) for attr, regex in filters.iteritems() if regex is not None))
+
+    def __init__ (self, filters) :
+        """
+            Match using given { field: regex }.
+        """
+
+        self.filters = filters
+
+    def match_glob (self, attr, glob, value=None) :
+        """
+            Match given value against the given glob pattern.
+        """
+        
+        if not glob :
+            return { attr: value }
+
+        if not value :
+            # require
+            return False
+
+        # normalize
+        value = value.strip()
+
+        # match
+        if fnmatch.fnmatch(value, glob) :
+            return { attr: value }
+        else :
+            return False
+ 
+    match_facility = match_glob
+
+    def match_prog (self, attr, glob, prog=None) :
+        """
+            Match prog as glob.
+        """
+
+        if prog :
+            # normalize
+            prog = prog.strip().lower()
+    
+            if prog.startswith('/') :
+                # leaves postfix/* intact, but fixes /usr/bin/cron
+                _, prog = os.path.split(prog)
+
+        # match
+        return self.match_glob(attr, glob, prog)
+
+    REGEX_TYPE = type(re.compile(''))
+
+    def match_regex (self, attr, regex, value=None) :
+        """
+            Match given value against given pattern.
+        """
+
+        if not regex :
+            return { attr: value }
+
+        if not value :
+            # XXX: optional = match empty string?
+            value = ''
+        else :
+            # normalize; XXX: unicode?
+            value = str(value).strip()
+        
+        # match
+        match = regex.match(value)
+
+        if not match :
+            return False
+
+        # as match-values
+        matches = { attr: match.group(0) } # whole match
+        matches.update(match.groupdict())
+
+        # TODO match.expand?
+
+        return matches
+
+    def filter (self, item) :
+        """
+            Match given item. Returns any matched values (including regexp capture groups) across all fields.
+        """
+
+        match = None
+        matches = {}
+
+        for attr in self.filters :
+            # filter
+            filter = self.filters[attr]
+
+            # lookup match-func
+            match = getattr(self, 'match_{attr}'.format(attr=attr), None)
+
+            if match :
+                pass
+
+            elif isinstance(filter, self.REGEX_TYPE) :
+                match = self.match_regex
+
+            else :
+                match = self.match_glob
+
+            # apply match
+            if attr in item :
+                match = match(attr, filter, item[attr])
+            else :
+                match = match(attr, filter)
+
+            log.debug("%s: %s", attr, match)
+
+            if match :
+                # match
+                matches.update(match)
+            
+            else :
+                # reject
+                return
+        
+        # test last match
+        if match is None :
+            # empty filter -> all None
+            return True
+        else :
+            return matches
+
+    def process (self, items) :
+        for item in items:
+            match = self.filter(item)
+
+            if match :
+                yield item
+    
+    __call__ = process
+
+    def __nonzero__ (self) :
+        return bool(self.filters)
+
+    def __repr__ (self) :
+        return repr(self.filters)
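
A sketch of how SyslogFilter.build() combines the matchers (the example item is made up): prog is matched as a glob via match_prog, a pre-compiled regex for msg goes through match_regex, and its named capture groups are merged into the match dict.

    import re

    from pvl.syslog.filter import SyslogFilter

    filter = SyslogFilter.build(
        prog    = 'cron',                                   # glob, via match_prog
        msg     = re.compile(r'\((?P<user>\w+)\) CMD'),     # regex, via match_regex
    )

    item = dict(prog='/usr/sbin/cron', msg='(root) CMD (run-parts /etc/cron.hourly)')

    # -> {'prog': 'cron', 'msg': '(root) CMD', 'user': 'root'}
    print filter.filter(item)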
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/parser.py	Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,234 @@
+import datetime, time
+import re
+
+import logging; log = logging.getLogger('pvl.syslog.parser')
+
+RFC3339_RE = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(\.\d+)?(Z|[+-]\d{2}:\d{2})?')
+RFC3339_FMT = '%Y-%m-%dT%H:%M:%S'
+
+def rfc3339 (timestamp) :
+    """
+        RFC3339 timestamps as used in some syslog implementations.
+
+        Returns a datetime in some random timezone, possibly localtime.
+    """
+
+    match = RFC3339_RE.match(timestamp)
+
+    if not match :
+        return None
+    
+    # parts
+    dt = datetime.datetime.strptime(match.group(1), RFC3339_FMT)
+    tz = match.group(3)
+    
+    # TODO: timezone?
+    return dt
+
+    if not tz :
+        # XXX: localtime
+        return dt
+
+    elif tz == 'Z' :
+        # UTC
+        pass
+
+    elif tz[0] in '+-' :
+        hours, minutes = tz[1:].split(':')
+        td = datetime.timedelta(hours=int(hours), minutes=int(minutes))
+        
+        if tz[0] == '-' :
+            dt += td
+        if tz[0] == '+' :
+            dt -= td
+    else :
+        raise ValueError("Invalid timezone offset: %s" % timestamp)
+
+    # XXX: UTC
+    return dt
+
+RFC3164_RE = re.compile(r'\w{3} [0-9 ][0-9] \d{2}:\d{2}:\d{2}')
+RFC3164_FMT = '%b %d %H:%M:%S'
+RFC3164_PRE = '%Y ' # add missing year, assuming current
+
+def rfc3164 (timestamp) :
+    """
+        Traditional BSD Syslog timestamps.
+
+        Returns a datetime assumed to be in localtime.
+    """
+
+    if not RFC3164_RE.match(timestamp) :
+        return
+
+    return datetime.datetime.strptime(time.strftime(RFC3164_PRE) + timestamp, RFC3164_PRE + RFC3164_FMT)
+       
+class SyslogParser (object) :
+    """
+        Parse syslog lines in text format, as used in logfiles/fifos.
+    """
+
+    SEVERITIES = dict(enumerate((
+        'emerg',
+        'alert', 
+        'crit', 
+        'err',
+        'warning',
+        'notice',
+        'info', 
+        'debug',
+    )))
+
+    FACILITIES = dict(enumerate((
+        'kern',     # 0
+        'user',     # 1
+        'mail',     # 2
+        'daemon',   # 3
+        'auth',     # 4
+        'syslog',   # 5
+        'lpr',      # 6
+        'news',     # 7
+        'uucp',     # 8
+        'cron',     # 9
+        'authpriv', # 10
+        'ftp',      # 11
+        'ntp',      # 12
+        'audit',    # 13
+        'alert',    # 14
+        'clock',    # 15
+        'local0',   # 16
+        'local1',   # 17
+        'local2',   # 18
+        'local3',   # 19
+        'local4',   # 20
+        'local5',   # 21
+        'local6',   # 22
+        'local7',   # 23
+    )))
+
+    # default syslogd format
+    SYSLOG_RE = re.compile(
+        # the timestamp+hostname header
+        # XXX:  hostname may be missing
+        #       at least in Ubuntu 11.10 syslogd 'last message repeated 2 times'...
+            r'(?:<(?P<pri>\d+|(?P<facility>\w+)\.(?P<severity>\w+))>)?'
+        +   r'(?P<timestamp>\w{3} [0-9 ][0-9] \d{2}:\d{2}:\d{2}|.+?) '
+        +   r'(?P<hostname>\S+)? '
+
+        # the message, including possible tag/pid
+        +   r"(?P<message>(?P<tag>(?P<program>[^:\]]+)(?:\[(?P<pid>\d+)\])?: )?(?P<text>.*))\n?"
+    )
+
+    def __init__ (self, raw=False, facility=None, severity=None) :
+        """
+            Using given facility/severity as default.
+        """
+
+        self.raw = raw
+        self.facility = facility
+        self.severity = severity
+
+    def parse_pri (self, match) :
+        """
+            Parse pri/facility/severity.
+        """
+
+        pri = match.group('pri')
+        facility = match.group('facility') or self.facility
+        severity = match.group('severity') or self.severity
+        
+        if pri and pri.isdigit() :
+            pri = int(pri)
+            facility, severity = divmod(pri, 8)
+
+        return dict(
+            pri         = pri,
+            severity    = self.SEVERITIES.get(severity, severity),
+            facility    = self.FACILITIES.get(facility, facility)
+        )
+
+    def parse_timestamp (self, match) :
+        """
+            Parse timestamp from line into a datetime.
+        """
+
+        timestamp = match.group('timestamp')
+
+        # timestamp, in various formats
+        try :
+            return rfc3164(timestamp) or rfc3339(timestamp)
+
+        except ValueError as ex:
+            # skip it
+            log.warning("timestamp: %s:", timestamp, exc_info=ex)
+            return None
+
+    def parse_prog (self, match) :
+        """
+            Parse prog from line.
+        """
+
+        prog = match.group('program')
+
+        if prog :
+            return prog
+        else :
+            # no tag
+            return None
+
+    def parse (self, line) :
+        """
+            Parse given input line into SyslogMessage.
+        """
+
+        # ignore whitespace
+        line = line.strip()
+
+        # timestamp?
+        if self.raw :
+            # from defaults
+            return dict(
+                timestamp   = datetime.datetime.now(), # XXX: None?
+                host        = None,
+                prog        = None,
+                pid         = None,
+                msg         = line,
+            )
+
+        else :
+            # parse
+            match = self.SYSLOG_RE.match(line)
+
+            if not match :
+                log.warn("Unparseable syslog message: %r", line)
+                return
+
+            # parse
+            item = dict(
+                timestamp   = self.parse_timestamp(match),
+                host        = match.group('hostname'),
+                prog        = self.parse_prog(match),
+                pid         = match.group('pid'),
+                msg         = match.group('text'),
+            )
+           
+            # facility/severity prefix?
+            item.update(self.parse_pri(match))
+
+            return item
+    
+    def process (self, lines) :
+        """
+            Yield SyslogMessages from given series of lines.
+        """
+
+        for line in lines :
+            item = self.parse(line)
+
+            log.debug("%s", item)
+
+            if item :
+                yield item
+
+    __call__ = process
+
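
A small sketch of SyslogParser on a single traditional (RFC 3164) line; the line itself is made up:

    from pvl.syslog.parser import SyslogParser

    parser = SyslogParser()

    item = parser.parse("Feb 19 20:10:21 host cron[1234]: (root) CMD (run-parts /etc/cron.hourly)")

    # item['timestamp'] is a datetime (current year assumed), and the tag is split up:
    print item['host'], item['prog'], item['pid'], item['msg']
    # -> host cron 1234 (root) CMD (run-parts /etc/cron.hourly)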
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/rule.py	Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,216 @@
+from pvl.syslog.filter import SyslogFilter
+
+import re
+
+import optparse, sys
+import configobj
+
+import logging; log = logging.getLogger('pvl.syslog.rule')
+
+def parser (parser) :
+    """
+        Optparse option group.
+    """
+
+    syslog_rules = optparse.OptionGroup(parser, "Syslog rules")
+    
+    syslog_rules.add_option('--syslog-rules', metavar='FILE',
+            help="Load syslog rules from file")
+
+    return syslog_rules
+
+def apply (options) :
+    """
+        Build SyslogRules from options.
+    """
+    
+    if options.syslog_rules :
+        return SyslogRule.load(open(options.syslog_rules))
+
+    else :
+        return SyslogRule('default', formats={ 'text': '{msg}' })
+
+def merge (*dicts, **kwargs) :
+    return dict((k, v) for d in (dicts + (kwargs, )) for k, v in d.iteritems())
+
+# TODO: combine SyslogRule/Rules into one hierarchical SyslogRule type?
+class SyslogRule (object) :
+    """
+        A named SyslogFilter with sub-rules.
+    """
+
+    @classmethod
+    def load (cls, file) :
+        """
+            Load SyslogRule from file.
+        """
+
+        config = configobj.ConfigObj(file)
+
+        return cls.config_section(file.name, config)
+    
+    @classmethod
+    def config_section (cls, name, section) :
+        """
+            Recursively load SyslogRules from a config section.
+        """
+
+        rules = [cls.config_section(subsection, section[subsection]) for subsection in section.sections]
+        attrs = dict((name, section[name]) for name in section.scalars)
+         
+        try :
+            return cls.config(name, rules, **attrs)
+
+        except ValueError as ex :
+            raise ValueError("[%s] %s" % (name, ex))
+
+    @classmethod
+    def config_filters (cls, program=None, facility=None, pattern=None, **filters) :
+        """
+            Return filter expression from given attr/value in config.
+        """
+
+        # XXX: get rid of these special cases
+        if facility :
+            yield 'facility', facility # glob
+
+        if program :
+            yield 'prog', program # glob
+
+        if pattern :
+            filters['msg'] = pattern
+        
+        # generic
+        for attr, value in filters.iteritems() :
+            try :
+                # regex
+                yield attr, re.compile(value)
+            
+            except re.error as ex :
+                raise ValueError("%s: %s" % (attr, ex))
+
+    @classmethod
+    def config (cls, name, rules=None, format=None, irk=None, **filters) :
+        """
+            Build SyslogRule from config options
+        """
+
+        if format is not None :
+            format = { 'text': format }
+        else :
+            format = { }
+
+        if irk :
+            format['irk'] = irk
+        
+        filters = dict(cls.config_filters(**filters))
+
+        filter = SyslogFilter(filters)
+        
+        log.debug("%s: %s %s", name, rules, filter)
+
+        return cls(name, rules, filter, format)
+
+    def __init__ (self, name, rules=None, filter=None, formats=None) : 
+        self.name = name
+        self.rules = rules or [] # sub-rules
+        self.filter = filter # SyslogFilter
+        self.formats = formats or {}
+
+    def match (self, item) :
+        """
+            Match item against our filter, returning match-dict (empty?) or None.
+        """
+        
+        if self.filter :
+            # filter
+            matches = self.filter.filter(item)
+
+        else :
+            # match all, we probably have sub-rules that we're interested in
+            return { }
+
+        log.debug("%s: %s", self, matches)
+
+        if matches :
+            return matches
+        else :
+            # no match
+            return None
+
+    def format (self, item) :
+        """
+            Apply our output formats to the given item, yielding (attr, value) tuples.
+        """
+        
+        for attr, format in self.formats.iteritems() :
+            value = format.format(**item)
+
+            log.debug("%s: %s: %s", self, attr, value)
+
+            yield attr, value
+
+    def apply (self, item) :
+        """
+            Recursively match item against this rule and its sub-rules. Returns a (match, rules, apply) tuple.
+
+            Matches are passed down the tree, and applies are passed up.
+        """
+
+        log.debug("%s", self)
+
+        # match rule -> matches
+        matches = self.match(item)
+
+        if matches is None :
+            # skip
+            return None, None, None
+        
+        # merge matches down
+        item = merge(item, matches)
+
+        # recursive sub-rules -> apply
+        for rule in self.rules :
+            try :
+                # pass matches down
+                match, rules, apply = rule.apply(item)
+
+            except Exception as ex :
+                log.exception("%s -> %s: %r", self, rule, item)
+                continue # XXX: skip?
+
+            if apply :
+                # pass apply up
+                break
+        else :
+            # self-match
+            match, rules, apply = item, [], { }
+        
+        rules.append(self)
+
+        # formats?
+        if self.formats :
+            # merge apply up
+            apply = merge(dict(self.format(item)), apply)
+        
+        log.debug("%s: %s", '/'.join(str(rule) for rule in rules), apply)
+        
+        return match, rules, apply
+
+    def process (self, items) :
+        """
+            Apply items against our rules, yielding any matches.
+        """
+
+        for item in items :
+            match, rules, apply = self.apply(item)
+
+            if apply :
+                yield apply
+ 
+    def __str__ (self) :
+        return self.name
+    
+    def __repr__ (self) :
+        return 'SyslogRule({self.name}, ...)'.format(self=self)
+   
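
A sketch of a hypothetical --syslog-rules file in the configobj syntax that SyslogRule.load() appears to expect: section names become rule names, program/facility/pattern are the special-cased filters, and format becomes the rule's 'text' output.

    [cron]
    program = cron
    pattern = \((?P<user>\w+)\) CMD
    format = cron job for {user} on {host}

Loading and applying it (the path is hypothetical, item as produced by SyslogParser):

    from pvl.syslog import rule

    ruleset = rule.SyslogRule.load(open('syslog.rules'))

    # apply() returns the merged match dict, the matched rule chain, and the formatted outputs
    match, rules, output = ruleset.apply(item)

    if output :
        print output['text']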
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/syslog.py	Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,126 @@
+"""
+    Syslog handling.
+
+    XXX: this belongs in pvl.syslog.source (apart from __iter__?)
+"""
+
+import select
+
+import logging; log = logging.getLogger('pvl.syslog.source')
+
+class SyslogSource (object) :
+    """
+        Process syslog input from a given source.
+        
+        Implements an iterable mainloop doing continuous polling on the source, using either a timeout or
+        select():able source.
+    """
+    
+    def __init__ (self, source, parser, filter, poll=None) :
+        """
+            Using given underlying line source.
+                
+                source      - source to select() if poll=True
+                poll        - polling behaviour for source
+        """
+        
+        self.source = source
+        self.parser = parser
+        self.filter = filter
+
+        self.poll = poll
+
+    def __iter__ (self) :
+        """
+            Yield available input.
+
+            Raises EOFError if source has been closed.
+        """
+
+        return self.filter(self.parser(self.source))
+        
+    def fileno (self) :
+        return self.source.fileno()
+
+    def select (self, poll=None, reading=(), writing=()) :
+        """
+            Poll our source for input, with given polling behaviour:
+                True    - select() on source
+                False   - peek on source
+                float   - timeout in seconds
+            
+            Returns None if there was nothing to wait on, an empty sequence on timeout, or the list of readable sources.
+        """
+        
+        if poll is True :
+            timeout = None # block
+            reading += (self, ) # source.fileno()
+
+        elif not poll :
+            timeout = 0.0 # do not block
+
+        else :
+            timeout = float(poll)
+
+        log.debug("%s (%s)", reading, timeout)
+    
+        # select
+        readable, writeable, ex = select.select(reading, writing, [], timeout)
+        
+        log.debug("select: %s", readable)
+
+        if readable :
+            return readable
+
+        elif reading :
+            # timeout
+            # XXX: this is the same as readable
+            return ()
+
+        else :
+            # unknown
+            return None
+
+    def main (self, poll=None) :
+        """
+            Yield active syslog sources, polling as given.
+
+            Returns once no more lines are available.
+
+            XXX: reconnect? or source takes care of that..
+            TODO: SIGINT -> finish iteration and return?
+        """
+
+        # from __init__
+        # note that we must interpret poll here, since False -> never poll
+        if poll is None :
+            poll = self.poll
+
+        # mainloop
+        while True :
+            # caller is responsible for reading them!
+            yield self
+            
+            # poll
+            if poll :
+                # wait
+                self.select(poll)
+
+            else :
+                # done
+                break
+
+        log.debug("exit")
+
+    def close (self) :
+        """
+            Close the syslog source, if possible.
+
+        """
+
+        # XXX: do all sources support close?
+        self.source.close()
+
+    def __str__ (self) :
+        return ' | '.join((str(self.source), str(self.parser), str(self.filter)))
+
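
A sketch of the intended mainloop (handle() and the syslog source are placeholders): main() yields the source whenever input may be available, and iterating it drains the parsed, filtered items.

    # `syslog` here is a SyslogSource, e.g. from pvl.syslog.args.apply(options)
    for source in syslog.main() :
        for item in source :
            handle(item)            # hypothetical application callback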
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/tail.py	Tue Feb 19 20:10:21 2013 +0200
@@ -0,0 +1,150 @@
+"""
+    Iterate over input lines in filesystem files. 
+"""
+
+import os
+
+import logging; log = logging.getLogger('pvl.syslog.tail')
+
+class Tail (object) :
+    """
+        Follow a file on the filesystem, reading lines until EOF, and re-opening if replaced.
+
+        Never blocks, and has no fileno() to select() on; just re-read it periodically, e.g. every POLL seconds.
+
+        Not writable.
+    """
+    
+    POLL = 2.0 
+
+    def __init__ (self, path, skip=None, **opts) :
+        log.debug("%s", path)
+
+        self.path = path
+        self.file = self.stat = None # closed
+
+        self.open()
+
+        if skip :
+            self.skip()
+   
+    def _stat (self) :
+        """
+            Return a key identifying the file at our path.
+        """
+
+        st = os.stat(self.path)
+
+        stat = st.st_dev, st.st_ino
+
+        return stat
+
+    def _open (self) :
+        """
+            Return the opened file.
+        """
+
+        return open(self.path, 'r')
+
+    def open (self) :
+        """
+            Re-opens our file when closed.
+
+            Raises ValueError if already open.
+        """
+
+        if self.file is None :
+            # XXX: use fstat for "atomic" open+stat?
+            self.file = self._open()
+            self.stat = self._stat()
+
+            log.debug("%s: %s: %s", self, self.file, self.stat)
+
+        else :
+            raise ValueError("%s: re-opening already open tail" % (self, ))
+
+    def changed (self) :
+        """
+            Has the underlying file changed?
+        """
+
+        return self._stat() != self.stat
+
+    def readline (self) :
+        """
+            Reads a line from the file, without trailing \n.
+
+            Returns None on EOF.
+        """
+
+        line = self.file.readline()
+
+        if not line :
+            line = None
+        else : 
+            line = line.rstrip('\r\n')
+
+        log.debug("%s", line)
+
+        return line
+
+    def readlines (self, eof_mark=False) :
+        """
+            Reads any available lines from the file.
+
+            Reopens the file on EOF if the underlying file changed.
+
+                eof_mark:   yields a special None line when this happens.
+        """
+
+        while True :
+            line = self.readline()
+            
+            if line is not None :
+                yield line
+
+            elif self.changed() :
+                log.debug("EOF: reopen")
+                
+                self.close()
+                self.open()
+                
+                if eof_mark :
+                    yield None # special token
+                
+                # keep going
+                continue
+
+            else :
+                log.debug("EOF: wait")
+                break
+
+    __iter__ = readlines
+
+    def skip (self) :
+        """
+            Skip any available lines.
+        """
+
+        log.debug("%s", self)
+
+        for line in self.readlines() :
+            pass
+
+    def close (self) :
+        """
+            Close our file, if open. Further operations raise ValueError.
+
+            log.warn's if already closed.
+        """
+        
+        if self.file :
+            log.debug("%s", self)
+            self.file.close()
+            self.file = None
+        else :
+            log.warn("%s: close on already closed tail", self)
+
+    def __str__ (self) :
+        return self.path
+
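
A sketch of driving Tail by hand, roughly like `tail -F` (the path is hypothetical): drain available lines, then sleep for the suggested POLL interval.

    import time

    from pvl.syslog.tail import Tail

    tail = Tail('/var/log/syslog', skip=True)   # skip lines already present

    while True :
        for line in tail :      # re-opens automatically if the file was rotated
            print line

        time.sleep(Tail.POLL)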
--- a/setup.py	Tue Feb 19 19:43:47 2013 +0200
+++ b/setup.py	Tue Feb 19 20:10:21 2013 +0200
@@ -18,6 +18,7 @@
     # code
     packages        = [
         'pvl', 
+        'pvl.syslog',
     ],
     
     # binaries