import collections
import ipaddress
import logging; log = logging.getLogger('pvl.hosts.host')
import pvl.dns
class HostError (Exception):
"""
An error associated with some specific Host.
"""
def __init__(self, host, error):
"""
host : Host which caused error
error : Exception or str message
"""
self.host = host
self.error = error
def __str__ (self):
return "{self.host}: {self.error}".format(self=self)
def parse_bool(value):
"""
Normalize optional boolean value.
"""
if value is None:
return None
elif value:
return True
else:
return False
def parse_ip(value, type):
if value:
return type(value)
else:
return None
def parse_list(value):
"""
Parse list of strings.
"""
if value:
return value.split()
else:
return ()
def parse_location(location, domain):
"""
Parse location@domain.
"""
if not location:
return None
if '@' in location:
location, location_domain = location.split('@', 1)
else:
location_domain = domain
return (location, location_domain)
def parse_ethernet(value):
"""
Normalize ethernet str.
"""
return ':'.join('%02x' % int(x, 16) for x in value.split(':'))
def parse_str(value):
"""
Normalize optional string value.
"""
if value is None:
# omit
return None
elif value:
return str(value)
else:
# empty value
return False
def parse_dict(value, parse, **opts):
if not value:
return { }
if isinstance(value, dict):
values = value
else:
values = {None: value}
return {instance: parse(value, **opts) for instance, value in values.iteritems()}
class Host (object) :
"""
A host is a network node that can have multiple ethernet interfaces, and multiple IP addresses in different domains.
"""
EXTENSIONS = { }
EXTENSION_FIELDS = { }
@classmethod
def parse_extensions(cls, extensions, extra):
"""
Parse extensions and extension fields to yield
(HostExtension, field, value)
"""
for extension, values in extensions.iteritems():
extension_cls = cls.EXTENSIONS.get(extension)
if not extension_cls:
log.warning("skip unknown extension: %s", extension)
continue
for field, value in values.iteritems():
yield extension_cls, field, value
for field, value in extra.iteritems():
extension_cls = cls.EXTENSION_FIELDS.get(field)
if not extension_cls:
raise ValueError("unknown field: {field}".format(field=field))
yield extension_cls, field, value
@classmethod
def build (cls, name, domain,
ip=None, ip6=None,
ethernet=None,
owner=None,
location=None,
alias=None, alias4=None, alias6=None,
forward=None, reverse=None,
down=None,
extensions={ },
**extra
) :
"""
Return a Host initialized from data attributes.
This handles all string parsing to our data types.
"""
extension_classes = { }
for extension_cls, field, value in cls.parse_extensions(extensions, extra):
extension_classes.setdefault(extension_cls, dict())[field] = value
extensions = {extension_cls.EXTENSION: extension_cls.build(**params) for extension_cls, params in extension_classes.iteritems()}
return cls(name,
domain = domain,
ip4 = parse_ip(ip, ipaddress.IPv4Address),
ip6 = parse_ip(ip6, ipaddress.IPv6Address),
ethernet = parse_dict(ethernet, parse_ethernet),
owner = owner,
location = parse_location(location, domain),
alias = parse_list(alias),
alias4 = parse_list(alias4),
alias6 = parse_list(alias6),
forward = parse_str(forward),
reverse = parse_str(reverse),
down = parse_bool(down),
extensions = extensions,
)
def __init__ (self, name, domain,
ip4=None, ip6=None,
ethernet={ },
owner=None,
location=None,
alias=(), alias4=(), alias6=(),
forward=None, reverse=None,
down=None,
extensions={},
):
"""
name - str
domain - str
ip4 - ipaddress.IPv4Address
ip6 - ipaddress.IPv6Address
ethernet - { index: ethernet }
alias - [ str ]: generate CNAMEs for given relative names
owner - str: LDAP uid
location - None or (name, domain)
alias4 - [ str ]: generate additional A records for given relative names
alias6 - [ str ]: generate additional AAAA records for given relative names
forward - None: generate forward zone A/AAAA records per ip/ip6
False: omit A/AAAA records (and any alias= CNAMEs)
str: generate forward zone CNAME to given fqdn
reverse - None: generate reverse zone PTR records per ip/ip6
False: omit PTR records for ip/ip6
str: generate IPv4 reverse zone CNAME to given fqdn, and omit IPv6 PTR
down - mark as offline for polling
"""
self.name = name
self.domain = domain
self.ip4 = ip4
self.ip6 = ip6
self.ethernet = ethernet
self.alias = alias
self.alias4 = alias4
self.alias6 = alias6
self.owner = owner
self.location = location
self.forward = forward
self.reverse = reverse
self.down = down
self.extensions = extensions
def sort_key (self):
"""
Stable sort ordering
"""
if self.ip4:
return self.ip4
else:
# sorts first
return ipaddress.IPvAddress(0)
def addresses (self):
"""
Yield (sublabel, ipaddress.IP*Address) records.
"""
if self.ip4:
yield None, self.ip4
if self.ip6:
yield None, self.ip6
for extension in self.extensions.itervalues():
for sublabel, ip in extension.addresses():
yield sublabel, ip
def fqdn (self):
"""
Return DNS FQDN for this host in its domain.
"""
if self.domain:
return pvl.dns.fqdn(self.name, self.domain)
else:
return pvl.dns.fqdn(self.name)
def __str__ (self):
return "{self.name}@{domain}".format(self=self,
domain = self.domain or '',
)
class HostExtension (object):
"""
Base class for Host.EXTENSIONS
Provides default no-op behaviours for extension hooks.
"""
EXTENSION = None
EXTENSION_FIELDS = ()
def addresses (self):
"""
Yield additional (sublabel, ipaddr) records.
"""
return ()
def register_extension (cls):
"""
Register an extension class
"""
Host.EXTENSIONS[cls.EXTENSION] = cls
for field in cls.EXTENSION_FIELDS:
Host.EXTENSION_FIELDS[field] = cls