"""
Generate zonefile records from hosts
"""
import logging; log = logging.getLogger('pvl.hosts.zone')
import pvl.dns
import pvl.hosts.host
class HostZoneError(pvl.hosts.host.HostError):
pass
# TODO: generate location alias CNAMEs even if host itself is outside origin?
def host_forward (host, origin):
"""
Yield ZoneRecords for hosts within the given zone origin
"""
try:
label = pvl.dns.relative(origin, host.domain, host.name)
except ValueError as error:
log.info("%s: skip: %s", host, error)
return
if host.forward:
forward = pvl.dns.fqdn(host.forward)
log.info("%s: forward: %s", host, forward)
yield pvl.dns.ZoneRecord.CNAME(label, forward)
elif host.forward is None:
# forward
for sublabel, ip in host.addresses():
if sublabel:
sublabel = pvl.dns.join(sublabel, label)
else:
sublabel = label
if ip.version == 4:
log.info("%s: ip: %s@%s A %s", host, sublabel, origin, ip)
yield pvl.dns.ZoneRecord.A(sublabel, ip)
elif ip.version == 6:
log.info("%s: ip6: %s@%s AAAA %s", host, label, origin, ip)
yield pvl.dns.ZoneRecord.AAAA(sublabel, ip)
else:
raise ValueError(ip)
else:
log.info("%s: skip forward", host)
return
if host.location:
location_alias, location_domain = host.location
try:
yield pvl.dns.ZoneRecord.CNAME(pvl.dns.relative(origin, location_domain, location_alias), label)
except ValueError as error:
raise HostZoneError(host, error)
for alias in host.alias:
yield pvl.dns.ZoneRecord.CNAME(pvl.dns.relative(origin, host.domain, alias), label)
for alias in host.alias4:
if not host.ip4:
raise HostZoneError(host, "alias4={host.alias4} without ip4=".format(host=host))
yield pvl.dns.ZoneRecord.A(pvl.dns.relative(origin, host.domain, alias), host.ip4)
for alias in host.alias6:
if not host.ip6:
raise HostZoneError(host, "alias6={host.alias6} without ip6=".format(host=host))
yield pvl.dns.ZoneRecord.AAAA(pvl.dns.relative(origin, host.domain, alias), host.ip6)
def host_reverse (host, prefix) :
"""
Yield (ipaddress.IPAddress, ZoneRecord) tuples for host within given prefix's reverse-dns zone.
"""
for sublabel, ip in host.addresses():
if ip.version != prefix.version:
continue
if ip not in prefix:
log.debug("%s: %s out of prefix: %s", host, ip, prefix)
continue
if ip.version == 4:
# reverse= is IPv4-only
reverse = host.reverse
elif prefix.version == 6:
# if reverse= is set, always omit, for lack of reverse6=
reverse = None if host.reverse is None else False
# relative label
label = pvl.dns.reverse_label(prefix, ip)
if reverse:
alias = pvl.dns.fqdn(reverse)
log.info("%s %s[%s]: CNAME %s", host, prefix, ip, alias)
yield ip, pvl.dns.zone.ZoneRecord.CNAME(label, alias)
elif reverse is None:
fqdn = host.fqdn()
if sublabel:
fqdn = pvl.dns.join(sublabel, fqdn)
log.info("%s %s[%s]: PTR %s", host, prefix, ip, fqdn)
yield ip, pvl.dns.zone.ZoneRecord.PTR(label, fqdn)
else:
log.info("%s %s[%s]: omit", host, prefix, ip)
def apply_hosts_forward (hosts, origin,
add_origin = False,
check_conflicts = False,
) :
"""
Generate DNS ZoneRecords for for hosts within the given zone origin.
Verifies that there are no overlapping name/type records, or CNAME records from aliases.
hosts: [Host] - Host's to render
origin: str - generate records relative to given zone origin
add_origin: bool - generate an explicit $ORIGIN directive
check_conflits: bool- raise HostZoneError on dupliate records for the same name/type
overlapping CNAME records will always raise
Yields ZoneRecords in Host-order
"""
if add_origin:
yield pvl.dns.ZoneDirective.build('ORIGIN', pvl.dns.fqdn(origin))
by_name = dict()
by_name_type = dict()
for host in hosts:
for rr in host_forward(host, origin) :
if (rr.name, 'CNAME') in by_name_type:
raise HostZoneError(host, u"{cname} CNAME conflict with {other}".format(cname=rr.name, other=by_name_type[rr.name, 'CNAME']))
elif rr.type == 'CNAME' and rr.name in by_name:
raise HostZoneError(host, u"{cname} CNAME conflict with {other}".format(cname=rr.name, other=by_name[rr.name]))
elif check_conflicts and (rr.name, rr.type) in by_name_type:
raise HostZoneError(host, u"{name} {type} conflict with {other}".format(type=rr.type, name=rr.name, other=by_name_type[rr.name, rr.type]))
by_name[rr.name] = host
by_name_type[rr.name, rr.type] = host
# preserve ordering
yield rr
def apply_hosts_reverse (hosts, prefix,
unknown_host = None,
unknown_domain = None,
) :
"""
Generate DNS ZoneRecords within the given prefix's reverse-dns zone for hosts.
hosts: [Host] - Host's to render PTRs for
prefix: ipaddress.IPNetwork - IPv4/IPv6 prefix to render PTRs for within associated in-addr.arpa/ip6.arpa zone
unknown_host: str - render a PTR to the given host @unknown_domain for unassigned IPs within prefix
unknown_domain: str - required if unknown_host is given
Yields ZoneRecords in IPAddress-order
"""
if unknown_host and not unknown_domain:
raise ValueError("unknown_host requires unknown_domain=")
# collect data for records
by_ip = dict()
for host in hosts:
for ip, rr in host_reverse(host, prefix) :
if ip in by_ip :
raise HostZoneError(host, "{host}: IP {ip} conflict: {other}".format(host=host, ip=ip, other=by_ip[ip]))
# do not retain order
by_ip[ip] = rr
if unknown_host :
# enumerate all of them
iter_ips = prefix.hosts()
else :
iter_ips = sorted(by_ip)
for ip in iter_ips :
if ip in by_ip :
yield by_ip[ip]
elif unknown_host:
# synthesize a record
label = pvl.dns.reverse_label(prefix, ip)
fqdn = pvl.dns.fqdn(unknown_host, unknown_domain)
log.info("%s %s[%s]: unused PTR %s", unknown_host, ip, prefix, fqdn)
yield pvl.dns.ZoneRecord.PTR(label, fqdn)
else:
continue