split pvl.dhcp-leases from pvl.syslog-dhcp using pvl.dhcp.hosts/syslog/leases
authorTero Marttila <terom@paivola.fi>
Sat, 26 Jan 2013 11:49:16 +0200
changeset 174 6f339a8a87dc
parent 173 5fc4c5e83b72
child 175 d61ca480243e
split pvl.dhcp-leases from pvl.syslog-dhcp using pvl.dhcp.hosts/syslog/leases
bin/pvl.dhcp-leases
bin/pvl.syslog-dhcp
pvl/dhcp/hosts.py
pvl/dhcp/leases.py
pvl/dhcp/syslog.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bin/pvl.dhcp-leases	Sat Jan 26 11:49:16 2013 +0200
@@ -0,0 +1,114 @@
+#!/usr/bin/env python
+
+"""
+    Monitor dhcpd leases -> database.
+"""
+
+__version__ = '0.0'
+
+import pvl.args
+import pvl.syslog.args
+
+import pvl.verkko.db as db
+import pvl.dhcp.leases
+
+import logging, optparse
+
+log = logging.getLogger('main')
+
+def parse_options (argv) :
+    """
+        Parse command-line arguments.
+    """
+
+    prog = argv[0]
+
+    parser = optparse.OptionParser(
+            prog        = prog,
+            usage       = '%prog: [options]',
+            version     = __version__,
+
+            # module docstring
+            description = __doc__,
+    )
+    
+    # options
+    parser.add_option_group(pvl.args.parser(parser))
+
+    ## leases
+    parser.add_option('--leases',               metavar='FILE', default='/var/lib/dhcp/dhcpd.leases',
+            help="Synchronize dhcpd leases from given file")
+
+    parser.add_option('--leases-tail',          type='float', metavar='POLL',
+            help="Continuously poll leases file with given timeout")
+
+    ## database
+    parser.add_option('--database',             metavar='URI',
+            help="Track hosts in given database")
+
+    parser.add_option('--create',               action='store_true',
+            help="Initialize database")
+
+    # defaults
+    parser.set_defaults(
+
+    )
+    
+    # parse
+    options, args = parser.parse_args(argv[1:])
+    
+    # apply
+    pvl.args.apply(options, prog)
+
+    if not options.database :
+        parser.error("Missing required option: --database")
+
+    return options, args
+
+def main (argv) :
+    options, args = parse_options(argv)
+
+    # db
+    if not options.database :
+        log.error("No database given")
+        return 1
+
+    log.info("Open up database: %s", options.database)
+    db = pvl.verkko.db.Database(options.database)
+    leases_db = pvl.dhcp.leases.DHCPLeasesDatabase(db)
+   
+    if options.create :
+        handler.create()
+
+    # leases
+    log.info("Open up DHCP leases...")
+    leases = pvl.dhcp.leases.DHCPLeases(options.leases)
+
+    # polling interval?
+    if options.leases_tail :
+        poll = options.leases_tail
+    else :
+        poll = None
+
+    # mainloop
+    log.info("Enter mainloop...")
+    while True :
+        log.debug("tick")
+
+        for lease in leases :
+            leases_db(lease)
+        
+        log.debug("tock")
+
+        if poll :
+            time.sleep(poll)
+        else :
+            break
+    
+    # done
+    return 0
+
+if __name__ == '__main__':
+    import sys
+
+    sys.exit(main(sys.argv))
--- a/bin/pvl.syslog-dhcp	Sat Jan 26 11:48:02 2013 +0200
+++ b/bin/pvl.syslog-dhcp	Sat Jan 26 11:49:16 2013 +0200
@@ -4,14 +4,14 @@
     Monitor DHCP use.
 """
 
-__version__ = '0.0'
+__version__ = '0.1'
 
 import pvl.args
 import pvl.syslog.args
 
 import pvl.verkko.db as db
 import pvl.dhcp.syslog
-import pvl.dhcp.leases
+import pvl.dhcp.hosts
 
 import logging, optparse
 
@@ -42,14 +42,6 @@
     ## syslog
     parser.add_option_group(pvl.syslog.args.parser(parser, prog=DHCP_SYSLOG_PROG))
 
-    ## leases
-    parser.add_option('--leases-file',          metavar='FILE',
-            help="Synchronize dhcpd leases from given file")
-
-    parser.add_option('--leases-tail',          type='float', metavar='POLL',
-            help="Continuously poll leases file XXX: overrides --syslog-tail")
-
-
     ## XXX: networks
     parser.add_option('--network',              metavar='NET', action='append',
             help="Filter leases by network prefix as plugin instance")
@@ -80,305 +72,6 @@
 
     return options, args
 
-class DHCPHostsDatabase (object) :
-    """
-        The dhcp_hosts table in our database.
-    """
-
-    def __init__ (self, db) :
-        self.db = db
-
-    def create (self) :
-        """
-            CREATE TABLEs
-        """
-
-        log.info("Creating database tables: dhcp_hosts")
-        db.dhcp_hosts.create(self.db.engine, checkfirst=True)
-
-    def insert (self, attrs) :
-        """
-            INSERT new host
-        """
-
-        query = db.dhcp_hosts.insert().values(
-                ip          = attrs['ip'],
-                mac         = attrs['mac'],
-                gw          = attrs['gw'],
-
-                first_seen  = attrs['timestamp'],
-                count       = 1,
-
-                last_seen   = attrs['timestamp'],
-                state       = attrs['state'],
-                
-                name        = attrs.get('name'),
-                error       = attrs.get('error'),
-        )
-        
-        # -> id
-        return self.db.insert(query)
-
-    def update (self, attrs) :
-        """
-            UPDATE existing host, or return False if not found.
-        """
-
-        table = db.dhcp_hosts
-
-        query = table.update()
-        query = query.where((table.c.ip == attrs['ip']) & (table.c.mac == attrs['mac']) & (table.c.gw == attrs['gw']))
-        query = query.values(
-                count       = db.func.coalesce(table.c.count, 0) + 1,
-
-                # set
-                last_seen   = attrs['timestamp'],
-                state       = attrs['state'],
-        )
-        
-        if 'name' in attrs :
-            query = query.values(name = attrs['name'])
-        
-        if 'error' in attrs :
-            query = query.values(error = attrs['error'])
-
-        # any matched rows?
-        return self.db.update(query)
-
-    def process (self, item) :
-        """
-            Process given DHCP syslog message to update the hosts table.
-        """
-
-        attrs = {}
-        
-        # ignore unless we have enough info to fully identify the client
-        # this means that we omit DHCPDISCOVER messages, but we get the OFFER/REQUEST/ACK
-        if any(name not in item for name in ('lease', 'hwaddr', 'gateway')) :
-            # ignore; we require these
-            return
-
-        # do not override error from request on NAK; clear otherwise
-        if item.get('type') == 'DHCPNAK' :
-            pass
-        else :
-            attrs['error'] = item.get('error')
-
-        # do not override name unless known
-        if item.get('name') :
-            attrs['name'] = item.get('name')
-
-        # db: syslog
-        ATTR_MAP = (
-            ('ip',          'lease'),
-            ('mac',         'hwaddr'),
-            ('gw',          'gateway'),
-
-            ('timestamp',   'timestamp'),
-            ('state',       'type'),
-        )
-
-        # generic attrs
-        for key, name in ATTR_MAP :
-            attrs[key] = item.get(name)
-
-        # update existing?
-        if self.update(attrs) :
-            log.info("Update: %s", attrs)
-
-        else :
-            # new
-            log.info("Insert: %s", attrs)
-            self.insert(attrs)
-
-class DHCPLeasesDatabase (object) :
-    def __init__ (self, db) :
-        self.db = db
-
-    def create (self) :
-        """
-            CREATE TABLEs
-        """
-
-        log.info("Creating database tables: dhcp_leases")
-        db.dhcp_leases.create(self.db.engine, checkfirst=True)
-
-    def update (self, lease) :
-        """
-            Try an extend an existing lease?
-        """
-
-        c = db.dhcp_leases.c
-
-        ip = lease['lease']
-        mac = lease.get('hwaddr')
-        starts = lease['starts']
-        ends = lease.get('ends')
-
-        update = db.dhcp_leases.update()
-        
-        # XXX: if ends is None?
-        if mac :
-            # renew lease..?
-            update = update.where((c.ip == ip) & (c.mac == mac) & ((starts < c.ends) | (c.ends == None)))
-        else :
-            # new state for lease..?
-            update = update.where((c.ip == ip) & ((starts < c.ends) | (c.ends == ends)))
-
-        update = update.values(
-                state       = lease['binding-state'],
-                next        = lease.get('next-binding-state'),
-                ends        = ends,
-        )
-
-        if lease.get('client-hostname') :
-            update = update.values(hostname = lease['client-hostname'])
-
-        return self.db.update(update) > 0
-
-    def insert (self, lease) :
-        """
-            Record a new lease.
-        """
-
-        c = db.dhcp_leases.c
-
-        query = db.dhcp_leases.insert().values(
-            ip          = lease['lease'],
-            mac         = lease['hwaddr'],
-            hostname    = lease.get('client-hostname'),
-
-            starts      = lease['starts'],
-            ends        = lease.get('ends'),
-
-            state       = lease['binding-state'],
-            next        = lease.get('next-binding-state'),
-        )
-
-        return self.db.insert(query)
-
-    def process (self, lease) :
-        """
-            Process given DHCP lease to update currently active lease, or insert a new one.
-        """
-
-        # update existing?
-        if self.update(lease) :
-            log.info("Update: %s", lease)
-
-        elif lease.get('hwaddr') :
-            # new
-            id = self.insert(lease)
-
-            log.info("Insert: %s -> %d", lease, id)
-
-        else :
-            # may be a free lease
-            log.warn("Ignored lease: %s", lease)
-
-# XXX: mainloop
-import time
-
-class DHCPHandler (object) :
-    """
-        Process lines from syslog
-    """
-
-    def __init__ (self, db, syslog, leases) :
-        self.db = db
-
-        self.syslog = syslog
-        self.leases = leases
-
-        self.hostdb = DHCPHostsDatabase(db)
-        self.leasedb = DHCPLeasesDatabase(db)
-
-        # XXX
-        self.parser = pvl.dhcp.syslog.DHCPSyslogParser()
-
-    def createdb (self) :
-        """
-            Initialize database tables.
-        """
-
-        self.hostdb.create()
-        self.leasedb.create()
-
-    def process_syslog (self, item) :
-        """
-            Handle a single item read from syslog to DB.
-        """
-
-        dhcp_item = self.parser.parse(item['msg'])
-
-        log.debug("%s: %s", item, dhcp_item)
-        
-        if not dhcp_item :
-            # ignore
-            return
-
-        dhcp_item['timestamp'] = item['timestamp'] # XXX: fixup DHCPSyslogParser?
-
-        if item.get('error') :
-            item['error'] = self.filter.parse_error(item['error'])
-
-        self.hostdb.process(dhcp_item)
-
-    def process_leases (self, leases) :
-        """
-            Handle given changed lease.
-        """
-        
-        for lease in leases :
-            log.info("%s", lease['lease'])
-
-            self.leasedb.process(lease)
-
-    def sync_leases (self, leases) :
-        """
-            Sync the entire given set of valid leases.
-        """
-        
-        log.info("%d leases", len(leases))
-
-        # XXX: any difference? Mostly removed leases, but... they should expire by themselves, right?
-        self.process_leases(leases)
-
-    def main (self, poll) :
-        """
-            Read items from syslog source with given polling style.
-
-            TODO: poll = false on SIGINT
-        """
-        
-        # mainloop
-        while True :
-            if self.syslog :
-                # process from source
-                for item in self.syslog :
-                    self.process_syslog(item)
-            
-            if self.leases :
-                # process internally
-                leases = self.leases.process()
-
-                if leases :
-                    self.process_leases(leases)
-
-            if not poll :
-                # done
-                break
-
-            elif self.syslog :
-                # wait
-                self.syslog.select(poll)
-
-            else :
-                # XXX: for --leases-tail
-                time.sleep(poll)
-
-            log.debug("tick")
-
 def main (argv) :
     options, args = parse_options(argv)
 
@@ -388,39 +81,24 @@
         return 1
 
     log.info("Open up database: %s", options.database)
-    database = pvl.verkko.db.Database(options.database)
-   
+    db = pvl.verkko.db.Database(options.database)
+    hosts_db = pvl.dhcp.hosts.DHCPHostsDatabase(db)
+
+    if options.create :
+        hosts_db.create()
+  
     # syslog
     log.info("Open up syslog...")
-    syslog = pvl.syslog.args.apply(options, optional=True)
-
-    if syslog :
-        poll = syslog.poll
-    else :
-        log.warning("No syslog source given")
-        poll = None
-
-    # leases
-    if options.leases_file :
-        log.info("Open up DHCP leases...")
-        leases = pvl.dhcp.leases.DHCPLeasesDatabase(options.leases_file)
+    syslog = pvl.syslog.args.apply(options)
+    parser = pvl.dhcp.syslog.DHCPSyslogParser()
 
-        # force polling interval
-        if options.leases_tail :
-            # XXX: min between syslog/leases?
-            poll = options.leases_tail
-
-    else :
-        leases = None
+    log.info("Enter mainloop...")
+    for source in syslog.main() :
+        # parse dhcp messages from syslog
+        for host in parser(source) :
+            log.debug("%s: %s", source, host)
 
-    # handler + main
-    handler = DHCPHandler(database, syslog, leases)
-
-    if options.create :
-        handler.createdb()
- 
-    log.info("Enter mainloop...")
-    handler.main(poll)
+            hosts_db(host)
     
     # done
     return 0
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/dhcp/hosts.py	Sat Jan 26 11:49:16 2013 +0200
@@ -0,0 +1,121 @@
+"""
+    Track active DHCP hosts on network by dhcp messages.
+"""
+
+import logging; log = logging.getLogger('pvl.dhcp.hosts')
+
+# XXX: from db.dhcp_leases instead?
+import pvl.verkko.db as db
+
+class DHCPHostsDatabase (object) :
+    """
+        pvl.verkko.Database dhcp_hosts model for updates.
+    """
+
+    def __init__ (self, db) :
+        self.db = db
+
+    def create (self) :
+        """
+            CREATE TABLEs
+        """
+
+        log.info("Creating database tables: dhcp_hosts")
+        db.dhcp_hosts.create(self.db.engine, checkfirst=True)
+
+    def insert (self, attrs) :
+        """
+            INSERT new host
+        """
+
+        query = db.dhcp_hosts.insert().values(
+                ip          = attrs['ip'],
+                mac         = attrs['mac'],
+                gw          = attrs['gw'],
+
+                first_seen  = attrs['timestamp'],
+                count       = 1,
+
+                last_seen   = attrs['timestamp'],
+                state       = attrs['state'],
+                
+                name        = attrs.get('name'),
+                error       = attrs.get('error'),
+        )
+        
+        # -> id
+        return self.db.insert(query)
+
+    def update (self, attrs) :
+        """
+            UPDATE existing host, or return False if not found.
+        """
+
+        table = db.dhcp_hosts
+
+        query = table.update()
+        query = query.where((table.c.ip == attrs['ip']) & (table.c.mac == attrs['mac']) & (table.c.gw == attrs['gw']))
+        query = query.values(
+                count       = db.func.coalesce(table.c.count, 0) + 1,
+
+                # set
+                last_seen   = attrs['timestamp'],
+                state       = attrs['state'],
+        )
+        
+        if 'name' in attrs :
+            query = query.values(name = attrs['name'])
+        
+        if 'error' in attrs :
+            query = query.values(error = attrs['error'])
+
+        # any matched rows?
+        return self.db.update(query)
+
+    def __call__ (self, item) :
+        """
+            Process given DHCP syslog message to update the hosts table.
+        """
+
+        attrs = {}
+        
+        # ignore unless we have enough info to fully identify the client
+        # this means that we omit DHCPDISCOVER messages, but we get the OFFER/REQUEST/ACK
+        if any(name not in item for name in ('lease', 'hwaddr', 'gateway')) :
+            # ignore; we require these
+            return
+
+        # do not override error from request on NAK; clear otherwise
+        if item.get('type') == 'DHCPNAK' :
+            pass
+        else :
+            attrs['error'] = item.get('error')
+
+        # do not override name unless known
+        if item.get('name') :
+            attrs['name'] = item.get('name')
+
+        # db: syslog
+        ATTR_MAP = (
+            ('ip',          'lease'),
+            ('mac',         'hwaddr'),
+            ('gw',          'gateway'),
+
+            ('timestamp',   'timestamp'),
+            ('state',       'type'),
+        )
+
+        # generic attrs
+        for key, name in ATTR_MAP :
+            attrs[key] = item.get(name)
+
+        # update existing?
+        if self.update(attrs) :
+            log.info("Update: %s", attrs)
+
+        else :
+            # new
+            log.info("Insert: %s", attrs)
+            self.insert(attrs)
+
+
--- a/pvl/dhcp/leases.py	Sat Jan 26 11:48:02 2013 +0200
+++ b/pvl/dhcp/leases.py	Sat Jan 26 11:49:16 2013 +0200
@@ -205,9 +205,9 @@
             for item in self.parse(line) :
                 yield item
 
-class DHCPLeasesDatabase (object) :
+class DHCPLeases (object) :
     """
-        Process log-structured leases file.
+        Process log-structured leases file, updated by dhcpd.
     """
 
     LEASE_DATES = ('starts', 'ends', 'tstp', 'tsfp', 'atsfp', 'cltt')
@@ -224,20 +224,20 @@
         """
 
         # tail; handles file re-writes
-        self.source = pvl.syslog.tail.TailFile(path)
+        self.source = pvl.syslog.tail.Tail(path)
 
         # parser state
         self.parser = DHCPLeasesParser()
 
         # initial leases state
-        self.leases = None
+        self._leases = None
 
     def reset (self) :
         """
             Reset state, if we started to read a new file.
         """
 
-        self.leases = {}
+        self._leases = {}
 
     def process_lease_item_date (self, args) :
         """
@@ -296,7 +296,7 @@
         """
 
         # replace any existing
-        lease = self.leases[lease_name] = {}
+        lease = self._leases[lease_name] = {}
 
         # meta
         lease['lease'] = lease_name
@@ -354,8 +354,8 @@
 
             log.debug("lease: %s: %s", lease, items)
             
-            if lease in self.leases :
-                old = self.leases[lease]
+            if lease in self._leases :
+                old = self._leases[lease]
             else :
                 old = None
 
@@ -369,22 +369,18 @@
         else :
             log.warn("unknown block: %s: %s", type, args)
 
-    def process (self) :
+    def readleases (self) :
         """
             Read new lines from the leases database and update our state.
-            
-            XXX: Returns
-                (sync, leases)
 
-            whereby sync is normally False, and leases the set of (possibly) changed leases, unless during initial
-            startup and on database replacement, when the sync is True, and the entire set of valid leases is returned.
+            Yields changed leases. On startup and on periodic database reset, all leases are yielded.
         """
         
         # handle file replace by reading until EOF
         sync = False
 #        leases = []
 
-        if self.leases is None :
+        if self._leases is None :
             # initial sync
             self.reset()
             sync = True
@@ -413,7 +409,9 @@
 #        else :
 #            return False, leases
 
-    def __iter__ (self) :
+    __iter__ = readleases
+
+    def leases (self) :
         """
             Iterate over all leases.
         """
@@ -440,6 +438,105 @@
 
         return state
 
+# XXX: from db.dhcp_leases instead?
+import pvl.verkko.db as db
+
+class DHCPLeasesDatabase (object) :
+    """
+        pvl.verkko.Database dhcp_leases model for updates.
+    """
+
+    def __init__ (self, db) :
+        """
+            db      - pvl.verkko.Database
+        """
+
+        self.db = db
+
+    def create (self) :
+        """
+            CREATE TABLEs
+        """
+
+        log.info("Creating database tables: dhcp_leases")
+        db.dhcp_leases.create(self.db.engine, checkfirst=True)
+
+    def update (self, lease) :
+        """
+            Try an extend an existing lease?
+        """
+
+        c = db.dhcp_leases.c
+
+        ip = lease['lease']
+        mac = lease.get('hwaddr')
+        starts = lease['starts']
+        ends = lease.get('ends')
+
+        update = db.dhcp_leases.update()
+        
+        # XXX: if ends is None?
+        if mac :
+            # renew lease..?
+            update = update.where((c.ip == ip) & (c.mac == mac) & ((starts < c.ends) | (c.ends == None)))
+        else :
+            # new state for lease..?
+            update = update.where((c.ip == ip) & ((starts < c.ends) | (c.ends == ends)))
+
+        update = update.values(
+                state       = lease['binding-state'],
+                next        = lease.get('next-binding-state'),
+                ends        = ends,
+        )
+
+        if lease.get('client-hostname') :
+            update = update.values(hostname = lease['client-hostname'])
+
+        return self.db.update(update) > 0
+
+    def insert (self, lease) :
+        """
+            Record a new lease.
+        """
+
+        c = db.dhcp_leases.c
+
+        query = db.dhcp_leases.insert().values(
+            ip          = lease['lease'],
+            mac         = lease['hwaddr'],
+            hostname    = lease.get('client-hostname'),
+
+            starts      = lease['starts'],
+            ends        = lease.get('ends'),
+
+            state       = lease['binding-state'],
+            next        = lease.get('next-binding-state'),
+        )
+
+        return self.db.insert(query)
+
+    def __call__ (self, lease) :
+        """
+            Process given DHCP lease to update currently active lease, or insert a new one.
+
+            XXX: transaction? *leases?
+        """
+
+        # update existing?
+        if self.update(lease) :
+            log.info("Update: %s", lease)
+
+        elif lease.get('hwaddr') :
+            # new
+            id = self.insert(lease)
+
+            log.info("Insert: %s -> %d", lease, id)
+
+        else :
+            # may be a free lease
+            log.warn("Ignored lease: %s", lease)
+
+
 if __name__ == '__main__' :
     import logging
 
--- a/pvl/dhcp/syslog.py	Sat Jan 26 11:48:02 2013 +0200
+++ b/pvl/dhcp/syslog.py	Sat Jan 26 11:49:16 2013 +0200
@@ -1,7 +1,9 @@
 """
-    Parse ISC dhcpd messages in syslog.
+    Parse ISC dhcpd messages from pvl.syslog.
 """
 
+from pvl.invoke import merge # XXX
+
 import re
 
 import logging; log = logging.getLogger('pvl.dhcp.syslog')
@@ -10,6 +12,8 @@
 class DHCPSyslogParser (object) :
     """
         Parse SyslogMessages from SyslogParser for ISC dhcp semantics.
+
+        TODO: BOOTREQUEST from <hwaddr> via <gateway>
     """
 
     ## various message types sent/recieved by dhcpd
@@ -184,6 +188,30 @@
         # nope
         return None
 
+    def process (self, source) :
+        """
+            Process syslog items from given source, yielding parsed DHCP items.
+        """
+
+        for item in source :
+            dhcp_item = self.parse(item['msg'])
+
+            log.debug("%s: %s", item, dhcp_item)
+            
+            if not dhcp_item :
+                # ignore
+                continue
+
+            item = merge(item, dhcp_item)
+
+            if item.get('error') :
+                item['error'] = self.parse_error(item['error'])
+            
+            yield item
+    
+    __call__ = process
+
+
 if __name__ == '__main__' :
     import logging