#!/usr/bin/env python
"""
Process bind zonefiles.
Takes a zonefile as input, and gives a zonefile as output.
"""
import pvl.args
import pvl.dns.zone
from pvl.dns import __version__
from pvl.dns.zone import ZoneRecord, reverse_ipv4, reverse_ipv6, fqdn
import optparse
import os.path
import logging; log = logging.getLogger('main')
def parse_options (argv) :
"""
Parse command-line arguments.
"""
prog = argv[0]
parser = optparse.OptionParser(
prog = prog,
usage = '%prog: [options]',
version = __version__,
# module docstring
description = __doc__,
)
# logging
parser.add_option_group(pvl.args.parser(parser))
# input/output
parser.add_option('--input-charset', metavar='CHARSET', default='utf-8',
help="Encoding used for input files")
parser.add_option('-o', '--output', metavar='FILE', default=None,
help="Write to output file; default stdout")
parser.add_option('--output-charset', metavar='CHARSET', default='utf-8',
help="Encoding used for output files")
# check stage
parser.add_option('--check-hosts', action='store_true',
help="Check that host/IPs are unique. Use --quiet to silence warnings, and test exit status")
parser.add_option('--check-exempt', metavar='HOST', action='append',
help="Allow given names to have multiple records")
# meta stage
parser.add_option('--meta-zone', action='store_true',
help="Generate host metadata zone; requires --input-line-date")
parser.add_option('--meta-ignore', metavar='HOST', action='append',
help="Ignore given hostnames in metadata output")
parser.add_option('--input-line-date', action='store_true',
help="Parse timestamp prefix from each input line (e.g. `hg blame | ...`)")
# forward stage
parser.add_option('--forward-zone', action='store_true',
help="Generate forward zone")
parser.add_option('--forward-txt', action='store_true',
help="Generate TXT records for forward zone")
parser.add_option('--forward-mx', metavar='MX',
help="Generate MX records for forward zone")
# reverse stage
parser.add_option('--reverse-domain', metavar='DOMAIN',
help="Domain to use for hosts in reverse zone")
parser.add_option('--reverse-zone', metavar='NET',
help="Generate forward zone for given subnet (x.z.y | a:b:c:d)")
# other
parser.add_option('--serial', metavar='YYMMDDXX',
help="Set serial for SOA record")
parser.add_option('--include-path', metavar='PATH',
help="Rewrite includes to given absolute path")
# defaults
parser.set_defaults(
# XXX: combine
check_exempt = [],
meta_ignore = [],
)
# parse
options, args = parser.parse_args(argv[1:])
# apply
pvl.args.apply(options, prog)
return options, args
def apply_zone_input (options, args) :
"""
Yield ZoneLine, ZoneRecord pairs from files.
"""
for file in pvl.args.apply_files(args, 'r', options.input_charset) :
log.info("Reading zone: %s", file)
for line, record in pvl.dns.zone.ZoneLine.load(file,
line_timestamp_prefix = options.input_line_date,
) :
yield line, record
# TODO: --check-types to limit this to A/AAAA/CNAME etc
def check_zone_hosts (zone, whitelist=None, whitelist_types=set(['TXT'])) :
"""
Parse host/IP pairs from the zone, and verify that they are unique.
As an exception, names listed in the given whitelist may have multiple IPs.
"""
by_name = {}
by_ip = {}
fail = None
last_name = None
for l, r in zone :
if r :
name = r.name or last_name
name = (r.origin, name)
# name
if r.type not in whitelist_types :
if name not in by_name :
by_name[name] = r
elif r.name in whitelist :
log.debug("Duplicate whitelist entry: %s", r)
else :
# fail!
log.warn("%s: Duplicate name: %s <-> %s", r.line, r, by_name[name])
fail = True
# ip
if r.type == 'A' :
ip, = r.data
if ip not in by_ip :
by_ip[ip] = r
else :
# fail!
log.warn("%s: Duplicate IP: %s <-> %s", r.line, r, by_ip[ip])
fail = True
if fail :
log.error("Check failed, see warnings")
sys.exit(2)
yield l, r
def process_zone_serial (zone, serial) :
"""
Update the serial in the SOA record.
"""
for line, rr in zone :
if rr and rr.type == 'SOA' :
# XXX: as SOA record..
try :
soa = pvl.dns.zone.SOA.parse(line)
except TypeError as error :
log.exception("%s: unable to parse SOA: %s", rr.name, rr)
sys.exit(2)
yield line, pvl.dns.zone.SOA(
soa.master, soa.contact,
serial, soa.refresh, soa.retry, soa.expire, soa.nxttl
)
else :
yield line, rr
def process_zone_forwards (zone, txt=False, mx=False) :
"""
Process zone data -> forward zone data.
"""
for line, r in zone :
yield line, r
if r and r.type == 'A' :
if txt :
# comment?
comment = r.line.comment
if comment :
yield line, ZoneRecord.TXT(None, comment, ttl=r.ttl)
# XXX: RP, do we need it?
if mx :
# XXX: is this even a good idea?
yield line, ZoneRecord.MX(None, 10, mx, ttl=r.ttl)
def process_zone_meta (zone, ignore=None) :
"""
Process zone metadata -> output.
"""
TIMESTAMP_FORMAT = '%Y/%m/%d'
for line, r in zone :
if ignore and r.name in ignore :
# skip
log.debug("Ignore record: %s", r)
continue
# for hosts..
if r.type == 'A' :
# timestamp?
timestamp = r.line.timestamp
if timestamp :
yield line, ZoneRecord.TXT(r.name, timestamp.strftime(TIMESTAMP_FORMAT), ttl=r.ttl)
def process_zone_reverse (zone, origin, domain) :
"""
Process zone data -> reverse zone data.
"""
for line, r in zone :
if r and r.type == 'A' :
ip, = r.data
ptr = reverse_ipv4(ip)
elif r and r.type == 'AAAA' :
ip, = r.data
ptr = reverse_ipv6(ip)
else :
yield line, r
continue
# verify
if zone and ptr.endswith(origin) :
ptr = ptr[:-(len(origin) + 1)]
else :
log.warning("Reverse does not match zone origin, skipping: (%s) -> %s <-> %s", ip, ptr, origin)
continue
# domain to use
host_domain = r.origin or domain
host_fqdn = fqdn(name, host_domain)
yield line, ZoneRecord.PTR(ptr, host_fqdn)
def process_zone_includes (options, zone, path) :
"""
Rewrite include paths in zones.
"""
for line, rr in zone :
if line.parts[0] == '$INCLUDE' :
_, include = line.parts
yield pvl.dns.zone.ZoneLine(
line.file,
line.lineno,
line.line,
line.indent,
['$INCLUDE', '"{path}"'.format(path=os.path.join(path, include))],
), rr
else :
yield line, rr
def apply_zone_output (options, zone) :
"""
Write out the resulting zonefile.
"""
file = pvl.args.apply_file(options.output, 'w', options.output_charset)
for line, r in zone :
if r :
file.write(unicode(r))
else :
file.write(unicode(line))
file.write('\n')
def main (argv) :
options, args = parse_options(argv)
# input
zone = apply_zone_input(options, args)
if options.check_hosts :
whitelist = set(options.check_exempt)
log.info("Checking hosts: whitelist=%r", whitelist)
zone = list(check_zone_hosts(zone, whitelist=whitelist))
if options.serial :
log.info("Set zone serial: %s", options.serial)
zone = list(process_zone_serial(zone, serial=options.serial))
if options.forward_zone :
log.info("Generate forward zone...")
zone = list(process_zone_forwards(zone, txt=options.forward_txt, mx=options.forward_mx))
if options.meta_zone :
log.info("Generate metadata zone...")
if not options.input_line_date :
log.error("--meta-zone requires --input-line-date")
return 1
zone = list(process_zone_meta(zone, ignore=set(options.meta_ignore)))
if options.reverse_zone :
if ':' in options.reverse_zone :
# IPv6
origin = reverse_ipv6(options.reverse_zone)
else :
# IPv4
origin = reverse_ipv4(options.reverse_zone)
domain = options.reverse_domain
if not domain :
log.error("--reverse-zone requires --reverse-domain")
return 1
zone = list(process_zone_reverse(zone, origin=origin, domain=domain))
if options.include_path :
zone = list(process_zone_includes(options, zone, options.include_path))
# output
apply_zone_output(options, zone)
return 0
if __name__ == '__main__':
import sys
sys.exit(main(sys.argv))