pvl.hosts: rename Host.ip -> Host.ip4; support instanced ip.foo = ... for foo.host A .... sub-labels
import collections
import ipaddr
import logging; log = logging.getLogger('pvl.hosts.host')
import pvl.dns
class HostError (Exception):
    """
    An error associated with some specific Host.

    Formats as "<host>: <error>" when converted to str.
    """

    def __init__(self, host, error):
        """
        host    : Host which caused error
        error   : Exception or str message
        """

        # populate Exception.args so that repr() and pickling keep the details
        super(HostError, self).__init__(host, error)

        self.host = host
        self.error = error

    def __str__ (self):
        return "{self.host}: {self.error}".format(self=self)
def parse_bool(value):
    """
    Normalize an optional boolean value.

    None is passed through unchanged (option omitted); anything else is
    collapsed to True or False by its truthiness.
    """

    return None if value is None else bool(value)
def parse_ip(value, type):
    """
    Convert a non-empty value using the given type callable.

    Returns None for a missing or empty value.
    """

    return type(value) if value else None
def parse_list(value):
    """
    Split a whitespace-separated string into a list of strings.

    A missing or empty value gives an empty tuple.
    """

    if not value:
        return ()

    return value.split()
def parse_location(location, domain):
    """
    Parse a location[@domain] value into a (location, domain) tuple.

    The given domain is used when the value carries no @domain suffix.
    Returns None for a missing or empty location.
    """

    if not location:
        return None

    if '@' not in location:
        # no explicit domain: fall back to the one given
        return (location, domain)

    # only the first @ separates the name from the domain
    name, location_domain = location.split('@', 1)

    return (name, location_domain)
def parse_ethernet(value):
    """
    Normalize a colon-separated ethernet address str.

    Each colon-separated part is parsed as hex and re-formatted as
    lowercase hex, zero-padded to at least two digits.
    """

    octets = []

    for part in value.split(':'):
        octets.append('{0:02x}'.format(int(part, 16)))

    return ':'.join(octets)
def parse_dhcp_boot(boot):
    """
    Parse the dhcp boot=... option into a dict of 'filename' / 'next-server' fields.

    Accepts None, a plain "[next-server:][filename]" str, or an instanced
    dict that may also carry the plain str under the None key. The given
    dict is copied, not modified.

    Raises ValueError for unknown fields or an unparseable plain str.

    >>> print parse_dhcp_boot(None)
    {}
    >>> print parse_dhcp_boot({'filename': '/foo'})
    {'filename': '/foo'}
    >>> print parse_dhcp_boot({'filename': '/foo', 'next-server': 'bar'})
    {'next-server': 'bar', 'filename': '/foo'}
    >>> print parse_dhcp_boot('/foo')
    {'filename': '/foo'}
    >>> print parse_dhcp_boot('bar:/foo')
    {'next-server': 'bar', 'filename': '/foo'}
    >>> print parse_dhcp_boot('bar:')
    {'next-server': 'bar'}
    >>> print parse_dhcp_boot('foo')
    Traceback (most recent call last):
        ...
    ValueError: invalid boot=foo
    """

    # normalize every accepted input shape to a private dict
    if not boot:
        fields = {}
    elif isinstance(boot, dict):
        fields = dict(boot)
    else:
        fields = {None: boot}

    # the plain boot=... str, if any, lives under the None key
    plain = fields.pop(None, None)

    allowed = set(('filename', 'next-server', None))

    if not set(fields).issubset(allowed):
        raise ValueError("Invalid boot.*: {instances}".format(instances=' '.join(fields)))

    # any plain boot= given overrides the boot.* fields
    if plain:
        if plain.startswith('/'):
            fields['filename'] = plain
        elif plain.endswith(':'):
            fields['next-server'] = plain[:-1]
        elif ':' in plain:
            fields['next-server'], fields['filename'] = plain.split(':', 1)
        else:
            raise ValueError("invalid boot={boot}".format(boot=plain))

    return fields
def parse_str(value):
    """
    Normalize an optional string value.

    Returns None when the value is omitted (None), False when it is given
    but empty, and str(value) otherwise.
    """

    if value is None:
        # omitted
        return None

    if not value:
        # given, but empty
        return False

    return str(value)
def parse_dict(value, parse):
    """
    Parse an optionally instanced value into an { instance: parsed } dict.

    value   : None/empty, a plain value, or an { instance: value } dict
    parse   : callable applied to each individual value

    A plain (non-dict) value is stored under the None instance.
    A missing or empty value gives an empty dict.
    """

    if not value:
        return { }

    if isinstance(value, dict):
        values = value
    else:
        values = {None: value}

    # items() rather than the Python 2-only iteritems(), so this runs on both
    return { instance: parse(item) for instance, item in values.items() }
class Host (object) :
    """
    A host is a network node that can have multiple ethernet interfaces, and multiple IP addresses in different domains.
    """

    # the label used for alias4/6 hosts
    ALIAS4_FMT = '{host}-ipv4'
    ALIAS6_FMT = '{host}-ipv6'

    @classmethod
    def build (cls, name, domain,
            ip=None, ip6=None,
            ethernet=None,
            owner=None,
            location=None,
            alias=None, alias4=None, alias6=None,
            forward=None, reverse=None,
            down=None,
            boot=None,
            extensions=None,
    ) :
        """
        Return a Host initialized from data attributes.

        This handles all string parsing to our data types.

        ip and ip6 may each be a plain value or an { instance: value } dict.
        The un-instanced (None) entries become the primary ip4/ip6 of the
        Host, and every instance is collected into the Host.ip dict.
        """

        ip4 = parse_dict(ip, ipaddr.IPv4Address)
        ip6 = parse_dict(ip6, ipaddr.IPv6Address)

        # merge per-label addresses; a label missing from one family gets None there
        ip = {label: (ip4.get(label), ip6.get(label)) for label in set(ip4) | set(ip6)}

        return cls(name,
                domain      = domain,
                ip4         = ip4.get(None),
                ip6         = ip6.get(None),
                ip          = ip,
                ethernet    = parse_dict(ethernet, parse_ethernet),
                owner       = owner,
                location    = parse_location(location, domain),
                alias       = parse_list(alias),
                alias4      = parse_list(alias4),
                alias6      = parse_list(alias6),
                forward     = parse_str(forward),
                reverse     = parse_str(reverse),
                down        = parse_bool(down),
                boot        = parse_dhcp_boot(boot),
                extensions  = extensions,
        )

    def __init__ (self, name, domain,
            ip4=None, ip6=None,
            ip=None,
            ethernet=None,
            owner=None,
            location=None,
            alias=(), alias4=(), alias6=(),
            forward=None, reverse=None,
            down=None,
            boot=None,
            extensions=None,
    ):
        """
        name        - str
        domain      - str
        ip4         - primary ipaddr.IPv4Address
        ip6         - primary ipaddr.IPv6Address
        ip          - { index: (ip4, ip6) } interface addresses
        ethernet    - { index: ethernet }
        alias       - [ str ]: generate CNAMEs for given relative names
        owner       - str: LDAP uid
        location    - None or (name, domain)
        alias4      - [ str ]: generate additional A records for given relative names
        alias6      - [ str ]: generate additional AAAA records for given relative names
        forward     - None: generate forward zone A/AAAA records per ip4/ip6
                      False: omit A/AAAA records (and any alias= CNAMEs)
                      str: generate forward zone CNAME to given fqdn
        reverse     - None: generate reverse zone PTR records per ip4/ip6
                      False: omit PTR records for ip4/ip6
                      str: generate IPv4 reverse zone CNAME to given fqdn, and omit IPv6 PTR
        down        - mark as offline for polling
        boot        - dhcp boot options, as returned by parse_dhcp_boot() when using build()
        extensions  - dict of extension data, stored as given

        Omitted ip, ethernet and extensions each default to a fresh empty dict.
        """

        self.name = name
        self.domain = domain
        self.ip4 = ip4
        self.ip6 = ip6

        # fresh dicts per instance: a shared {} default would leak state between hosts
        self.ip = { } if ip is None else ip
        self.ethernet = { } if ethernet is None else ethernet
        self.extensions = { } if extensions is None else extensions

        self.alias = alias
        self.alias4 = alias4
        self.alias6 = alias6
        self.owner = owner
        self.location = location
        self.boot = boot
        self.forward = forward
        self.reverse = reverse
        self.down = down

    def sort_key (self):
        """
        Stable sort ordering, by primary IPv4 address.

        Hosts without a primary ip4 use ipaddr.IPAddress(0).
        """

        # self.ip is the per-label dict; the primary IPv4 address lives in self.ip4
        if self.ip4:
            return self.ip4
        else:
            # sorts first
            return ipaddr.IPAddress(0)

    def fqdn (self):
        """
        Return pvl.dns.fqdn() of our name, including our domain if we have one.
        """

        if self.domain:
            return pvl.dns.fqdn(self.name, self.domain)
        else:
            return pvl.dns.fqdn(self.name)

    def __str__ (self):
        return "{self.name}@{domain}".format(self=self,
                domain      = self.domain or '',
        )