#!/usr/bin/env python
"""
Process zonefiles.
"""
__version__ = '0.0.1-dev'
import optparse
import codecs
import logging
log = logging.getLogger()
# command-line options, global state
options = None
def parse_options (argv) :
"""
Parse command-line arguments.
"""
parser = optparse.OptionParser(
prog = argv[0],
usage = '%prog: [options]',
version = __version__,
# module docstring
description = __doc__,
)
# logging
general = optparse.OptionGroup(parser, "General Options")
general.add_option('-q', '--quiet', dest='loglevel', action='store_const', const=logging.ERROR, help="Less output")
general.add_option('-v', '--verbose', dest='loglevel', action='store_const', const=logging.INFO, help="More output")
general.add_option('-D', '--debug', dest='loglevel', action='store_const', const=logging.DEBUG, help="Even more output")
parser.add_option_group(general)
parser.add_option('-c', '--input-charset', metavar='CHARSET', default='utf-8',
help="Encoding used for input files")
parser.add_option('-o', '--output', metavar='FILE', default='-',
help="Write to output file; default stdout")
parser.add_option('--output-charset', metavar='CHARSET', default='utf-8',
help="Encoding used for output files")
parser.add_option('--forward-zone', action='store_true',
help="Generate forward zone")
parser.add_option('--forward-txt', action='store_true',
help="Generate TXT records for forward zone")
parser.add_option('--forward-mx', metavar='MX',
help="Generate MX records for forward zone")
parser.add_option('--reverse-domain', metavar='DOMAIN',
help="Domain to use for hosts in reverse zone")
parser.add_option('--reverse-zone', metavar='NET',
help="Generate forward zone for given subnet (x.z.y)")
# defaults
parser.set_defaults(
loglevel = logging.WARN,
)
# parse
options, args = parser.parse_args(argv[1:])
# configure
logging.basicConfig(
format = '%(processName)s: %(name)s: %(levelname)s %(funcName)s : %(message)s',
level = options.loglevel,
)
return options, args
def parse_record (line) :
"""
Parse (name, ttl, type, data, comment) from bind zonefile.
Returns None for empty/comment lines.
"""
# was line indented?
indent = line.startswith(' ') or line.startswith('\t')
# strip
line = line.strip()
if not line or line.startswith(';') :
# skip
return
#log.debug("line=%r", line)
# parse comment out
parts = line.split(';', 1)
if ';' in line :
data, comment = line.split(';', 1)
line = data.rstrip()
comment = comment.strip()
else :
line = line.rstrip()
comment = None
#log.debug("line=%r, comment=%r", line, comment)
# parse data out?
if '"' in line :
line, data, end = line.split('"')
parts = line.split()
else :
parts = line.split()
data = parts.pop(-1)
#log.debug("parts=%r, data=%r", parts, data)
# indented lines don't have name
if indent :
name = None
else :
name = parts.pop(0)
#log.debug("name=%r", name)
# parse ttl/cls/type
ttl = cls = None
if parts and parts[0][0].isdigit() :
ttl = parts.pop(0)
if parts and parts[0].upper() in ('IN', 'CH') :
cls = parts.pop(0)
type = parts.pop(0)
#log.debug("ttl=%r, cls=%r, parts=%r", ttl, cls, parts)
if parts :
log.debug("extra data: %r + %r", parts, data)
# extra data
data = ' '.join(parts + [data])
return name, ttl, type, data, comment
def parse_zone (file) :
"""
Parse
(name, ttl, type, data, comment)
data from zonefile.
"""
for lineno, line in enumerate(file) :
data = parse_record(line)
if data :
yield data
def process_zone_forwards (zone, txt=False, mx=False) :
"""
Process zone data -> forward zone data.
"""
for name, ttl, type, data, comment in zone :
yield name, ttl, type, data
if type == 'A' :
if txt and comment :
# name
yield None, ttl, 'TXT', u'"{0}"'.format(comment)
# XXX: RP, do we need it?
if mx :
# XXX: is this a good idea?
yield None, ttl, 'MX', '10 {mx}'.format(mx=mx)
def reverse_addr (ip) :
"""
Return in-addr.arpa reverse for given IPv4 IP.
"""
# parse
octets = tuple(int(part) for part in ip.split('.'))
for octet in octets :
assert 0 <= octet <= 255
return '.'.join([str(octet) for octet in reversed(octets)] + ['in-addr', 'arpa'])
def fqdn (*parts) :
return '.'.join(parts) + '.'
def process_zone_reverse (zone, origin, domain) :
"""
Process zone data -> reverse zone data.
"""
for name, ttl, type, data, comment in zone :
if type != 'A' :
continue
ip = data
# generate reverse-addr
reverse = reverse_addr(ip)
# verify
if zone and reverse.endswith(origin) :
reverse = reverse[:-(len(origin) + 1)]
else :
log.warning("Reverse does not match zone origin, skipping: (%s) -> %s <-> %s", ip, reverse, origin)
continue
# domain to use
host_domain = domain
host_fqdn = fqdn(name, domain)
yield reverse, 'PTR', host_fqdn
def build_zone (zone) :
for item in zone :
ttl = cls = comment = None
if len(item) == 3 :
name, type, data = item
elif len(item) == 4 :
name, ttl, type, data = item
elif len(item) == 5 :
name, ttl, type, data, comment = item
else :
raise Exception("Weird zone entry: {0}".format(item))
if not name :
name = ''
if not ttl :
ttl = ''
if not cls :
cls = ''
if comment :
comment = '\t;' + comment
else :
comment = ''
yield u"{name:25} {ttl:4} {cls:2} {type:5} {data}{comment}".format(name=name, ttl=ttl, cls=cls, type=type, data=data, comment=comment)
def write_zone (file, zone) :
for line in build_zone(zone) :
file.write(line + u'\n')
def open_file (path, mode, charset) :
"""
Open unicode-enabled file from path, with - using stdio.
"""
if path == '-' :
# use stdin/out based on mode
stream, func = {
'r': (sys.stdin, codecs.getreader),
'w': (sys.stdout, codecs.getwriter),
}[mode[0]]
# wrap
return func(charset)(stream)
else :
# open
return codecs.open(path, mode, charset)
def main (argv) :
global options
options, args = parse_options(argv)
if args :
# open files
input_files = [open_file(path, 'r', options.input_charset) for path in args]
else :
# default to stdout
input_files = [open_file('-', 'r', options.input_charset)]
# process zone data
zone = []
for file in input_files :
log.info("Reading zone: %s", file)
zone += list(parse_zone(file))
# output file
output = open_file(options.output, 'w', options.output_charset)
if options.forward_zone :
log.info("Write forward zone: %s", output)
zone = list(process_zone_forwards(zone, txt=options.forward_txt, mx=options.forward_mx))
elif options.reverse_zone :
origin = reverse_addr(options.reverse_zone)
domain = options.reverse_domain
if not domain :
log.error("--reverse-zone requires --reverse-domain")
return 1
zone = list(process_zone_reverse(zone, origin=origin, domain=domain))
else :
log.warn("Nothing to do")
write_zone(output, zone)
return 0
if __name__ == '__main__':
import sys
sys.exit(main(sys.argv))