#!/usr/bin/env python
"""
Import hosts from existing BIND or dhcpd files.
"""
import pvl.args
import pvl.dns.zone
import pvl.dhcp.config
import pvl.hosts
import pvl.ldap.args
import collections
import ipaddr
import logging; log = logging.getLogger('pvl.hosts-import')
import optparse
import os.path
import re
__version__ = '0.1'
def parse_options (argv) :
"""
Parse command-line arguments.
"""
parser = optparse.OptionParser(
prog = argv[0],
usage = '%prog: [options]',
version = __version__,
# module docstring
description = __doc__,
)
# logging
parser.add_option_group(pvl.args.parser(parser))
parser.add_option_group(pvl.ldap.args.parser(parser))
parser.add_option('--input-charset', metavar='CHARSET', default='utf-8',
help="Encoding used for input files")
parser.add_option('--output-charset', metavar='CHARSET', default='utf-8',
help="Encoding used for output files")
# input
parser.add_option('--import-zone-hosts', metavar='FILE', action='append',
help="Load hosts from DNS zone")
parser.add_option('--import-zone-origin', metavar='ORIGIN',
help="Initial origin for given zone file; default is basename")
parser.add_option('--import-zone-comments-owner', action='store_const',
dest='import_zone_comments', const='owner',
help="Import DNS zone comment as owner comment")
parser.add_option('--import-zone-comments-host', action='store_const',
dest='import_zone_comments', const='host',
help="Import DNS zone comment as host comment")
parser.add_option('--import-dhcp-hosts', metavar='FILE', action='append',
help="Load hosts from DHCP config")
parser.add_option('--import-dhcp-boot-server', metavar='NEXT-SERVER',
help="Default boot_server for dpc hosts")
parser.add_option('--dump-host-comments', action='store_true',
help="Dump out info on imported host comments")
# defaults
parser.add_option('--zone-unused', metavar='HOST',
help="DNS name for unallocated hosts")
# output
parser.add_option('--output-hosts', metavar='FILE', default='-',
help="Output hosts file")
parser.add_option('--output-prefix', metavar='PREFIX',
help="Select hosts by ip prefix")
parser.add_option('--output-domain', metavar='DOMAIN',
help="Select hosts by domain")
parser.add_option('--output-others', action='store_true',
help="Negate selection")
# defaults
parser.set_defaults(
import_zone_hosts = [],
import_dhcp_hosts = [],
)
# parse
options, args = parser.parse_args(argv[1:])
# apply
pvl.args.apply(options, argv[0])
return options, args
def import_zone_host_name (options, name, origin) :
"""
Import zone name from rr
"""
if '.' in name :
host, domain = name.split('.', 1)
domain = pvl.dns.join(domain, origin)
else :
host = name
domain = origin
if domain :
# not a fqdn
domain = domain.rstrip('.')
log.info("%s: %s@%s", name, host, domain)
else :
log.warn("%s: no domain", name)
return host, domain
def import_zone_hosts (options, file) :
"""
Yield host info from zonefile records.
"""
origin = options.import_zone_origin or os.path.basename(file.name)
for line, rr in pvl.dns.zone.ZoneLine.load(file,
# used to determine domain
origin = origin,
# lazy-import generated hosts on demand
expand_generate = True,
) :
if rr :
pass
elif line.parts[0] == '$ORIGIN' :
# handled by ZoneLine.load
continue
else :
log.warn("%s: skip non-rr line: %s", line, line.line)
continue
host, domain = import_zone_host_name(options, rr.name, rr.origin)
if rr.type in ('A', 'AAAA') :
ip, = rr.data
ip = ipaddr.IPAddress(ip)
type = { 'A': 'ip', 'AAAA': 'ip6' }[rr.type]
if options.zone_unused and rr.name == options.zone_unused :
yield (str(ip), domain), 'ip.unused', ip
else :
yield (host, domain), type, ip,
if rr.comment :
yield (host, domain), 'comment', rr.comment
if line.parts[0] == '$GENERATE' :
# only import as host if used for other purposes as well
yield (host, domain), 'lazy-import', True
elif rr.type == 'CNAME' :
alias, = rr.data
alias_host, alias_domain = import_zone_host_name(options, alias, rr.origin)
if domain == alias_domain :
yield (alias_host, alias_domain), 'alias', host
elif domain.endswith('.' + alias_domain) :
yield (alias_host, alias_domain), 'alias', pvl.dns.join(host, domain[:(len(domain) - len(alias_domain) - 1)])
else :
log.warn("%s@%s: alias outside of target domain: %s", host, domain, alias_domain)
elif rr.type == 'TXT' :
txt, = rr.data
yield (host, domain), 'comment', txt
else :
log.warn("%s: unknown rr: %s", host, rr)
def import_dhcp_host (options, dhcp_host, items) :
"""
Yield host infos from a dhcp host ... { ... }
"""
host_name = None
ethernet = []
fixed_address = None
boot_server = options.import_dhcp_boot_server
boot_filename = None
for item in items :
item, args = item[0], item[1:]
if item == 'hardware' :
_ethernet, ethernet = args
assert _ethernet == 'ethernet'
elif item == 'fixed-address' :
fixed_address, = args
elif item == 'option' :
option = args.pop(0)
if option == 'host-name' :
host_name, = args
else :
log.warn("host %s: ignore unknown option: %s", dhcp_host, option)
elif item == 'next-sever' :
boot_server, = args
elif item == 'filename' :
boot_filename, = args
else :
log.warn("host %s: ignore unknown item: %s", dhcp_host, item)
# determine host
host = None
domain = None
suffix = None
if not fixed_address :
log.warn("%s: fixed-address is missing, unable to determine hostname/domainname", dhcp_host)
elif re.match(r'\d+\.\d+\.\d+.\d+', fixed_address) :
log.warn("%s: fixed-address is an IP, unable to determine hostname/domainname", dhcp_host)
else :
host, domain = fixed_address.split('.', 1)
# XXX: not actually true... eh
if host and dhcp_host.lower() == host.lower() :
# do not split suffix from host
pass
elif host and '-' in dhcp_host :
dhcp_host, suffix = dhcp_host.rsplit('-', 1)
elif '-' in dhcp_host :
host, suffix = dhcp_host.rsplit('-', 1)
else :
host = dhcp_host
if not (host or ethernet) :
log.warn("%s: no hostname/ethernet: %s/%s", dhcp_host, hostname, ethernet)
elif suffix :
log.info("%s: %s@%s: %s: %s", dhcp_host, host, domain, suffix, ethernet)
yield (host, domain), 'ethernet.{suffix}'.format(suffix=suffix), ethernet
else :
log.info("%s: %s@%s: %s", dhcp_host, host, domain, ethernet)
yield (host, domain), 'ethernet', ethernet
if boot_server and boot_filename :
yield (host, domain), 'boot', "{server}:{filename}".format(
server = boot_server,
filename = boot_filename,
)
elif boot_filename :
yield (host, domain), 'boot', "{filename}".format(filename=boot_filename)
def import_dhcp_hosts (options, file_name, blocks) :
"""
Process hosts from a parsed block
"""
for block, items, blocks in blocks :
block, args = block[0], block[1:]
if block == 'group' :
log.info("%s: group", file_name)
for info in import_dhcp_hosts(options, file_name, blocks) :
yield info
elif block == 'host' :
host, = args
log.info("%s: host: %s", file_name, host)
try :
for info in import_dhcp_host(options, host, items) :
yield info
except ValueError as error :
log.exception("%s: invalid host %s: %s", file_name, host, error)
else:
log.warn("%s: ignore unknown block: %s", file_name, block)
def import_dhcp_conf (options, file) :
items, blocks = pvl.dhcp.config.DHCPConfigParser().load(file)
for item in items :
item, args = item[0], item[1:]
if item == 'include' :
include, = args
for info in import_dhcp_conf(options, pvl.args.apply_file(include)) :
yield info
else :
log.warn("ignore unknown item: %s", item)
for info in import_dhcp_hosts(options, file.name, blocks) :
yield info
ZONE_COMMENTS = (
re.compile(r'(?P<owner>[^/]+)\s*-\s+(?P<host>.+)'),
re.compile(r'(?P<group>.+?)\s*/\s*(?P<owner>.+)\s+[/-]\s+(?P<host>.+)'),
re.compile(r'(?P<group>.+?)\s*/\s*(?P<owner>.+)\s+[(]\s*(?P<host>.+)[)]'),
re.compile(r'(?P<group>.+?)\s*/\s*(?P<owner>.+)'),
)
ZONE_OWNER_MAIL = re.compile(r'(?P<owner>.*?)\s*<(?P<mail>.+?)>')
def process_zone_comment (options, hostname, comment) :
"""
Attempt to parse a host comment field... :D
Yields (field, value) bits
"""
for regex in ZONE_COMMENTS :
match = regex.match(comment)
if match :
matches = match.groupdict()
log.info("%s: matched comment: %s", hostname, comment)
break
else :
if options.import_zone_comments :
log.info("%s: default comment: %s", hostname, comment)
matches = { options.import_zone_comments: comment }
else :
log.warn("%s: unknown comment: %s", hostname, comment)
return
owner = matches.pop('owner', None)
if owner :
mail_match = ZONE_OWNER_MAIL.match(owner)
if mail_match :
mail_matches = mail_match.groupdict()
owner = mail_matches['owner']
yield 'mail', mail_matches['mail'].strip()
yield 'owner', owner.strip()
for field, value in matches.iteritems() :
if value :
yield field, value.strip()
NONE_OWNERS = set((
u'tech',
u'atk',
u'toimisto',
))
def process_host_owner_ldap (options, host, info) :
"""
Yield guesses for user from LDAP.
"""
if info.get('mail') :
for user in options.ldap.users.filter(
{ 'mailLocalAddress': info['mail'] },
{ 'uid': info['mail'] },
) :
yield user, None
if info.get('group') and info.get('owner') :
groups = options.ldap.groups.filter(cn=info['group'])
for group in groups :
for user in options.ldap.users.filter({
'gidNumber': group['gidNumber'],
'cn': info['owner'],
}) :
yield user, group
if info.get('owner') :
for user in options.ldap.users.filter({
'cn': info['owner'],
}) :
yield user, None
def process_host_owner (options, host, info) :
"""
Return (owner, comment) for host based on info, or None.
"""
owner = info.get('owner')
if owner and owner.lower() in NONE_OWNERS :
return False
# from ldap?
for ldap in process_host_owner_ldap(options, host, info) :
user, group = ldap
if not group :
# get group from ldap
group = options.ldap.users.group(user)
return user['uid'], u"{group} / {user}".format(
user = user.getunicode('cn'),
group = group.getunicode('cn'),
)
def process_host_comments (options, host, info) :
"""
Process host fields from comment.
Attempts to find owner from LDAP..
"""
log.debug("%s: %s", host, info)
owner = process_host_owner(options, host, info)
if owner is False :
# do not mark any owner
pass
elif owner :
owner, comment = owner
log.info("%s: %s (%s)", host, owner, comment)
yield 'comment.owner', comment
yield 'owner', owner,
elif 'group' in info or 'owner' in info :
log.warn("%s: unknown owner: %s", host, info)
yield 'comment.owner', "{group} / {owner}".format(
group = info.get('group', ''),
owner = info.get('owner', ''),
)
if info.get('host') :
yield 'comment.host', info['host']
def process_hosts_comments (options, import_hosts) :
"""
Parse out comments from host imports..
"""
for host, field, value in import_hosts :
if field != 'comment':
yield host, field, value
continue
fields = dict(process_zone_comment(options, host, value))
if options.dump_host_comments :
print u"{host:20} {comment:80} = {group:15} / {owner:20} <{mail:20}> / {hostinfo}".format(
host = host,
comment = value,
group = fields.get('group', ''),
owner = fields.get('owner', ''),
mail = fields.get('mail', ''),
hostinfo = fields.get('host', ''),
).encode('utf-8')
for field, value in process_host_comments(options, host, fields) :
yield host, field, value
def import_hosts_files (options, zone_files, dhcp_files) :
"""
Import host infos from given files.
"""
for zone_file in zone_files:
file = pvl.args.apply_file(zone_file, 'r', options.input_charset)
for info in import_zone_hosts(options, file) :
yield info
for dhcp_file in dhcp_files :
file = pvl.args.apply_file(dhcp_file, 'r', options.input_charset)
for info in import_dhcp_conf(options, file) :
yield info
def process_import_hosts (options, import_hosts) :
"""
Build hosts from imported fields.
Yields (domain, host), { (field, ...): value }
"""
# gather
hosts = collections.defaultdict(lambda: collections.defaultdict(list))
for (host, domain), field, value in import_hosts :
hosts[domain, host][tuple(field.split('.'))].append(value)
# process
for (domain, host), fields in hosts.iteritems() :
SINGLE_FIELDS = (
'ip',
'ip.unused',
'ip6',
'comment.owner',
'owner',
'boot',
)
MULTI_FIELDS = (
'comment.host',
'ethernet',
'alias',
)
host_fields = {}
for field_name in SINGLE_FIELDS :
field = tuple(field_name.split('.'))
values = fields.get(field)
if not values :
continue
elif len(values) == 1 :
value, = values
else :
log.error("%s@%s: multiple %s: %s", host, domain, field, values)
value = values[0]
log.debug("%s@%s: %s: %s", host, domain, field, value)
host_fields[field] = value
for field_name in MULTI_FIELDS :
field_prefix = tuple(field_name.split('.'))
# find labled fields by prefix, or unlabled multi-fields
for field, values in fields.iteritems() :
pre, field_index = field[:-1], field[-1]
if not values :
pass
elif pre == field_prefix :
log.debug("%s@%s: %s.%s: %s", host, domain, field_prefix, field_index, value)
host_fields[field] = values
elif field == field_prefix :
log.debug("%s@%s: %s.*: %s", host, domain, field_prefix, value)
host_fields[field_prefix] = values
lazy_import = fields.get(tuple('lazy-import'.split('.')))
if not lazy_import :
pass
elif set(host_fields) == set([('ip', )]) :
log.info("%s: omit lazy-import with fields: %s", host, ' '.join('.'.join(field) for field in host_fields))
continue
else :
log.info("%s: import lazy-import with fields: %s", host, ' '.join('.'.join(field) for field in host_fields))
yield (host, domain), host_fields
def apply_import_hosts (options) :
"""
Import hosts.
"""
import_hosts = import_hosts_files(options, options.import_zone_hosts, options.import_dhcp_hosts)
# process
import_hosts = process_hosts_comments(options, import_hosts)
# gather
return process_import_hosts(options, import_hosts)
def process_hosts (options, hosts) :
"""
Sanity-check and post-process hosts.
Does alias4 mapping, nonexistant alias checks, duplicate ip checks..
"""
by_name = dict(hosts)
by_ip = dict()
# scan for alias4
for (host, domain), fields in by_name.items() :
for fmt, ip_field, alias_field in (
(pvl.hosts.Host.ALIAS4_FMT, 'ip', 'alias4'),
(pvl.hosts.Host.ALIAS6_FMT, 'ip6', 'alias6'),
) :
alias = fmt.format(host=host)
alias_fields = by_name.get((alias, domain))
if not alias_fields :
continue
elif alias_fields[(ip_field, )] != fields[(ip_field, )] :
log.warn("%s: %s %s collision with %s", host, alias_field, ip_field, alias)
elif ('alias', ) in alias_fields :
log.warn("%s: mapped to %s on %s", alias, alias_field, host)
fields[(alias_field, )] = alias_fields.get(('alias', ), ())
del by_name[(alias, domain)]
else :
log.warn("%s: %s mapped to %s, but no aliases", alias, alias_field, host)
del by_name[(alias, domain)]
# scan by alias
by_alias = { }
for (host, domain), fields in hosts :
for alias in fields.get(('alias', ), ()) :
if (alias, domain) in by_alias :
log.warn("%s: duplicate alias %s: %s", host, alias, by_alias[(alias, domain)])
else :
by_alias[(alias, domain)] = (host, fields)
for (host, domain), fields in hosts :
fields = by_name.get((host, domain))
if not fields :
# skip
continue
if set(fields) == set([('alias', )]) :
aliases = fields[('alias', )]
if (host, domain) in by_alias :
alias_host, alias_fields = by_alias[(host, domain)]
log.info("%s: chain as alias to %s: %s", host, alias_host, ' '.join(aliases))
alias_fields[('alias', )].extend(aliases)
continue
else :
log.warn("%s@%s: nonexistant alias target for: %s", host, domain, ' '.join(aliases))
ip = fields.get(('ip', ))
if ip in by_ip :
log.warn("%s: duplicate ip %s: %s", host, ip, by_ip[ip])
elif ip :
by_ip[ip] = host
else :
log.warn("%s: no ip", host)
yield (host, domain), fields
def sort_export_hosts (options, hosts) :
"""
Generate a sortable version of hosts, yielding (sort, host, fields).
"""
if options.output_prefix :
prefix = ipaddr.IPNetwork(options.output_prefix)
else :
prefix = None
if options.output_domain :
select_domain = options.output_domain
else :
select_domain = None
for (host, domain), fields in hosts :
ip = fields.get(('ip', )) or fields.get(('ip', 'unused'))
log.debug("%s@%s: ip=%s", host, domain, ip)
# sort by IP
if ip :
sort = ip
else :
# fake, to sort correctly
sort = ipaddr.IPAddress(0)
# select
match = True
if prefix:
if not (ip and ip in prefix) :
match = False
if select_domain :
if not (domain and domain == select_domain) :
match = False
if match and options.output_others :
pass
elif not match and not options.output_others :
pass
else :
yield (domain, sort), (host, domain), fields
def export_hosts (options, hosts) :
"""
Generate hosts config lines for given hosts.
"""
# filter + sort
hosts = [(host, fields) for sort, host, fields in sorted(sort_export_hosts(options, hosts))]
if options.output_domain :
# global
output_domain = False
else :
output_domain = None
for (host, domain), fields in hosts :
if output_domain is False :
pass
elif domain != output_domain :
yield u"[{domain}]".format(domain=domain)
output_domain = domain
# special handling for "unused" hosts
if ('ip', 'unused') in fields :
yield u"{indent}# {unused} {ip}".format(
indent = '\t' if output_domain else '',
unused = options.zone_unused,
ip = fields[('ip', 'unused')],
)
yield u""
continue
# optional host-comments
for comment in fields.get(('comment', 'host'), ()):
yield u"{indent}# {comment}".format(
indent = '\t' if output_domain else '',
comment = comment,
)
if output_domain :
yield u"\t[[{host}]]".format(host=host)
else :
yield u"[{host}]".format(host=host)
#if not options.output_domain and domain :
# yield u"\t{field:15} = {domain}".format(field='domain', domain=domain)
for field_name in (
'ip',
'ip6',
'ethernet',
'owner',
'alias',
'alias4',
'alias6',
'boot',
) :
for field, value in fields.iteritems() :
if field[0] == field_name :
# optional field-comment
comment = fields.get(('comment', field_name), None)
if isinstance(value, list) :
value = ' '.join(value)
yield u"{indent}{field:15} = {value} {comment}".format(
indent = '\t\t' if output_domain else '\t',
field = '.'.join(str(label) for label in field),
value = value,
comment = u"# {comment}".format(comment=comment) if comment else '',
).rstrip()
yield ""
def apply_hosts_export (options, hosts) :
"""
Export hosts to file.
"""
file = pvl.args.apply_file(options.output_hosts, 'w', options.output_charset)
for line in export_hosts(options, hosts) :
print >>file, line
def main (argv) :
options, args = parse_options(argv)
options.ldap = pvl.ldap.args.apply(options)
# import
hosts = list(apply_import_hosts(options))
# verify
hosts = process_hosts(options, hosts)
# output
if options.output_hosts :
apply_hosts_export(options, hosts)
if __name__ == '__main__':
pvl.args.main(main)