author Tero Marttila <>
Tue, 17 Dec 2013 10:25:44 +0200
changeset 301 b41902b0b9cf
parent 288 2f2f92e4c58e
child 302 f50469a1da4d
pvl.hosts-import: support multiple --import-zone/dhcp-hosts files, change --import-zone-comments-*, import TXT as comments
#!/usr/bin/env python

    Import hosts from existing BIND or dhcpd files.

import pvl.args
import pvl.dhcp.config
import pvl.ldap.args

import ipaddr
import optparse
import collections
import re
import logging; log = logging.getLogger('pvl.hosts-import')

__version__ = '0.1'

def parse_options (argv) :
        Parse command-line arguments.

    parser = optparse.OptionParser(
            prog        = argv[0],
            usage       = '%prog: [options]',
            version     = __version__,

            # module docstring
            description = __doc__,

    # logging

    parser.add_option('-c', '--input-charset',  metavar='CHARSET',  default='utf-8', 
            help="Encoding used for input files")

    parser.add_option('--output-charset',       metavar='CHARSET',  default='utf-8', 
            help="Encoding used for output files")

    # input
    parser.add_option('--import-zone-hosts',    metavar='FILE',     action='append',
            help="Load hosts from DNS zone")

    parser.add_option('--import-dhcp-hosts',    metavar='FILE',     action='append',
            help="Load hosts from DHCP config")

    parser.add_option('--import-dhcp-boot-server',      metavar='NEXT-SERVER',
            help="Default boot_server for dpc hosts")

    parser.add_option('--import-zone-comments-owner',  action='store_const',
            dest='import_zone_comments', const='owner',
            help="Import DNS zone comment as owner comment")

    parser.add_option('--import-zone-comments-host',   action='store_const',
            dest='import_zone_comments', const='host',
            help="Import DNS zone comment as host comment")

    parser.add_option('--dump-host-comments',   action='store_true',
            help="Dump out info on imported host comments")

    # defaults
    parser.add_option('--hosts-domain',         metavar='DOMAIN',
            help="Default domain for hosts")
    parser.add_option('--zone-unused',          metavar='HOST',
            help="DNS name for unallocated hosts")

    # output
    parser.add_option('--output-hosts',         metavar='FILE',
            help="Output hosts file")

    parser.add_option('--output-prefix',        metavar='PREFIX',
            help="Select hosts by ip prefix")

    # defaults
        import_zone_hosts   = [],
        import_dhcp_hosts   = [],
    # parse
    options, args = parser.parse_args(argv[1:])

    # apply
    pvl.args.apply(options, argv[0])

    return options, args

def import_zone_hosts (options, file) :
        Yield host info from zonefile records.

    for rr in,
            # generated hosts need to imported by hand...
            expand_generate = False,
    ) :
        if options.zone_unused and == options.zone_unused :
            log.debug("%s: skip %s",, rr)

        elif rr.type in ('A', 'AAAA') :
            ip, =

            type = { 'A': 'ip', 'AAAA': 'ip6' }[rr.type]

            yield, type, ipaddr.IPAddress(ip)

            if rr.comment :
                yield, 'comment', rr.comment

            if rr.origin :
                # not a fqdn
                yield, 'domain', rr.origin.rstrip('.')

        elif rr.type == 'CNAME' :
            host, =

            yield host, 'alias',
        elif rr.type == 'TXT' :
            txt, =

            yield host, 'comment', txt

        else :
            log.warn("%s: unknown rr: %s",, rr)

def import_dhcp_host (options, host, items) :
        Yield host infos from a dhcp host ... { ... }

    hostname = None
    ethernet = []
    fixed_address = None

    boot_server = options.import_dhcp_boot_server
    boot_filename = None

    for item in items :
        item, args = item[0], item[1:]

        if item == 'hardware' :
            _ethernet, ethernet = args
            assert _ethernet == 'ethernet'
        elif item == 'fixed-address' :
            fixed_address, = args
        elif item == 'option' :
            option = args.pop(0)

            if option == 'host-name' :
                hostname, = args
            else :
                log.warn("host %s: ignore unknown option: %s", host, option)
        elif item == 'next-sever' :
            boot_server, = args
        elif item == 'filename' :
            boot_filename, = args
        else :
            log.warn("host %s: ignore unknown item: %s", host, item)

    # determine hostname
    suffix = None

    if '-' in host :
        hostname, suffix = host.rsplit('-', 1)
    else :
        hostname = host

    if fixed_address and not re.match(r'\d+\.\d+\.\d+.\d+', fixed_address) :
        hostname, domain = fixed_address.split('.', 1)

    if not (hostname or ethernet) :
        log.warn("%s: no hostname/ethernet: %s/%s", host, hostname, ethernet)
    yield hostname, 'ethernet', ethernet
    #if suffix :
    #    yield hostname, ('ethernet', suffix), ethernet

    if boot_server and boot_filename :
        yield hostname, 'boot', "{server}:{filename}".format(
                server      = boot_server,
                filename    = boot_filename,
    elif boot_filename :
        yield hostname, 'boot', "{filename}".format(filename=boot_filename)

def import_dhcp_hosts (options, blocks) :
        Process hosts from a parsed block

    for block, items, blocks in blocks :
        block, args = block[0], block[1:]

        if block == 'group' :
            for info in import_dhcp_hosts(options, blocks) :
                yield info
        elif block == 'host' :
            host, = args
  "host: %s", host)

            try :
                for info in import_dhcp_host(options, host, items) :
                    yield info
            except ValueError as error :
                log.exception("%s: invalid host: %s", host, error)
            log.warn("ignore unknown block: %s", block)

def import_dhcp_conf (options, file) :
    items, blocks = pvl.dhcp.config.DHCPConfigParser().load(file)

    for item in items :
        item, args = item[0], item[1:]

        if item == 'include' :
            include, = args
            for info in import_dhcp_conf(options, pvl.args.apply_file(include)) :
                yield info
        else :
            log.warn("ignore unknown item: %s", item)
    for info in import_dhcp_hosts(options, blocks) :
        yield info


ZONE_OWNER_MAIL = re.compile(r'(?P<owner>.*?)\s*<(?P<mail>.+?)>')

def process_zone_comment (options, hostname, comment) :
        Attempt to parse a host comment field... :D

        Yields (field, value) bits

    for regex in ZONE_COMMENTS :
        match = regex.match(comment)

        if match :
            matches = match.groupdict()

  "%s: matched comment: %s", hostname, comment)
    else :
        if options.import_zone_comments :
  "%s: default comment: %s", hostname, comment)
            matches = { options.import_zone_comments: comment }
        else :
            log.warn("%s: unknown comment: %s", hostname, comment)
    owner = matches.pop('owner', None)
    if owner :
        mail_match = ZONE_OWNER_MAIL.match(owner)

        if mail_match :
            mail_matches = mail_match.groupdict()
            owner = mail_matches['owner']
            yield 'mail', mail_matches['mail'].strip()
        yield 'owner', owner.strip()

    for field, value in matches.iteritems() :
        if value :
            yield field, value.strip()


def process_host_owner_ldap (options, host, info) :
        Yield guesses for user from LDAP.

    if info.get('mail') :
        for user in options.ldap.users.filter(
                { 'mailLocalAddress': info['mail'] },
                { 'uid': info['mail'] },
        ) :
            yield user, None

    if info.get('group') and info.get('owner') :
        groups = options.ldap.groups.filter(cn=info['group'])

        for group in groups :
            for user in options.ldap.users.filter({
                'gidNumber': group['gidNumber'],
                'cn': info['owner'],
            }) :
                yield user, group

    if info.get('owner') :
            for user in options.ldap.users.filter({
                'cn': info['owner'],
            }) :
                yield user, None

def process_host_owner (options, host, info) :
        Return (owner, comment) for host based on info, or None.

    owner = info.get('owner')

    if owner and owner.lower() in NONE_OWNERS :
        return False
    # from ldap?
    for ldap in process_host_owner_ldap(options, host, info) :
        user, group = ldap
        if not group :
            # get group from ldap
            group =
        return user['uid'], u"{group} / {user}".format(
                user    = user.getunicode('cn'),
                group   = group.getunicode('cn'),

def process_host_comments (options, host, info) :
        Process host fields from comment.

        Attempts to find owner from LDAP..

    log.debug("%s: %s", host, info)
    owner = process_host_owner(options, host, info) 

    if owner is False :
        # do not mark any owner

    elif owner :
        owner, comment = owner
       "%s: %s (%s)", host, owner, comment)
        yield 'comment-owner', comment
        yield 'owner', owner,

    elif 'group' in info or 'owner' in info :
        log.warn("%s: unknown owner: %s", host, info)
        yield 'comment-owner', "{group} / {owner}".format(
                group   = info.get('group', ''),
                owner   = info.get('owner', ''),
    if info.get('host') :
        yield 'comment-host', info['host']

def process_hosts_comments (options, import_hosts) :
        Parse out comments from host imports..

    for host, field, value in import_hosts :
        if field != 'comment':
            yield host, field, value

        fields = dict(process_zone_comment(options, host, value))
        if options.dump_host_comments :
            print u"{host:20} {comment:80} = {group:15} / {owner:20} <{mail:20}> / {hostinfo}".format(
                    host        = host,
                    comment     = value,
                    group       = fields.get('group', ''),
                    owner       = fields.get('owner', ''),
                    mail        = fields.get('mail', ''),
                    hostinfo    = fields.get('host', ''),

        for field, value in process_host_comments(options, host, fields) :
            yield host, field, value

def apply_hosts_import (options) :
        Import host infos from given files.

    for zone_file in options.import_zone_hosts:
        file = pvl.args.apply_file(zone_file, 'r', options.input_charset)
        for info in import_zone_hosts(options, file) :
            yield info
    for dhcp_file in options.import_dhcp_hosts:
        file = pvl.args.apply_file(dhcp_file, 'r', options.input_charset)
        for info in import_dhcp_conf(options, file) :
            yield info
def import_hosts (options) :
        Import hosts from dns/dhcp.

    import_hosts = apply_hosts_import(options)
    import_hosts = process_hosts_comments(options, import_hosts)
    # gather
    hosts = collections.defaultdict(lambda: collections.defaultdict(list))

    for host, field, value in import_hosts :
    return hosts.iteritems()

def check_hosts (options, hosts) :
    by_name = dict(hosts)

    for host, fields in hosts :
        if set(fields) == set(['alias']) :
            log.warn("%s: nonexistant alias target: %s", host, ' '.join(fields['alias']))

def sort_export_hosts (options, hosts) :
    if options.output_prefix :
        prefix = ipaddr.IPNetwork(options.output_prefix)
    else :
        prefix = None

    for host, fields in hosts :
        ip = fields.get('ip')
        # sort by IP
        if ip :
            sort = ip[0]
        else :
            # fake, to sort correctly
            sort = ipaddr.IPAddress(0)
        # select
        if prefix:
            if not (ip and ip in prefix) :

        yield sort, host, fields

def export_hosts (options, hosts) :
        Generate hosts config lines for given hosts.

    FMT = u"\t{field:15} = {value}"

    yield u"[{domain}]".format(domain=options.hosts_domain)

    # filter + sort
    hosts = [(host, fields) for sort, host, fields in sorted(sort_export_hosts(options, hosts))]

    for host, fields in hosts :
        for comment in fields.get('comment-host', ()):
            yield u"# {comment}".format(comment=comment)

        yield u"[[{host}]]".format(host=host)
        for domain in fields.get('domain', ()) :
            if domain != options.hosts_domain :
                yield FMT.format(field='domain', value=domain)

        for field, fmt in (
                ('ip',              FMT),
                ('ip6',             FMT),
                ('ethernet',        FMT),
                ('owner',           u"\t{field:15} = {value} # {fields[comment-owner][0]}"),
                ('alias',           FMT),
                ('boot',            FMT),
        ) :
            values = fields.get(field, ())

            if len(values) > 1 :
                for index, value in enumerate(values, 1) :
                    yield fmt.format(
                            field   = "{field}.{index}".format(field=field, index=index),
                            value   = value,
                            fields  = fields
            elif len(values) > 0 :
                value, = values
                yield fmt.format(field=field, value=value, fields=fields)
        yield ""
def apply_hosts_export (options, hosts) :
        Export hosts to file.

    file = pvl.args.apply_file(options.output_hosts, 'w', options.output_charset)

    for line in export_hosts(options, hosts) :
        print >>file, line

def main (argv) :
    options, args = parse_options(argv)

    options.ldap = pvl.ldap.args.apply(options)
    if args :
        # direct from file
        hosts = pvl.args.apply_files(args, 'r', options.input_charset)
    else :
        # import
        hosts = list(import_hosts(options))
    # verify
    check_hosts(options, hosts)

    # output
    if options.output_hosts :
        apply_hosts_export(options, hosts)

if __name__ == '__main__':