pvl.syslog: import from pvl-collectd
authorTero Marttila <terom@paivola.fi>
Wed, 24 Oct 2012 21:02:33 +0300
changeset 31 3e6d0feb115c
parent 30 841d856293a1
child 32 12816e361b2d
pvl.syslog: import from pvl-collectd
pvl/syslog/__init__.py
pvl/syslog/args.py
pvl/syslog/dhcp.py
pvl/syslog/event.py
pvl/syslog/fifo.py
pvl/syslog/parser.py
pvl/syslog/syslog.py
pvl/syslog/tail.py
pvl/verkko/hosts.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/args.py	Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,66 @@
+import optparse, sys
+
+from pvl.syslog.parser import SyslogParser
+from pvl.syslog.syslog import SyslogSource
+from pvl.syslog import fifo, tail
+
+# XXX: use optparse parser.error()?
+import logging; log = logging.getLogger('pvl.syslog.args')
+
+def parser (parser, prog=None) :
+    """
+        Build and return the 'Syslog collector' optparse.OptionGroup (it is not added to the parser here).
+
+            prog        - default for --syslog-prog: filter to only process lines from given process
+    """
+    
+    syslog = optparse.OptionGroup(parser, 'Syslog collector')
+
+    syslog.add_option('--syslog-fifo',          metavar='PATH',
+            help="Read syslog messages from given fifo")
+
+    syslog.add_option('--syslog-file',          metavar='FILE',
+            help="Read syslog messages from given file")
+
+    syslog.add_option('--syslog-tail',          type='float', metavar='POLL',
+            help="Continuously poll file")
+
+    syslog.add_option('--syslog-raw',           action='store_true',
+            help="Parse raw syslog lines without timestamp/etc")
+
+    syslog.add_option('--syslog-prog',          metavar='PROG',     default=prog,
+            help="Filter by given prog: %default")
+
+    return syslog
+
+def apply (options, optional=False) :
+    """
+        Build a SyslogSource from parsed options; --syslog-fifo takes precedence over --syslog-file.
+        NOTE(review): options.syslog_tail is not consulted here - presumably handled by the caller.
+        Returns None if optional and no source was given; otherwise logs an error and calls sys.exit(2).
+    """
+
+    if options.syslog_fifo :
+        # read fifo
+        source = fifo.Fifo(options.syslog_fifo)
+
+    elif options.syslog_file :
+        # tail file
+        source = tail.TailFile(options.syslog_file)
+
+    elif optional :
+        log.debug("No --syslog source given")
+        return None
+
+    else :
+        log.error("No --syslog source given")
+        sys.exit(2)
+    
+    parser = SyslogParser(
+        raw     = options.syslog_raw,
+        prog    = options.syslog_prog,
+    )
+
+    # build
+    return SyslogSource(source, parser)
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/dhcp.py	Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,191 @@
+"""
+    Parse ISC dhcpd messages in syslog.
+"""
+
+import re
+
+class DHCPSyslogFilter (object) :
+    """
+        Parse SyslogMessages from SyslogParser for ISC dhcp semantics.
+    """
+
+    ## various message types sent/received by dhcpd
+    # from server/dhcp.c
+    TYPE_NAMES = (
+        "DHCPDISCOVER",
+        "DHCPOFFER",
+        "DHCPREQUEST",
+        "DHCPDECLINE",
+        "DHCPACK",
+        "DHCPNAK",
+        "DHCPRELEASE",
+        "DHCPINFORM",
+        "type 9",
+        "DHCPLEASEQUERY",
+        "DHCPLEASEUNASSIGNED",
+        "DHCPLEASEUNKNOWN",
+        "DHCPLEASEACTIVE"
+    )
+
+    # message-parsing regexp..
+    RECV_MESSAGE_RE = (
+        # dhcpdiscover/ack_lease: info/error
+        #   hwaddr:     <no identifier>
+        #   hostname:   Hostname Unsuitable for Printing
+        #   error:
+        #               peer holds all free leases
+        #               network %s: no free leases
+        re.compile(r'(?P<type>DHCPDISCOVER) from (?P<hwaddr>.+?)( \((?P<hostname>.+?)\))? via (?P<gateway>.+?)(: (?P<error>.+?))?$'),
+
+        # dhcprequest
+        #   error:
+        #               wrong network.
+        #               ignored (not authoritative).
+        #               ignored (unknown subnet).
+        #               lease %s unavailable.
+        #               unknown lease %s.
+        re.compile(r'(?P<type>DHCPREQUEST) for (?P<lease>.+?)( \((?P<server>.+?)\))? from (?P<hwaddr>.+?)( \((?P<hostname>.+?)\))? via (?P<gateway>.+?)(: (?P<error>.+?))?$'),
+
+        # dhcprelease
+        re.compile(r'(?P<type>DHCPRELEASE) of (?P<lease>.+?) from (?P<hwaddr>.+?)( \((?P<hostname>.+?)\))? via (?P<gateway>.+?) \((?P<found>.+?)\)$'),
+
+        # dhcpdecline
+        #   status:
+        #       abandoned
+        #       not found
+        #       ignored
+        re.compile(r'(?P<type>DHCPDECLINE) of (?P<lease>.+?) from (?P<hwaddr>.+?)( \((?P<hostname>.+?)\))? via (?P<gateway>.+?): (?P<status>.+?)$'),
+
+        # dhcpinform
+        #   error:
+        #       ignored (null source address).
+        #       unknown subnet for relay address %s
+        #       unknown subnet for %s address %s
+        #       not authoritative for subnet %s
+        re.compile(r'(?P<type>DHCPINFORM) from (?P<lease>.+?) via (?P<gateway>.+?)(: (?P<error>.+?))?$'),
+        
+        # dhcpleasequery
+        re.compile(r'(?P<type>DHCPLEASEQUERY) from (?P<server>.+?)( for (?P<key_type>IP|client-id|MAC address) (?P<key>.+?))?(: (?P<error>.+?))?$'),
+
+        # dhcp: generic/unknown packet
+        re.compile(r'(?P<type>\w+) from (?P<hwaddr>.+?) via (?P<gateway>.+?): (?P<error>.+?)$'),
+    )
+
+    SEND_MESSAGE_RE = (
+        # dhcp_reply
+        re.compile(r'(?P<type>DHCPACK|DHCPOFFER|BOOTREPLY) on (?P<lease>.+?) to (?P<hwaddr>.+?)( \((?P<hostname>.+?)\))? via (?P<gateway>.+?)$'),
+
+        # dhcpinform
+        #   hwaddr:     <no client hardware address>
+        re.compile(r'(?P<type>DHCPACK) to (?P<lease>.+?) \((?P<hwaddr>.+?)\) via (?P<gateway>.+?)$'),
+
+        # nak_lease
+        re.compile(r'(?P<type>DHCPNAK) on (?P<lease>.+?) to (?P<hwaddr>.+?) via (?P<gateway>.+?)$'),
+
+        # dhcpleasequery
+        re.compile(r'(?P<type>DHCPLEASEUNKNOWN|DHCPLEASEACTIVE|DHCPLEASEUNASSIGNED) to (?P<lease>.+?) for (?P<key_type>IP|client-id|MAC address) (?P<key>.+?) \((?P<count>\d+) associated IPs\)$'),
+    )
+
+    MESSAGE_ERROR_RE = (
+        ('peer-all-free-leases',    re.compile('peer holds all free leases')),
+        ('no-free-leases',          re.compile(r'network (?P<network>.+?): no free leases')),
+        ('wrong-network',           re.compile(r'wrong network')),
+        ('ignored-not-auth',        re.compile(r'ignored \(not authoritative\)')),
+        ('ignored-unknown-subnet',  re.compile(r'ignored \(unknown subnet\)')),
+        ('lease-unavailable',       re.compile(r'lease (?P<lease>.+?) unavailable')),
+        ('lease-unknown',           re.compile(r'unknown lease (?P<lease>.+?)\.?$')),
+    )
+
+    ERROR_RE = (
+        # find_lease
+        ('duplicate-uid-lease', 
+            re.compile(r'uid lease (?P<client>.+?) for client (?P<hwaddr>.+?) is duplicate on (?P<shared_network>.+?)$')),
+
+        # dhcprelease
+        ('dhcprelease-requested-address', 
+            re.compile(r'DHCPRELEASE from (?P<hwaddr>.+?) specified requested-address.')),
+
+        # ???
+        ('unexpected-icmp-echo-reply',
+            re.compile(r'unexpected ICMP Echo Reply from (?P<client>.+?)$')),
+        
+        ('host-unknown',
+            re.compile(r'(?P<host>.+?): host unknown.')),
+    )
+
+    IGNORE_RE = (
+        re.compile(r'Wrote (?P<count>\d+) (?P<what>.+?) to leases file.'),
+    )
+
+    def parse (self, line) :
+        """
+            Match line against our regexps, returning a
+
+                {
+                    tag:        send/recv/error,
+                    type:       ...,
+                    [error]:    ...,
+                    ...
+                }
+
+            dict if matched (optional groups that did not match are present with value None).
+
+            Returns False if the message is ignored, or None if no regexp matched.
+        """
+
+        for tag, re_list in (
+            ('recv',    self.RECV_MESSAGE_RE),
+            ('send',    self.SEND_MESSAGE_RE),
+        ) :
+            for regex in re_list :
+                # first matching pattern wins
+                match = regex.match(line)
+
+                if match :
+                    data = match.groupdict()
+                    data['tag'] = tag
+
+                    return data
+                
+        # error?
+        for name, regex in self.ERROR_RE:
+            match = regex.match(line)
+
+            if match : 
+                data = match.groupdict()
+                data['tag'] = 'error'
+                data['type'] = name
+
+                return data
+
+        # ignore
+        for regex in self.IGNORE_RE :
+            if regex.match(line) :
+                # ignore
+                return False
+
+        # unknown
+        return None
+
+    def parse_error (self, error) :
+        """
+            Match given error status from send/recv against known types, returning a type name or None.
+        """
+
+        if not error : # parse() yields error=None when the optional group did not match
+            return None
+
+        for name, regex in self.MESSAGE_ERROR_RE :
+            if regex.match(error) :
+                return name
+
+        return None
+
+if __name__ == '__main__' :
+    import logging
+
+    logging.basicConfig()
+    # self-test entry point; NOTE(review): no doctests are visible in this module - confirm testmod() checks anything
+    import doctest
+    doctest.testmod()
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/event.py	Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,45 @@
+import gevent.core as event
+
+class EventBase (object) :
+    """
+        libevent-style event base; timer()/read() return the result of gevent.core's event(...).
+        NOTE(review): read() accepts a timeout argument but never uses it - confirm whether that is intended.
+        XXX: just a wrapper around the gevent libevent bindings, uses implicit state..
+    """
+
+    def __init__ (self) :
+        event.init()
+
+    def timer (self, timeout, cb) :
+        return event.event(0, 0, timeout, cb)
+
+    def read (self, fd, cb, timeout=-1) :
+        return event.event(event.EV_READ, fd, cb)
+
+    def main (self) :
+        """
+            Run mainloop (event.dispatch()) until exit.
+        """
+
+        event.dispatch()
+
+class SyslogSource (object) :
+    def __init__ (self, source, parser) :
+        self.source = source
+        self.parser = parser
+    
+    def event (self, event, evtype) :
+        """
+            Process source: generator yielding items from the parser; the event/evtype arguments are unused.
+        """
+
+        # directly iter across source, yielding whatever self.parser.process() produces
+        for item in self.parser.process(self.source) :
+            yield item
+ 
+    def process (self, item) :
+        """
+            Handle item from syslog. Stub: there is no body beyond this docstring, so it returns None.
+        """
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/fifo.py	Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,190 @@
+"""
+    Non-blocking fifo reads.
+"""
+
+import os
+import errno
+import select
+
+import logging
+
+log = logging.getLogger('pvl.syslog.fifo') # named after this module, like the other pvl.syslog.* loggers
+
+class Fifo (object) :
+    """
+        A named pipe(7) on the filesystem.
+
+        Supports reading lines in a non-blocking fashion.
+    """
+
+    def __init__ (self, path) :
+        self.path = path
+        self._fd = None
+        self._buf = ''
+
+        log.debug("open: %s", path)
+        self._open()
+
+    @property
+    def fd (self) :
+        """
+            Fetch the internal fd, raising ValueError if we are not open..
+        """
+
+        if self._fd is None :
+            raise ValueError("I/O operation on closed fifo: {0}".format(self.path))
+
+        return self._fd
+
+    def _open (self) :
+        """
+            Open the internal fd, read-only and non-blocking.
+        """
+
+        assert self._fd is None
+
+        self._fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
+
+    def _close (self) :
+        """
+            Close the internal fd and mark us as closed.
+        """
+
+        assert self._fd is not None
+
+        os.close(self._fd)
+
+        self._fd = None
+    
+    def _reopen (self) :
+        """
+            Re-open the FIFO in case the writing end was closed, and read gave EOF.
+        """
+
+        self._close()
+        self._open()
+
+    def poll (self, timeout=None) :
+        """
+            Poll for input, with given timeout in seconds (float).
+
+            A timeout of None indicates to block forever, False indicates to never block.
+
+            Returns True if we have input waiting, False on timeout with no input.
+        """
+
+        if timeout is False :
+            timeout = 0.0
+    
+        # select
+        read, write, ex = select.select([self.fd], [], [], timeout)
+
+        if read :
+            return True
+
+        else :
+            # timeout
+            return False
+
+    def read (self, n=512) :
+        """
+            Read up to n bytes.
+            
+            Returns None if we would block (EAGAIN); other OSErrors propagate.
+            Raises EOFError on EOF.
+        """
+
+        try :
+            buf = os.read(self.fd, n)
+
+        except OSError as ex :
+            # block?
+            if ex.errno == errno.EAGAIN :
+                # empty
+                return None
+
+            else :
+                raise
+
+        # eof?
+        if not buf :
+            raise EOFError()
+
+        # ok
+        return buf
+
+    def readline (self) :
+        """
+            Read and return next waiting line from input.
+
+            Line is returned without trailing '\n'.
+
+            Returns None if there is no complete line available; partial data stays buffered.
+            Raises EOFError if the fifo write end was closed.
+        """
+
+        while '\n' not in self._buf :
+            # read chunk
+            read = self.read()
+
+            if read is None :
+                return None
+            
+            self._buf += read
+        
+        # split out one line
+        line, self._buf = self._buf.split('\n', 1)
+
+        return line
+
+    def readlines (self) :
+        """
+            Read any available input, yielding lines.
+            
+            Re-opens the FIFO on EOF.
+
+            Stops iterating if there was no more input available, or the fifo was re-opened after EOF.
+        """
+
+        while True :
+            try :
+                # pull line
+                line = self.readline()
+
+            except EOFError :
+                log.debug("EOF/reopen: %s", self.path)
+
+                # reopen and go back to waiting
+                self._reopen()
+
+                return
+            
+            if line is None :
+                log.debug("fifo empty: %s", self.path)
+
+                # wait
+                return
+
+            log.debug("%s", line)
+            yield line
+
+    __iter__ = readlines
+
+    def close (self) :
+        """
+            Close the fifo; raises ValueError if it is already closed.
+        """
+
+        if self._fd is None :
+            raise ValueError("Fifo already closed: {0}".format(self.path))
+
+        self._close()
+
+    def __del__ (self) :
+        """
+            Cleanup
+        """
+
+        # truthiness test: skips None (already closed) and also fd 0, so will not close stdin
+        if self._fd :
+            self._close()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/parser.py	Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,152 @@
+"""
+    Parse syslog lines in text format.
+"""
+
+import datetime, time
+import re
+
+import logging; log = logging.getLogger('pvl.syslog.parser')
+
+class SyslogParser (object) :
+    """
+        Parses text syslog lines into item dicts, optionally filtering them by program.
+    """
+    
+    # default syslogd format
+    SYSLOG_RE = re.compile(
+        # the timestamp+hostname header
+            r"(?P<timestamp>\w{3} [0-9 ]\d \d{2}:\d{2}:\d{2}) (?P<hostname>\S+) "
+
+        # the message, including possible tag/pid
+        +   r"(?P<message>(?P<tag>(?P<program>[^:\]]+)(?:\[(?P<pid>\d+)\])?: )?(?P<text>.*))\n?"
+    )
+
+    TIMESTAMP_FMT = '%b %d %H:%M:%S'
+
+    def __init__ (self, raw=False, prog=None) :
+        """
+            raw: lines are bare messages without the syslog header; prog: program filter, 'foo*' matches by prefix.
+        """
+
+        self.raw = raw
+        self.prog = prog
+
+    def parse_timestamp (self, match) :
+        """
+            Parse timestamp from line into a (naive) datetime.
+        """
+
+        timestamp = match.group('timestamp')
+        
+        # add missing year; assume current (NOTE(review): last December's lines read in January land in the future)
+        timestamp = time.strftime('%Y') + ' ' + timestamp
+        
+        # parse with the year prepended to the syslog timestamp format
+        timestamp = datetime.datetime.strptime(timestamp, '%Y ' + self.TIMESTAMP_FMT)
+
+        return timestamp
+
+    def parse_prog (self, match) :
+        """
+            Parse prog from line: lowercased, reduced to its basename if it is an absolute path; None if untagged.
+        """
+
+        prog = match.group('program')
+
+        if not prog :
+            # no tag
+            return None
+        
+        # normalize
+        prog = prog.lower()
+
+        if prog.startswith('/') :
+            # base
+            prog = prog.split('/')[-1]
+
+        return prog
+
+    def parse (self, line) :
+        """
+            Parse given input line into an item dict (timestamp, host, prog, pid, msg); None if unparseable.
+        """
+
+        # ignore whitespace
+        line = line.strip()
+
+        # debug
+        log.debug("%s", line)
+
+        # timestamp?
+        if self.raw :
+            # from defaults
+            return dict(
+                timestamp   = datetime.datetime.now(), # XXX: None?
+                host        = None,
+                prog        = self.prog,
+                pid         = None,
+                msg         = line,
+            )
+
+        else :
+            # parse
+            match = self.SYSLOG_RE.match(line)
+
+            if not match :
+                log.warning("Unparseable syslog message: %r", line)
+                return
+
+            # parse
+            return dict(
+                timestamp   = self.parse_timestamp(match),
+                host        = match.group('hostname'),
+                prog        = self.parse_prog(match),
+                pid         = match.group('pid'),
+                msg         = match.group('text'),
+            )
+    
+    def match_prog (self, prog) :
+        """
+            Match given prog against self.prog? Expects self.prog to be set; filter() checks that first.
+        """
+        
+        if not prog :
+            # never matches non-tagged lines
+            return False
+
+        elif self.prog.endswith('*') :
+            # prefix match
+            return prog.startswith(self.prog[:-1])
+        else :
+            return prog == self.prog
+
+
+    def filter (self, line, item) :
+        """
+            Pass given item? Returns True if so, None (falsy) if it is empty or rejected by the prog filter.
+        """
+
+        if not item :
+            log.debug("empty: %r", line)
+
+        elif self.prog and not self.match_prog(item['prog']) :
+            log.debug("prog: %r", item)
+
+        else :
+            # ok
+            return True
+
+    def process (self, lines) :
+        """
+            Yield parsed item dicts from given series of lines, skipping unparseable and filtered ones.
+        """
+
+        for line in lines :
+            item = self.parse(line)
+            
+            # filter?
+            if self.filter(line, item) :
+                yield item
+
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/syslog.py	Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,31 @@
+"""
+    Syslog handling.
+"""
+
+import logging; log = logging.getLogger('pvl.syslog.source')
+
+class SyslogSource (object) :
+    """
+        A source of syslog items: iterating yields what parser.process() produces from the line source.
+    """
+    
+    def __init__ (self, source, parser) :
+        """
+            Using given underlying line source, and the parser to process its lines with.
+        """
+
+        self.source = source
+        self.parser = parser
+
+        # proxy: expose the source's bound poll() directly as our own
+        self.poll = source.poll
+
+    def __iter__ (self) :
+        """
+            Read syslog messages from source.
+        """
+        
+        # directly iter across source
+        for item in self.parser.process(self.source) :
+            yield item
+            
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/tail.py	Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,154 @@
+"""
+    `tail -f` style continuous file reads.
+
+    Checks if the file was renamed on EOF, and reopens if so.
+"""
+
+import os
+
+import logging; log = logging.getLogger('pvl.syslog.tail') # named after this module, like the other pvl.syslog.* loggers
+
+class TailFile (object) :
+    """
+        A file on the filesystem, that is appended to.
+    """
+
+    def __init__ (self, path) :
+        self.path = path
+        self._file = None
+        self._id = None
+
+        self._open()
+    
+    @property
+    def file (self) :
+        """
+            The underlying file object, if opened; raises ValueError if closed.
+        """
+
+        if self._file is None :
+            raise ValueError("I/O operation on closed file: {0}".format(self.path))
+
+        return self._file
+
+    @property
+    def fd (self) :
+        return self.file.fileno()
+
+    def fileno (self) :
+        """
+            The underlying OS fd.
+        """
+
+        return self.fd
+
+    def _stat (self) :
+        """
+            Return a (st_dev, st_ino) key identifying the file currently at our path.
+        """
+
+        st = os.stat(self.path)
+
+        return st.st_dev, st.st_ino
+
+    def _open (self) :
+        assert self._file is None
+        self._file = open(self.path, 'r')
+        st = os.fstat(self._file.fileno()) # key the file we actually hold open, not whatever is at path by now
+        self._id = st.st_dev, st.st_ino
+
+    def _close (self) :
+        assert self._file is not None
+
+        self._file.close()
+        self._file = None
+
+    def _reopen (self) :
+        """
+            Re-open, in case the file changed..
+        """
+
+        self._close()
+        self._open()
+
+    def _changed (self) :
+        """
+            Has the file at our path been replaced? A missing path (e.g. mid-rotation) counts as unchanged.
+        """
+
+        return os.path.exists(self.path) and self._stat() != self._id
+
+    def poll (self, timeout) :
+        """
+            XXX: not really implemented; just sleeps for timeout seconds (must be a number) and returns None.
+        """
+
+        import time
+
+        time.sleep(timeout)
+    
+    def readline (self) :
+        """
+            Reads a line from the file.
+
+            Raises EOFError on end-of-file.
+        """
+
+        line = self.file.readline()
+
+        # eof?
+        if not line :
+            raise EOFError()
+        
+        return line
+
+    def readlines (self, eof_mark=False) :
+        """
+            Reads any available lines from the file, yielding them stripped of surrounding whitespace.
+
+            Reopens the file on EOF if the underlying file changed, and keeps reading from the new file.
+
+                eof_mark:   yields a special None line when this happens.
+        """
+
+        while True :
+            try :
+                line = self.readline()
+
+            except EOFError :
+                if self._changed() :
+                    log.debug("EOF: file changed: reopen")
+                    self._reopen()
+                    
+                    if eof_mark :
+                        # special token
+                        yield None
+
+                else :
+                    log.debug("EOF: wait")
+                    # done reading
+                    return
+
+            else :
+                yield line.strip()
+
+    __iter__ = readlines
+
+    def close (self) :
+        """
+            Close the file; raises ValueError if it is already closed.
+        """
+
+        if self._file is None :
+            raise ValueError("File already closed: {0}".format(self.path))
+
+        self._close()
+
+    def __del__ (self) :
+        """
+            Cleanup
+        """
+
+        if self._file is not None :
+            self._close()
+
--- a/pvl/verkko/hosts.py	Wed Oct 24 20:46:17 2012 +0300
+++ b/pvl/verkko/hosts.py	Wed Oct 24 21:02:33 2012 +0300
@@ -566,12 +566,17 @@
 
         else :
             # render html
-            self.hosts = hosts.limit(10)
+            hosts = hosts.limit(10)
 
             # XXX: testing
-            self.hosts = self.hosts.offset(1)
+            hosts = hosts.offset(1)
 
-            self.t = self.hosts[0].last_seen
+            self.hosts = list(hosts)
+            
+            if self.hosts :
+                self.t = self.hosts[0].last_seen
+            else :
+                self.t = datetime.datetime.now()
 
     def title (self) :
         if self.filters :