track dhcp leases, using pvl.verkko.dhcp.leases from pvl-collectd..
#!/usr/bin/env python
"""
Monitor DHCP use.
"""
__version__ = '0.0'
import pvl.args
import pvl.syslog.args
import pvl.verkko.db as db
import pvl.syslog.dhcp
import pvl.verkko.dhcp.leases
import logging, optparse
log = logging.getLogger('main')
# name of process in syslog
DHCP_SYSLOG_PROG = 'dhcpd'
def parse_options (argv) :
"""
Parse command-line arguments.
"""
prog = argv[0]
parser = optparse.OptionParser(
prog = prog,
usage = '%prog: [options]',
version = __version__,
# module docstring
description = __doc__,
)
# options
parser.add_option_group(pvl.args.parser(parser))
## syslog
parser.add_option_group(pvl.syslog.args.parser(parser, prog=DHCP_SYSLOG_PROG))
## leases
parser.add_option('--leases-file', metavar='FILE',
help="Synchronize dhcpd leases from given file")
parser.add_option('--leases-tail', type='float', metavar='POLL',
help="Continuously poll leases file XXX: overrides --syslog-tail")
## XXX: networks
parser.add_option('--network', metavar='NET', action='append',
help="Filter leases by network prefix as plugin instance")
parser.add_option('--gateway', metavar='GW/IFACE', action='append',
help="Filter messages by gateway/interface as plugin instance")
## hosts
parser.add_option('--database', metavar='URI',
help="Track hosts in given database")
parser.add_option('--create', action='store_true',
help="Initialize database")
# defaults
parser.set_defaults(
)
# parse
options, args = parser.parse_args(argv[1:])
# apply
pvl.args.apply(options, prog)
if not options.database :
parser.error("Missing required option: --database")
return options, args
class DHCPHostsDatabase (object) :
"""
The dhcp_hosts table in our database.
"""
def __init__ (self, db) :
self.db = db
def create (self) :
"""
CREATE TABLEs
"""
log.info("Creating database tables: dhcp_hosts")
db.dhcp_hosts.create(self.db.engine)
def insert (self, attrs) :
"""
INSERT new host
"""
query = db.dhcp_hosts.insert().values(
ip = attrs['ip'],
mac = attrs['mac'],
gw = attrs['gw'],
first_seen = attrs['timestamp'],
count = 1,
last_seen = attrs['timestamp'],
state = attrs['state'],
name = attrs.get('name'),
error = attrs.get('error'),
)
# -> id
return self.db.insert(query)
def update (self, attrs) :
"""
UPDATE existing host, or return False if not found.
"""
table = db.dhcp_hosts
query = table.update()
query = query.where((table.c.ip == attrs['ip']) & (table.c.mac == attrs['mac']) & (table.c.gw == attrs['gw']))
query = query.values(
count = db.func.coalesce(table.c.count, 0) + 1,
# set
last_seen = attrs['timestamp'],
state = attrs['state'],
)
if 'name' in attrs :
query = query.values(name = attrs['name'])
if 'error' in attrs :
query = query.values(error = attrs['error'])
# any matched rows?
return self.db.update(query)
def process (self, item) :
"""
Process given DHCP syslog message to update the hosts table.
"""
attrs = {}
# ignore unless we have enough info to fully identify the client
# this means that we omit DHCPDISCOVER messages, but we get the OFFER/REQUEST/ACK
if any(name not in item for name in ('lease', 'hwaddr', 'gateway')) :
# ignore; we require these
return
# do not override error from request on NAK; clear otherwise
if item.get('type') == 'DHCPNAK' :
pass
else :
attrs['error'] = item.get('error')
# do not override name unless known
if item.get('name') :
attrs['name'] = item.get('name')
# db: syslog
ATTR_MAP = (
('ip', 'lease'),
('mac', 'hwaddr'),
('gw', 'gateway'),
('timestamp', 'timestamp'),
('state', 'type'),
)
# generic attrs
for key, name in ATTR_MAP :
attrs[key] = item.get(name)
# update existing?
if self.update(attrs) :
log.info("Update: %s", attrs)
else :
# new
log.info("Insert: %s", attrs)
self.insert(attrs)
class DHCPLeasesDatabase (object) :
def __init__ (self, db) :
self.db = db
def create (self) :
"""
CREATE TABLEs
"""
log.info("Creating database tables: dhcp_leases")
db.dhcp_leases.create(self.db.engine)
def update (self, lease) :
"""
Try an extend an existing lease?
"""
c = db.dhcp_leases.c
ip = lease['lease']
mac = lease.get('hwaddr')
starts = lease['starts']
ends = lease['ends']
update = db.dhcp_leases.update()
if mac :
# renew lease..?
update = update.where((c.ip == ip) & (c.mac == mac) & ((starts < c.ends) | (c.ends == None)))
else :
# new state for lease..?
update = update.where((c.ip == ip) & ((starts < c.ends) | (c.ends == ends)))
update = update.values(
state = lease['binding-state'],
next = lease.get('next-binding-state'),
ends = lease['ends'],
)
if lease.get('client-hostname') :
update = update.values(hostname = lease['client-hostname'])
return self.db.update(update) > 0
def insert (self, lease) :
"""
Record a new lease.
"""
c = db.dhcp_leases.c
query = db.dhcp_leases.insert().values(
ip = lease['lease'],
mac = lease['hwaddr'],
hostname = lease.get('client-hostname'),
starts = lease['starts'],
ends = lease['ends'],
state = lease['binding-state'],
next = lease.get('next-binding-state'),
)
return self.db.insert(query)
def process (self, lease) :
"""
Process given DHCP lease to update currently active lease, or insert a new one.
"""
# update existing?
if self.update(lease) :
log.info("Update: %s", lease)
elif lease.get('hwaddr') :
# new
id = self.insert(lease)
log.info("Insert: %s -> %d", lease, id)
else :
# may be a free lease
log.warn("Ignored lease: %s", lease)
class DHCPHandler (object) :
"""
Process lines from syslog
"""
def __init__ (self, db, syslog, leases) :
self.db = db
self.syslog = syslog
self.leases = leases
self.hostdb = DHCPHostsDatabase(db)
self.leasedb = DHCPLeasesDatabase(db)
# XXX
self.filter = pvl.syslog.dhcp.DHCPSyslogFilter()
def createdb (self) :
"""
Initialize database tables.
"""
self.hostdb.create()
self.leasedb.create()
def process_syslog (self, item) :
"""
Handle a single item read from syslog to DB.
"""
dhcp_item = self.filter.parse(item['msg'])
if not dhcp_item :
# ignore
return
dhcp_item['timestamp'] = item['timestamp'] # XXX: fixup DHCPSyslogParser?
if item.get('error') :
item['error'] = self.filter.parse_error(item['error'])
self.hostdb.process(dhcp_item)
def process_leases (self, leases) :
"""
Handle given changed lease.
"""
for lease in leases :
log.info("%s", lease['lease'])
self.leasedb.process(lease)
def sync_leases (self, leases) :
"""
Sync the entire given set of valid leases.
"""
log.info("%d leases", len(leases))
# XXX: any difference? Mostly removed leases, but... they should expire by themselves, right?
self.process_leases(leases)
def main (self, poll) :
"""
Read items from syslog source with given polling style.
TODO: poll = false on SIGINT
"""
parser = pvl.syslog.parser.SyslogParser()
# mainloop
while True :
if self.syslog :
# process from source
for item in parser.process(self.syslog) :
self.process_syslog(item)
if self.leases :
# process internally
#sync, leases = self.leases.process()
leases = self.leases.process()
#if sync :
# self.sync_leases(leases)
if leases :
self.process_leases(leases)
#else :
# pass
if poll is False :
# done
break
else :
# wait
source.poll(poll)
log.debug("tick")
def main (argv) :
options, args = parse_options(argv)
# db
if not options.database :
log.error("No database given")
return 1
log.info("Open up database: %s", options.database)
database = pvl.verkko.db.Database(options.database)
# syslog
log.info("Open up syslog...")
syslog_parser = pvl.syslog.parser.SyslogParser(prog=DHCP_SYSLOG_PROG) # filter by prog
if options.syslog_fifo :
syslog = pvl.syslog.fifo.Fifo(options.syslog_fifo)
poll = None # no timeout
elif options.syslog_tail :
# continuous file tail
syslog = pvl.syslog.tail.TailFile(options.syslog_file)
poll = options.syslog_tail # polling interval
elif options.syslog_file :
# one-shot file read
syslog = pvl.syslog.tail.TailFile(options.syslog_file)
poll = False # do not poll-loop
else :
syslog = None
poll = False
log.warning("No syslog source given")
# leases
if options.leases_file :
log.info("Open up DHCP leases...")
leases = pvl.verkko.dhcp.leases.DHCPLeasesDatabase(options.leases_file)
# force polling interval
if options.leases_tail :
# XXX: min between syslog/leases?
poll = options.leases_tail
else :
leases = None
# handler + main
handler = DHCPHandler(database, syslog, leases)
if options.create :
handler.createdb()
log.info("Enter mainloop...")
handler.main(poll)
# done
return 0
if __name__ == '__main__':
import sys
sys.exit(main(sys.argv))