track dhcp leases, using pvl.verkko.dhcp.leases from pvl-collectd..
--- a/bin/pvl.verkko-dhcp Thu Oct 18 23:06:23 2012 +0300
+++ b/bin/pvl.verkko-dhcp Fri Oct 19 02:39:57 2012 +0300
@@ -11,6 +11,7 @@
import pvl.verkko.db as db
import pvl.syslog.dhcp
+import pvl.verkko.dhcp.leases
import logging, optparse
@@ -41,6 +42,14 @@
## syslog
parser.add_option_group(pvl.syslog.args.parser(parser, prog=DHCP_SYSLOG_PROG))
+ ## leases
+ parser.add_option('--leases-file', metavar='FILE',
+ help="Synchronize dhcpd leases from given file")
+
+ parser.add_option('--leases-tail', type='float', metavar='POLL',
+ help="Continuously poll leases file XXX: overrides --syslog-tail")
+
+
## XXX: networks
parser.add_option('--network', metavar='NET', action='append',
help="Filter leases by network prefix as plugin instance")
@@ -71,7 +80,13 @@
return options, args
-class DHCPHostsDatabase (db.Database) :
+class DHCPHostsDatabase (object) :
+ """
+ The dhcp_hosts table in our database.
+ """
+
+ def __init__ (self, db) :
+ self.db = db
def create (self) :
"""
@@ -79,7 +94,7 @@
"""
log.info("Creating database tables: dhcp_hosts")
- db.dhcp_hosts.create(self.engine)
+ db.dhcp_hosts.create(self.db.engine)
def insert (self, attrs) :
"""
@@ -100,10 +115,9 @@
name = attrs.get('name'),
error = attrs.get('error'),
)
- result = self.engine.execute(query)
- id, = result.inserted_primary_key
- return id
+ # -> id
+ return self.db.insert(query)
def update (self, attrs) :
"""
@@ -128,10 +142,8 @@
if 'error' in attrs :
query = query.values(error = attrs['error'])
- result = self.engine.execute(query)
-
# any matched rows?
- return result.rowcount > 0
+ return self.db.update(query)
def process (self, item) :
"""
@@ -179,21 +191,118 @@
log.info("Insert: %s", attrs)
self.insert(attrs)
-class DHCPSyslogHandler (object) :
+class DHCPLeasesDatabase (object) :
+ def __init__ (self, db) :
+ self.db = db
+
+ def create (self) :
+ """
+ CREATE TABLEs
+ """
+
+ log.info("Creating database tables: dhcp_leases")
+ db.dhcp_leases.create(self.db.engine)
+
+ def update (self, lease) :
+ """
+ Try an extend an existing lease?
+ """
+
+ c = db.dhcp_leases.c
+
+ ip = lease['lease']
+ mac = lease.get('hwaddr')
+ starts = lease['starts']
+ ends = lease['ends']
+
+ update = db.dhcp_leases.update()
+
+ if mac :
+ # renew lease..?
+ update = update.where((c.ip == ip) & (c.mac == mac) & ((starts < c.ends) | (c.ends == None)))
+ else :
+ # new state for lease..?
+ update = update.where((c.ip == ip) & ((starts < c.ends) | (c.ends == ends)))
+
+ update = update.values(
+ state = lease['binding-state'],
+ next = lease.get('next-binding-state'),
+ ends = lease['ends'],
+ )
+
+ if lease.get('client-hostname') :
+ update = update.values(hostname = lease['client-hostname'])
+
+ return self.db.update(update) > 0
+
+ def insert (self, lease) :
+ """
+ Record a new lease.
+ """
+
+ c = db.dhcp_leases.c
+
+ query = db.dhcp_leases.insert().values(
+ ip = lease['lease'],
+ mac = lease['hwaddr'],
+ hostname = lease.get('client-hostname'),
+
+ starts = lease['starts'],
+ ends = lease['ends'],
+
+ state = lease['binding-state'],
+ next = lease.get('next-binding-state'),
+ )
+
+ return self.db.insert(query)
+
+ def process (self, lease) :
+ """
+ Process given DHCP lease to update currently active lease, or insert a new one.
+ """
+
+ # update existing?
+ if self.update(lease) :
+ log.info("Update: %s", lease)
+
+ elif lease.get('hwaddr') :
+ # new
+ id = self.insert(lease)
+
+ log.info("Insert: %s -> %d", lease, id)
+
+ else :
+ # may be a free lease
+ log.warn("Ignored lease: %s", lease)
+
+class DHCPHandler (object) :
"""
Process lines from syslog
"""
- def __init__ (self, db) :
+ def __init__ (self, db, syslog, leases) :
self.db = db
+ self.syslog = syslog
+ self.leases = leases
+
+ self.hostdb = DHCPHostsDatabase(db)
+ self.leasedb = DHCPLeasesDatabase(db)
+
# XXX
self.filter = pvl.syslog.dhcp.DHCPSyslogFilter()
- def process (self, item) :
+ def createdb (self) :
+ """
+ Initialize database tables.
+ """
+
+ self.hostdb.create()
+ self.leasedb.create()
+
+ def process_syslog (self, item) :
"""
Handle a single item read from syslog to DB.
- filter = pvl.syslog.dhcp.DHCPSyslogFilter()
"""
dhcp_item = self.filter.parse(item['msg'])
@@ -207,9 +316,29 @@
if item.get('error') :
item['error'] = self.filter.parse_error(item['error'])
- self.db.process(dhcp_item)
+ self.hostdb.process(dhcp_item)
- def main (self, source, poll) :
+ def process_leases (self, leases) :
+ """
+ Handle given changed lease.
+ """
+
+ for lease in leases :
+ log.info("%s", lease['lease'])
+
+ self.leasedb.process(lease)
+
+ def sync_leases (self, leases) :
+ """
+ Sync the entire given set of valid leases.
+ """
+
+ log.info("%d leases", len(leases))
+
+ # XXX: any difference? Mostly removed leases, but... they should expire by themselves, right?
+ self.process_leases(leases)
+
+ def main (self, poll) :
"""
Read items from syslog source with given polling style.
@@ -220,10 +349,23 @@
# mainloop
while True :
- # process from source
- for item in parser.process(source) :
- self.process(item)
+ if self.syslog :
+ # process from source
+ for item in parser.process(self.syslog) :
+ self.process_syslog(item)
+ if self.leases :
+ # process internally
+ #sync, leases = self.leases.process()
+ leases = self.leases.process()
+
+ #if sync :
+ # self.sync_leases(leases)
+ if leases :
+ self.process_leases(leases)
+ #else :
+ # pass
+
if poll is False :
# done
break
@@ -242,38 +384,52 @@
return 1
log.info("Open up database: %s", options.database)
- database = DHCPHostsDatabase(options.database)
-
- if options.create :
- database.create()
-
+ database = pvl.verkko.db.Database(options.database)
+
# syslog
log.info("Open up syslog...")
syslog_parser = pvl.syslog.parser.SyslogParser(prog=DHCP_SYSLOG_PROG) # filter by prog
if options.syslog_fifo :
- source = pvl.syslog.fifo.Fifo(options.syslog_fifo)
+ syslog = pvl.syslog.fifo.Fifo(options.syslog_fifo)
poll = None # no timeout
elif options.syslog_tail :
# continuous file tail
- source = pvl.syslog.tail.TailFile(options.syslog_file)
+ syslog = pvl.syslog.tail.TailFile(options.syslog_file)
poll = options.syslog_tail # polling interval
elif options.syslog_file :
# one-shot file read
- source = pvl.syslog.tail.TailFile(options.syslog_file)
+ syslog = pvl.syslog.tail.TailFile(options.syslog_file)
poll = False # do not poll-loop
else :
+ syslog = None
+ poll = False
log.warning("No syslog source given")
- return 0
+
+ # leases
+ if options.leases_file :
+ log.info("Open up DHCP leases...")
+ leases = pvl.verkko.dhcp.leases.DHCPLeasesDatabase(options.leases_file)
+
+ # force polling interval
+ if options.leases_tail :
+ # XXX: min between syslog/leases?
+ poll = options.leases_tail
+
+ else :
+ leases = None
# handler + main
- handler = DHCPSyslogHandler(database)
+ handler = DHCPHandler(database, syslog, leases)
+ if options.create :
+ handler.createdb()
+
log.info("Enter mainloop...")
- handler.main(source, poll)
+ handler.main(poll)
# done
return 0
--- a/pvl/verkko/db.py Thu Oct 18 23:06:23 2012 +0300
+++ b/pvl/verkko/db.py Fri Oct 19 02:39:57 2012 +0300
@@ -6,9 +6,8 @@
# schema
metadata = MetaData()
-# TODO: count, completely separate dhcp_events?
dhcp_hosts = Table('dhcp_hosts', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True),
# unique
Column('ip', String, nullable=False),
@@ -30,6 +29,20 @@
UniqueConstraint('ip', 'mac', 'gw'),
)
+dhcp_leases = Table('dhcp_leases', metadata,
+ Column('id', Integer, primary_key=True),
+
+ Column('ip', String, nullable=False),
+ Column('mac', String, nullable=False),
+ Column('hostname', String, nullable=True),
+
+ Column('starts', DateTime, nullable=False),
+ Column('ends', DateTime, nullable=True), # never
+
+ Column('state', String, nullable=True),
+ Column('next', String, nullable=True),
+)
+
# for ORM models
from sqlalchemy.orm import mapper, sessionmaker
@@ -40,9 +53,6 @@
Our underlying database.
"""
- # XXX: alias Tables in?
- dhcp_hosts = dhcp_hosts
-
def __init__ (self, database) :
"""
database - sqlalchemy connection URI
@@ -57,6 +67,9 @@
return Session(bind=self.engine)
# SQL
+ def execute (self, query) :
+ return self.engine.execute(query)
+
def select (self, query) :
return self.engine.execute(query)
@@ -68,3 +81,24 @@
"""
return self.select(query).fetchone()
+
+ def insert (self, insert) :
+ """
+ Execute given INSERT query, returning the inserted primary key.
+ """
+
+ result = self.engine.execute(insert)
+
+ id, = result.inserted_primary_key
+
+ return id
+
+ def update (self, update) :
+ """
+ Execute given UPDATE query, returning the number of matched rows.
+ """
+
+ result = self.engine.execute(update)
+
+ return result.rowcount
+
--- a/pvl/verkko/hosts.py Thu Oct 18 23:06:23 2012 +0300
+++ b/pvl/verkko/hosts.py Fri Oct 19 02:39:57 2012 +0300
@@ -129,6 +129,7 @@
'name': Host.name,
'seen': Host.last_seen,
'state': Host.state,
+ 'count': Host.count,
}
HOST_SORT = Host.last_seen.desc()
@@ -268,6 +269,7 @@
('First seen', host.first_seen),
('Last seen', host.last_seen),
('Last state', host.render_state()),
+ ('Total messages', host.count),
)
return (
--- a/test.wsgi Thu Oct 18 23:06:23 2012 +0300
+++ b/test.wsgi Fri Oct 19 02:39:57 2012 +0300
@@ -4,6 +4,6 @@
import pvl.verkko.wsgi
-DATABASE_READ = 'postgresql://verkko_dev:iphie5Aa1ohquahd@localhost/terom_verkko_dev'
+DATABASE_READ = 'postgresql://verkko_dev:iphie5Aa1ohquahd@localhost/verkko_dev'
application = pvl.verkko.wsgi.Application(DATABASE_READ)