--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/args.py Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,66 @@
+import optparse, sys
+
+from pvl.syslog.parser import SyslogParser
+from pvl.syslog.syslog import SyslogSource
+from pvl.syslog import fifo, tail
+
+# XXX: use optparse parser.error()?
+import logging; log = logging.getLogger('pvl.syslog.args')
+
+def parser (parser, prog=None) :
+ """
+ Optparse option group
+
+ prog - filter to only process lines from given process
+ """
+
+ syslog = optparse.OptionGroup(parser, 'Syslog collector')
+
+ syslog.add_option('--syslog-fifo', metavar='PATH',
+ help="Read syslog messages from given fifo")
+
+ syslog.add_option('--syslog-file', metavar='FILE',
+ help="Read syslog messages from given file")
+
+ syslog.add_option('--syslog-tail', type='float', metavar='POLL',
+ help="Continuously poll file")
+
+ syslog.add_option('--syslog-raw', action='store_true',
+ help="Parse raw syslog lines without timestamp/etc")
+
+ syslog.add_option('--syslog-prog', metavar='PROG', default=prog,
+ help="Filter by given prog: %default")
+
+ return syslog
+
+def apply (options, optional=False) :
+ """
+ Handle options, returning a SyslogSource, if any.
+
+ May log.error/sys.exit
+ """
+
+ if options.syslog_fifo :
+ # read fifo
+ source = fifo.Fifo(options.syslog_fifo)
+
+ elif options.syslog_file :
+ # tail file
+ source = tail.TailFile(options.syslog_file)
+
+ elif optional :
+ log.debug("No --syslog source given")
+ return None
+
+ else :
+ log.error("No --syslog source given")
+ sys.exit(2)
+
+ parser = SyslogParser(
+ raw = options.syslog_raw,
+ prog = options.syslog_prog,
+ )
+
+ # build
+ return SyslogSource(source, parser)
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/dhcp.py Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,191 @@
+"""
+ Parse ISC dhcpd messages in syslog.
+"""
+
+import re
+
+class DHCPSyslogFilter (object) :
+ """
+ Parse SyslogMessages from SyslogParser for ISC dhcp semantics.
+ """
+
+ ## various message types sent/recieved by dhcpd
+ # from server/dhcp.c
+ TYPE_NAMES = (
+ "DHCPDISCOVER",
+ "DHCPOFFER",
+ "DHCPREQUEST",
+ "DHCPDECLINE",
+ "DHCPACK",
+ "DHCPNAK",
+ "DHCPRELEASE",
+ "DHCPINFORM",
+ "type 9",
+ "DHCPLEASEQUERY",
+ "DHCPLEASEUNASSIGNED",
+ "DHCPLEASEUNKNOWN",
+ "DHCPLEASEACTIVE"
+ )
+
+ # message-parsing regexp..
+ RECV_MESSAGE_RE = (
+ # dhcpdiscover/ack_lease: info/error
+ # hwaddr: <no identifier>
+ # hostname: Hostname Unsuitable for Printing
+ # error:
+ # peer holds all free leases
+ # network %s: no free leases
+ re.compile(r'(?P<type>DHCPDISCOVER) from (?P<hwaddr>.+?)( \((?P<hostname>.+?)\))? via (?P<gateway>.+?)(: (?P<error>.+?))?$'),
+
+ # dhcprequest
+ # error:
+ # wrong network.
+ # ignored (not authoritative).
+ # ignored (unknown subnet).
+ # lease %s unavailable.
+ # unknown lease %s.
+ re.compile(r'(?P<type>DHCPREQUEST) for (?P<lease>.+?)( \((?P<server>.+?)\))? from (?P<hwaddr>.+?)( \((?P<hostname>.+?)\))? via (?P<gateway>.+?)(: (?P<error>.+?))?$'),
+
+ # dhcprelease
+ re.compile(r'(?P<type>DHCPRELEASE) of (?P<lease>.+?) from (?P<hwaddr>.+?)( \((?P<hostname>.+?)\))? via (?P<gateway>.+?) \((?P<found>.+?)\)$'),
+
+ # dhcpdecline
+ # status:
+ # abandoned
+ # not found
+ # ignored
+ re.compile(r'(?P<type>DHCPDECLINE) of (?P<lease>.+?) from (?P<hwaddr>.+?)( \((?P<hostname>.+?)\))? via (?P<gateway>.+?): (?P<status>.+?)$'),
+
+ # dhcpinform
+ # error:
+ # ignored (null source address).
+ # unknown subnet for relay address %s
+ # unknown subnet for %s address %s
+ # not authoritative for subnet %s
+ re.compile(r'(?P<type>DHCPINFORM) from (?P<lease>.+?) via (?P<gateway>.+?)(: (?P<error>.+?))?$'),
+
+ # dhcpleasequery
+ re.compile(r'(?P<type>DHCPLEASEQUERY) from (?P<server>.+?)( for (?P<key_type>IP|client-id|MAC address) (?P<key>.+?))?(: (?P<error>.+?))?$'),
+
+ # dhcp: generic/unknown packet
+ re.compile(r'(?P<type>\w+) from (?P<hwaddr>.+?) via (?P<gateway>.+?): (?P<error>.+?)$'),
+ )
+
+ SEND_MESSAGE_RE = (
+ # dhcp_reply
+ re.compile(r'(?P<type>DHCPACK|DHCPOFFER|BOOTREPLY) on (?P<lease>.+?) to (?P<hwaddr>.+?)( \((?P<hostname>.+?)\))? via (?P<gateway>.+?)$'),
+
+ # dhcpinform
+ # hwaddr: <no client hardware address>
+ re.compile(r'(?P<type>DHCPACK) to (?P<lease>.+?) \((?P<hwaddr>.+?)\) via (?P<gateway>.+?)$'),
+
+ # nak_lease
+ re.compile(r'(?P<type>DHCPNAK) on (?P<lease>.+?) to (?P<hwaddr>.+?) via (?P<gateway>.+?)$'),
+
+ # dhcpleasequery
+ re.compile(r'(?P<type>DHCPLEASEUNKNOWN|DHCPLEASEACTIVE|DHCPLEASEUNASSIGNED) to (?P<lease>.+?) for (?P<key_type>IP|client-id|MAC address) (?P<key>.+?) \((?P<count>\d+) associated IPs\)$'),
+ )
+
+ MESSAGE_ERROR_RE = (
+ ('peer-all-free-leases', re.compile('peer holds all free leases')),
+ ('no-free-leases', re.compile(r'network (?P<network>.+?): no free leases')),
+ ('wrong-network', re.compile(r'wrong network')),
+ ('ignored-not-auth', re.compile(r'ignored \(not authoritative\)')),
+ ('ignored-unknown-subnet', re.compile(r'ignored \(unknown subnet\)')),
+ ('lease-unavailable', re.compile(r'lease (?P<lease>.+?) unavailable')),
+ ('lease-unknown', re.compile(r'unknown lease (?P<lease>.+?).$')),
+ )
+
+ ERROR_RE = (
+ # find_lease
+ ('duplicate-uid-lease',
+ re.compile(r'uid lease (?P<client>.+?) for client (?P<hwaddr>.+?) is duplicate on (?P<shared_network>.+?)$')),
+
+ # dhcprelease
+ ('dhcprelease-requested-address',
+ re.compile(r'DHCPRELEASE from (?P<hwaddr>.+?) specified requested-address.')),
+
+ # ???
+ ('unexpected-icmp-echo-reply',
+ re.compile(r'unexpected ICMP Echo Reply from (?P<client>.+?)$')),
+
+ ('host-unknown',
+ re.compile(r'(?P<host>.+?): host unknown.')),
+ )
+
+ IGNORE_RE = (
+ re.compile(r'Wrote (?P<count>\d+) (?P<what>.+?) to leases file.'),
+ )
+
+ def parse (self, line) :
+ """
+ Match line against our regexps, returning a
+
+ {
+ tag: send/recv/error,
+ type: ...,
+ [error]: ...,
+ ...
+ }
+
+ dict if matched
+
+ Returns False if the message is ignored, or None if the no regexp matched.
+ """
+
+ for tag, re_list in (
+ ('recv', self.RECV_MESSAGE_RE),
+ ('send', self.SEND_MESSAGE_RE),
+ ) :
+ for re in re_list :
+ # test
+ match = re.match(line)
+
+ if match :
+ data = match.groupdict()
+ data['tag'] = tag
+
+ return data
+
+ # error?
+ for type, re in self.ERROR_RE:
+ match = re.match(line)
+
+ if match :
+ data = match.groupdict()
+ data['tag'] = 'error'
+ data['type'] = type
+
+ return data
+
+ # ignore
+ for re in self.IGNORE_RE :
+ if re.match(line) :
+ # ignore
+ return False
+
+ # unknown
+ return None
+
+ def parse_error (self, error) :
+ """
+ Match given error status from send/recv against known types, returning a type name or None.
+ """
+
+ for type, re in self.MESSAGE_ERROR_RE :
+ match = re.match(error)
+
+ if match :
+ return type
+
+ # nope
+ return None
+
+if __name__ == '__main__' :
+ import logging
+
+ logging.basicConfig()
+
+ import doctest
+ doctest.testmod()
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/event.py Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,45 @@
+import gevent.core as event
+
+class EventBase (object) :
+ """
+ libevent-style event base.
+
+ XXX: just a wrapper around the gevent libevent bindings, uses implict state..
+ """
+
+ def __init__ (self) :
+ event.init()
+
+ def timer (self, timeout, cb) :
+ return event.event(0, 0, timeout, cb)
+
+ def read (self, fd, cb, timeout=-1) :
+ return event.event(event.EV_READ, fd, cb)
+
+ def main (self) :
+ """
+ Run mainloop until exit.
+ """
+
+ event.dispatch()
+
+class SyslogSource (object) :
+ def __init__ (self, source, parser) :
+ self.source = source
+ self.parser = parser
+
+ def event (self, event, evtype) :
+ """
+ Process source
+ """
+
+ # directly iter across source
+ for item in self.parser.process(self.source) :
+ yield item
+
+ def process (self, item) :
+ """
+ Handle item from syslog.
+ """
+
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/fifo.py Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,190 @@
+"""
+ Non-blocking fifo reads.
+"""
+
+import os
+import errno
+import select
+
+import logging
+
+log = logging.getLogger('pvl.collectd.fifo')
+
+class Fifo (object) :
+ """
+ A named pipe(7) on the filesystem.
+
+ Supports reading lines in a non-blocking fashion.
+ """
+
+ def __init__ (self, path) :
+ self.path = path
+ self._fd = None
+ self._buf = ''
+
+ log.debug("open: %s", path)
+ self._open()
+
+ @property
+ def fd (self) :
+ """
+ Fetch the internal fd, failing if we are not open..
+ """
+
+ if self._fd is None :
+ raise ValueError("I/O operation on closed fifo: {0}".format(self.path))
+
+ return self._fd
+
+ def _open (self) :
+ """
+ Open the internal fd.
+ """
+
+ assert self._fd is None
+
+ self._fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
+
+ def _close (self) :
+ """
+ Close.
+ """
+
+ assert self._fd is not None
+
+ os.close(self._fd)
+
+ self._fd = None
+
+ def _reopen (self) :
+ """
+ Re-open the FIFO in case the writing end was closed, and read gave EOF.
+ """
+
+ self._close()
+ self._open()
+
+ def poll (self, timeout=None) :
+ """
+ Poll for input, with given timeout in seconds (float).
+
+ A timeout of None indicates to block forever, False indicates to never block.
+
+ Returns True if we have input waiting, False on timeout with no input. None on indeterminate.
+ """
+
+ if timeout is False :
+ timeout = 0.0
+
+ # select
+ read, write, ex = select.select([self.fd], [], [], timeout)
+
+ if read :
+ return True
+
+ else :
+ # timeout
+ return False
+
+ def read (self, n=512) :
+ """
+ Read up to n bytes.
+
+ Returns None if we would block.
+ Raises EOFError on EOF.
+ """
+
+ try :
+ buf = os.read(self.fd, n)
+
+ except OSError as ex :
+ # block?
+ if ex.errno == errno.EAGAIN :
+ # empty
+ return None
+
+ else :
+ raise
+
+ # eof?
+ if not buf :
+ raise EOFError()
+
+ # ok
+ return buf
+
+ def readline (self) :
+ """
+ Read and return next waiting line from input.
+
+ Line is returned without trailing '\n'.
+
+ Returns None if there is no line available.
+ Raises EOFError if the fifo write end was closed.
+ """
+
+ while '\n' not in self._buf :
+ # read chunk
+ read = self.read()
+
+ if read is None :
+ return None
+
+ self._buf += read
+
+ # split out one line
+ line, self._buf = self._buf.split('\n', 1)
+
+ return line
+
+ def readlines (self) :
+ """
+ Read any available input, yielding lines.
+
+ Re-opens the FIFO on EOF.
+
+ Returns None if there was no more input available, or the fifo was re-opened after EOF.
+ """
+
+ while True :
+ try :
+ # pull line
+ line = self.readline()
+
+ except EOFError :
+ log.debug("EOF/reopen: %s", self.path)
+
+ # reopen and go back to waiting
+ self._reopen()
+
+ return
+
+ if line is None :
+ log.debug("fifo empty: %s", self.path)
+
+ # wait
+ return
+
+ log.debug("%s", line)
+ yield line
+
+ __iter__ = readlines
+
+ def close (self) :
+ """
+ Close the fifo.
+ """
+
+ if self._fd is None :
+ raise ValueError("Fifo already closed: {0}".format(self.path))
+
+ self._close()
+
+ def __del__ (self) :
+ """
+ Cleanup
+ """
+
+ # will not close stdin
+ if self._fd :
+ self._close()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/parser.py Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,152 @@
+"""
+ Parse syslog lines in text format.
+"""
+
+import datetime, time
+import re
+
+import logging; log = logging.getLogger('pvl.syslog.parser')
+
+class SyslogParser (object) :
+ """
+ A source of syslog items.
+ """
+
+ # default syslogd format
+ SYSLOG_RE = re.compile(
+ # the timestamp+hostname header
+ r"(?P<timestamp>\w{3} [0-9 ]\d \d{2}:\d{2}:\d{2}) (?P<hostname>\S+) "
+
+ # the message, including possible tag/pid
+ + r"(?P<message>(?P<tag>(?P<program>[^:\]]+)(?:\[(?P<pid>\d+)\])?: )?(?P<text>.*))\n?"
+ )
+
+ TIMESTAMP_FMT = '%b %d %H:%M:%S'
+
+ def __init__ (self, raw=False, prog=None) :
+ """
+ Using given underlying line source.
+ """
+
+ self.raw = raw
+ self.prog = prog
+
+ def parse_timestamp (self, match) :
+ """
+ Parse timstamp from line into datetime.
+ """
+
+ timestamp = match.group('timestamp')
+
+ # add missing year; assume current
+ timestamp = time.strftime('%Y') + ' ' + timestamp
+
+ # k
+ timestamp = datetime.datetime.strptime(timestamp, '%Y ' + self.TIMESTAMP_FMT)
+
+ return timestamp
+
+ def parse_prog (self, match) :
+ """
+ Parse prog from line.
+ """
+
+ prog = match.group('program')
+
+ if not prog :
+ # no tag
+ return None
+
+ # normalize
+ prog = prog.lower()
+
+ if prog.startswith('/') :
+ # base
+ prog = prog.split('/')[-1]
+
+ return prog
+
+ def parse (self, line) :
+ """
+ Parse given input line into SyslogMessage.
+ """
+
+ # ignore whitespace
+ line = line.strip()
+
+ # debug
+ log.debug("%s", line)
+
+ # timestamp?
+ if self.raw :
+ # from defaults
+ return dict(
+ timestamp = datetime.datetime.now(), # XXX: None?
+ host = None,
+ prog = self.prog,
+ pid = None,
+ msg = line,
+ )
+
+ else :
+ # parse
+ match = self.SYSLOG_RE.match(line)
+
+ if not match :
+ log.warn("Unparseable syslog message: %r", line)
+ return
+
+ # parse
+ return dict(
+ timestamp = self.parse_timestamp(match),
+ host = match.group('hostname'),
+ prog = self.parse_prog(match),
+ pid = match.group('pid'),
+ msg = match.group('text'),
+ )
+
+ def match_prog (self, prog) :
+ """
+ Match given prog?
+ """
+
+ if not prog :
+ # never matches non-tagged lines
+ return False
+
+ elif self.prog.endswith('*') :
+ # prefix match
+ return prog.startswith(self.prog[:-1])
+ else :
+ return prog == self.prog
+
+
+ def filter (self, line, item) :
+ """
+ Filter given item?
+ """
+
+ if not item :
+ log.debug("empty: %r", line)
+
+ elif self.prog and not self.match_prog(item['prog']) :
+ log.debug("prog: %r", item)
+
+ else :
+ # ok
+ return True
+
+ def process (self, lines) :
+ """
+ Yield SyslogMessages from given series of lines.
+ """
+
+ for line in lines :
+ item = self.parse(line)
+
+ # filter?
+ if self.filter(line, item) :
+ yield item
+
+
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/syslog.py Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,31 @@
+"""
+ Syslog handling.
+"""
+
+import logging; log = logging.getLogger('pvl.syslog.source')
+
+class SyslogSource (object) :
+ """
+ A source of syslog items.
+ """
+
+ def __init__ (self, source, parser) :
+ """
+ Using given underlying line source.
+ """
+
+ self.source = source
+ self.parser = parser
+
+ # proxy
+ self.poll = source.poll
+
+ def __iter__ (self) :
+ """
+ Read syslog messages from source.
+ """
+
+ # directly iter across source
+ for item in self.parser.process(self.source) :
+ yield item
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/syslog/tail.py Wed Oct 24 21:02:33 2012 +0300
@@ -0,0 +1,154 @@
+"""
+ `tail -f` style continuous file reads.
+
+ Checks if the file was renamed on EOF, and reopens if so.
+"""
+
+import os
+
+import logging; log = logging.getLogger('pvl.collectd.tail')
+
+class TailFile (object) :
+ """
+ A file on the filesystem, that is appended to.
+ """
+
+ def __init__ (self, path) :
+ self.path = path
+ self._file = None
+ self._id = None
+
+ self._open()
+
+ @property
+ def file (self) :
+ """
+ The underlying file objct, if opened.
+ """
+
+ if self._file is None :
+ raise ValueError("I/O operation on closed file: {0}".format(self.path))
+
+ return self._file
+
+ @property
+ def fd (self) :
+ return self.file.fileno()
+
+ def fileno (self) :
+ """
+ The underlying OS fd.
+ """
+
+ return self.fd
+
+ def _stat (self) :
+ """
+ Return a key identifying the file at our path.
+ """
+
+ st = os.stat(self.path)
+
+ return st.st_dev, st.st_ino
+
+ def _open (self) :
+ assert self._file is None
+
+ self._file = open(self.path, 'r')
+ self._id = self._stat()
+
+ def _close (self) :
+ assert self._file is not None
+
+ self._file.close()
+ self._file = None
+
+ def _reopen (self) :
+ """
+ Re-open, in case the file changed..
+ """
+
+ self._close()
+ self._open()
+
+ def _changed (self) :
+ """
+ Has the underlying file changed?
+ """
+
+ return self._stat() != self._id
+
+ def poll (self, timeout) :
+ """
+ XXX: not really implemented...
+ """
+
+ import time
+
+ time.sleep(timeout)
+
+ def readline (self) :
+ """
+ Reads a line from the file.
+
+ Raises EOF on end-of-file.
+ """
+
+ line = self.file.readline()
+
+ # eof?
+ if not line :
+ raise EOFError()
+
+ return line
+
+ def readlines (self, eof_mark=False) :
+ """
+ Reads any available lines from the file.
+
+ Reopens the file on EOF if the underlying file changed.
+
+ eof_mark: yields a special None line when this happens.
+ """
+
+ while True :
+ try :
+ line = self.readline()
+
+ except EOFError :
+ if self._changed() :
+ log.debug("EOF: file changed: reopen")
+ self._reopen()
+
+ if eof_mark :
+ # special token
+ yield None
+
+ else :
+ log.debug("EOF: wait")
+ # done reading
+ return
+
+ else :
+ yield line.strip()
+
+ __iter__ = readlines
+
+ def close (self) :
+ """
+ Close the fifo.
+ """
+
+ if self._file is None :
+ raise ValueError("File already closed: {0}".format(self.path))
+
+ self._close()
+
+ def __del__ (self) :
+ """
+ Cleanup
+ """
+
+ if self._file is not None :
+ self._close()
+
--- a/pvl/verkko/hosts.py Wed Oct 24 20:46:17 2012 +0300
+++ b/pvl/verkko/hosts.py Wed Oct 24 21:02:33 2012 +0300
@@ -566,12 +566,17 @@
else :
# render html
- self.hosts = hosts.limit(10)
+ hosts = hosts.limit(10)
# XXX: testing
- self.hosts = self.hosts.offset(1)
+ hosts = hosts.offset(1)
- self.t = self.hosts[0].last_seen
+ self.hosts = list(hosts)
+
+ if self.hosts :
+ self.t = self.hosts[0].last_seen
+ else :
+ self.t = datetime.datetime.now()
def title (self) :
if self.filters :