bin/pvl.dns-zone
author Tero Marttila <terom@paivola.fi>
Mon, 16 Dec 2013 11:41:37 +0200
changeset 258 1ad9cec4f556
parent 252 0ea4450fdd40
child 293 6351acf3eb3b
permissions -rwxr-xr-x
pvl.dns-zone: use pvl.args.apply_files
#!/usr/bin/env python

"""
    Process bind zonefiles.

    Takes a zonefile as input, and gives a zonefile as output.
"""

import optparse

import pvl.args
import pvl.dns.zone
from pvl.dns import __version__
from pvl.dns.zone import ZoneRecord, reverse_ipv4, reverse_ipv6, fqdn

import logging; log = logging.getLogger('main')

def parse_options (argv) :
    """
        Parse command-line arguments.
    """

    prog = argv[0]

    parser = optparse.OptionParser(
            prog        = prog,
            usage       = '%prog: [options]',
            version     = __version__,

            # module docstring
            description = __doc__,
    )

    # logging
    parser.add_option_group(pvl.args.parser(parser))

    # input/output
    parser.add_option('-c', '--input-charset',  metavar='CHARSET',  default='utf-8', 
            help="Encoding used for input files")

    parser.add_option('-o', '--output',         metavar='FILE',     default=None,
            help="Write to output file; default stdout")

    parser.add_option('--output-charset',       metavar='CHARSET',  default='utf-8', 
            help="Encoding used for output files")

    # check stage
    parser.add_option('--check-hosts',          action='store_true',
            help="Check that host/IPs are unique. Use --quiet to silence warnings, and test exit status")

    parser.add_option('--check-exempt',         metavar='HOST', action='append',
            help="Allow given names to have multiple records")

    # meta stage
    parser.add_option('--meta-zone',            action='store_true',
            help="Generate host metadata zone; requires --input-line-date")

    parser.add_option('--meta-ignore',          metavar='HOST', action='append',
            help="Ignore given hostnames in metadata output")

    parser.add_option('--input-line-date',      action='store_true',
            help="Parse timestamp prefix from each input line (e.g. `hg blame | ...`)")

    # forward stage
    parser.add_option('--forward-zone',         action='store_true', 
            help="Generate forward zone")

    parser.add_option('--forward-txt',          action='store_true',
            help="Generate TXT records for forward zone")

    parser.add_option('--forward-mx',           metavar='MX',
            help="Generate MX records for forward zone")

    # reverse stage
    parser.add_option('--reverse-domain',       metavar='DOMAIN',
            help="Domain to use for hosts in reverse zone")

    parser.add_option('--reverse-zone',         metavar='NET',
            help="Generate forward zone for given subnet (x.z.y | a:b:c:d)")

    # other
    parser.add_option('--serial',               metavar='YYMMDDXX',
            help="Set serial for SOA record")

    # defaults
    parser.set_defaults(
        # XXX: combine
        check_exempt        = [],
        meta_ignore         = [],
    )
    
    # parse
    options, args = parser.parse_args(argv[1:])

    # apply
    pvl.args.apply(options, prog)

    return options, args

def check_zone_hosts (zone, whitelist=None, whitelist_types=set(['TXT'])) :
    """
        Parse host/IP pairs from the zone, and verify that they are unique.

        As an exception, names listed in the given whitelist may have multiple IPs.
    """

    by_name = {}
    by_ip = {}

    fail = None

    last_name = None

    for r in zone :
        name = r.name or last_name

        name = (r.origin, name)

        # name
        if r.type not in whitelist_types :
            if name not in by_name :
                by_name[name] = r

            elif r.name in whitelist :
                log.debug("Duplicate whitelist entry: %s", r)

            else :
                # fail!
                log.warn("%s: Duplicate name: %s <-> %s", r.line, r, by_name[name])
                fail = True

        # ip
        if r.type == 'A' :
            ip, = r.data

            if ip not in by_ip :
                by_ip[ip] = r

            else :
                # fail!
                log.warn("%s: Duplicate IP: %s <-> %s", r.line, r, by_ip[ip])
                fail = True

    return fail

def process_zone_soa (soa, serial) :
    return pvl.dns.zone.SOA(
        soa.master, soa.contact,
        serial, soa.refresh, soa.retry, soa.expire, soa.nxttl
    )

def process_zone_serial (zone, serial) :
    for rr in zone :
        if rr.type == 'SOA' :
            # XXX: as SOA record..
            yield process_zone_soa(pvl.dns.zone.SOA.parse(rr.line), serial)
        else :
            yield rr

def process_zone_forwards (zone, txt=False, mx=False) :
    """
        Process zone data -> forward zone data.
    """

    for r in zone :
        yield r

        if r.type == 'A' :
            if txt :
                # comment?
                comment = r.line.comment

                if comment :
                    yield ZoneRecord.TXT(None, comment, ttl=r.ttl)

           
            # XXX: RP, do we need it?

            if mx :
                # XXX: is this even a good idea?
                yield ZoneRecord.MX(None, 10, mx, ttl=r.ttl)

def process_zone_meta (zone, ignore=None) :
    """
        Process zone metadata -> output.
    """
    
    TIMESTAMP_FORMAT = '%Y/%m/%d'
    
    for r in zone :
        if ignore and r.name in ignore :
            # skip
            log.debug("Ignore record: %s", r)
            continue

        # for hosts..
        if r.type == 'A' :
            # timestamp?
            timestamp = r.line.timestamp

            if timestamp :
                yield ZoneRecord.TXT(r.name, timestamp.strftime(TIMESTAMP_FORMAT), ttl=r.ttl)
     
def process_zone_reverse (zone, origin, domain) :
    """
        Process zone data -> reverse zone data.
    """

    name = None

    for r in zone :
        # keep name from previous..
        if r.name :
            name = r.name

        if r.type == 'A' :
            ip, = r.data
            ptr = reverse_ipv4(ip)

        elif r.type == 'AAAA' :
            ip, = r.data
            ptr = reverse_ipv6(ip)
            
        else :
            continue

        # verify
        if zone and ptr.endswith(origin) :
            ptr = ptr[:-(len(origin) + 1)]

        else :
            log.warning("Reverse does not match zone origin, skipping: (%s) -> %s <-> %s", ip, ptr, origin)
            continue

        # domain to use
        host_domain = r.origin or domain
        host_fqdn = fqdn(name, host_domain)

        yield ZoneRecord.PTR(ptr, host_fqdn)

def write_zone_records (file, zone) :
    for r in zone :
        file.write(unicode(r))
        file.write('\n')

def main (argv) :
    options, args = parse_options(argv)
    
    # open files, default to stdout
    input_files = pvl.args.apply_files(args, 'r', options.input_charset)
   
    # process zone data
    zone = []

    for file in input_files :
        log.info("Reading zone: %s", file)

        zone += list(pvl.dns.zone.ZoneRecord.load(file, 
            line_timestamp_prefix   = options.input_line_date,
        ))

    # check?
    if options.check_hosts :
        whitelist = set(options.check_exempt)

        log.debug("checking hosts; whitelist=%r", whitelist)

        if check_zone_hosts(zone, whitelist=whitelist) :
            log.warn("Hosts check failed")
            return 2

        else :
            log.info("Hosts check OK")

    if options.serial :
        log.info("Set zone serial: %s", options.serial)

        zone = list(process_zone_serial(zone, serial=options.serial))

    # output file
    output = open_file(options.output, 'w', options.output_charset)

    if options.forward_zone :
        log.info("Write forward zone: %s", output)

        zone = list(process_zone_forwards(zone, txt=options.forward_txt, mx=options.forward_mx))

    elif options.meta_zone :
        log.info("Write metadata zone: %s", output)

        if not options.input_line_date :
            log.error("--meta-zone requires --input-line-date")
            return 1

        zone = list(process_zone_meta(zone, ignore=set(options.meta_ignore)))

    elif options.reverse_zone :
        if ':' in options.reverse_zone :
            # IPv6
            origin = reverse_ipv6(options.reverse_zone)

        else :
            # IPv4
            origin = reverse_ipv4(options.reverse_zone)

        domain = options.reverse_domain

        if not domain :
            log.error("--reverse-zone requires --reverse-domain")
            return 1

        zone = list(process_zone_reverse(zone, origin=origin, domain=domain))

    elif options.check_hosts :
        # we only did that, done
        return 0

    else :
        # pass-through
        log.info("Passing through zonefile")

    write_zone_records(output, zone)

    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main(sys.argv))