# HG changeset patch # User Tero Marttila # Date 1332348236 -7200 # Node ID 21b33b9090d02a7e863309c72e1a918963047da3 # Parent 620f4594a09d953b8057c015f7c4e345defd2148 check-dhcp-hosts: parse dhcp conf for fixed-address stanzas, and ensure that they resolve.. diff -r 620f4594a09d -r 21b33b9090d0 bin/check-dhcp-hosts --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bin/check-dhcp-hosts Wed Mar 21 18:43:56 2012 +0200 @@ -0,0 +1,201 @@ +#!/usr/bin/env python + +""" + Go through a dhcp conf file looking for fixed-address stanzas, and make sure that they are valid. +""" + +__version__ = '0.0.1-dev' + +import optparse +import codecs +import logging + +import socket + +log = logging.getLogger('main') + +# command-line options, global state +options = None + +def parse_options (argv) : + """ + Parse command-line arguments. + """ + + prog = argv[0] + + parser = optparse.OptionParser( + prog = prog, + usage = '%prog: [options]', + version = __version__, + + # module docstring + description = __doc__, + ) + + # logging + general = optparse.OptionGroup(parser, "General Options") + + general.add_option('-q', '--quiet', dest='loglevel', action='store_const', const=logging.ERROR, help="Less output") + general.add_option('-v', '--verbose', dest='loglevel', action='store_const', const=logging.INFO, help="More output") + general.add_option('-D', '--debug', dest='loglevel', action='store_const', const=logging.DEBUG, help="Even more output") + + parser.add_option_group(general) + + # input/output + parser.add_option('-c', '--input-charset', metavar='CHARSET', default='utf-8', + help="Encoding used for input files") + + # + parser.add_option('--doctest', action='store_true', + help="Run module doctests") + + # defaults + parser.set_defaults( + loglevel = logging.WARN, + ) + + # parse + options, args = parser.parse_args(argv[1:]) + + # configure + logging.basicConfig( + format = prog + ': %(name)s: %(levelname)s %(funcName)s : %(message)s', + level = options.loglevel, + ) + + return options, args + +def parse_fixedaddrs (file) : + """ + Go through lines in given .conf file, looking for fixed-address stanzas. + """ + + filename = file.name + + for lineno, line in enumerate(file) : + # comments? + if '#' in line : + line, comment = line.split('#', 1) + + else : + comment = None + + # whitespace + line = line.strip() + + if not line : + # empty + continue + + # grep + if 'fixed-address' in line : + # great parsing :) + fixedaddr = line.replace('fixed-address', '').replace(';', '').strip() + + log.debug("%s:%d: %s: %s", filename, lineno, fixedaddr, line) + + yield lineno, fixedaddr + +def resolve_addr (addr, af=socket.AF_INET, socktype=socket.SOCK_STREAM) : + """ + Resolve given address for given AF_INET, returning a list of resolved addresses. + + Raises an Exception if failed. + + >>> resolve_addr('127.0.0.1') + ['127.0.0.1'] + """ + + if not addr : + raise Exception("Empty addr: %r", addr) + + # resolve + result = socket.getaddrinfo(addr, None, af, socktype) + + #log.debug("%s: %s", addr, result) + + # addresses + addrs = list(sorted(set(sockaddr[0] for family, socktype, proto, canonname, sockaddr in result))) + + return addrs + +def check_file_hosts (file) : + """ + Check all fixed-address parameters in given file. + """ + + filename = file.name + fail = 0 + + for lineno, addr in parse_fixedaddrs(file) : + # lookup + try : + resolved = resolve_addr(addr) + + except Exception as e: + log.warning("%s:%d: failed to resolve: %s: %s", filename, lineno, addr, e) + fail += 1 + + else : + log.debug("%s:%d: %s: %r", filename, lineno, addr, resolved) + + return fail + +def open_file (path, mode, charset) : + """ + Open unicode-enabled file from path, with - using stdio. + """ + + if path == '-' : + # use stdin/out based on mode + stream, func = { + 'r': (sys.stdin, codecs.getreader), + 'w': (sys.stdout, codecs.getwriter), + }[mode[0]] + + # wrap + return func(charset)(stream) + + else : + # open + return codecs.open(path, mode, charset) + +def main (argv) : + global options + + options, args = parse_options(argv) + + if options.doctest : + import doctest + fail, total = doctest.testmod() + return fail + + if args : + # open files + input_files = [open_file(path, 'r', options.input_charset) for path in args] + + else : + # default to stdout + input_files = [open_file('-', 'r', options.input_charset)] + + # process zone data + for file in input_files : + log.info("Reading zone: %s", file) + + fail = check_file_hosts(file) + + if fail : + log.warn("DHCP hosts check failed: %d", fail) + return 2 + + else : + log.info("DHCP hosts check OK") + + return 0 + +if __name__ == '__main__': + import sys + + sys.exit(main(sys.argv)) +