pvl.hosts: refactor as a package; cleanup pvl.hosts.config with some basic tests
"""
Load Hosts from config files.
"""
import configobj
import logging; log = logging.getLogger('pvl.hosts')
import optparse
import os.path
import pvl.args
import sys
from pvl.hosts.host import Host
def optparser (parser):
hosts = optparse.OptionGroup(parser, "Hosts config files")
hosts.add_option('--hosts-charset', metavar='CHARSET', default='utf-8',
help="Encoding used for host files")
hosts.add_option('--hosts-domain', metavar='DOMAIN',
help="Default domain for hosts. Default uses config file basename")
hosts.add_option('--hosts-include', metavar='PATH',
help="Optional path for hosts includes, in addition to the host config dir")
return hosts
class HostConfigError (Exception):
"""
Generic error for file path.
"""
def __init__ (self, config, error):
self.config = config
self.error = error
def __str__ (self):
return "{self.config}: {self.error}".format(self=self)
class HostConfigObjError (Exception):
"""
An error from ConfigObj for a config file path.
"""
def __init__ (self, config, error):
self.config = config
self.error = error
self.line_contents = error.line
def __str__ (self):
return "{self.config}:{self.error.line_number}: {self.error.message}".format(self=self)
def apply_host_expand (options, range, name, domain, **params):
"""
Expand a templated host item.
"""
name = pvl.dns.zone.parse_generate_field(name)
# expand all fields
params = {param: pvl.dns.zone.parse_generate_field(value) for param, value in params.iteritems()}
for i in range:
yield Host.build(host(i), domain,
**{param: value(i) for param, value in params.iteritems()}
)
def parse_expand(name):
"""
Parse a name containing an optional expansion part.
name: hostname containing optional "{...}" part
Returns (name, range or None).
"""
if '{' in name:
pre, name = name.split('{', 1)
range, post = name.rsplit('}', 1)
range = pvl.dns.zone.parse_generate_range(range)
name = pre + "$" + post
else:
range = None
return name, range
def apply_host_config (options, path, parent, name,
domain=None,
**params
):
"""
Yield Hosts from a given config section.
options global options
path config file for errors
parent parent filename/section containing this host item, or None.
used for domain
name name of the section (or file) containing this host item.
used for hostname
**params host parameters
"""
if domain:
log.debug("%s: using explicit domain: %s", name, domain)
elif '.' in name:
log.debug("%s: using as fqdn without domain", name)
domain = None
elif parent:
log.debug("%s: default domain to section: %s", name, parent)
domain = parent
elif options.hosts_domain:
log.debug("%s: default domain to --hosts-domain: %s", name, options.hosts_domain)
domain = options.hosts_domain
else:
raise HostsConfigError(path, "{name}: no domain given".format(name=name))
# expand
name, range = parse_expand(name)
if range:
for host in apply_host_expand(options, range, name, domain, **params):
yield host
else:
yield Host.build(name, domain, **params)
def apply_host_includes (options, config_path, include):
"""
Yield files from a given config's include=... value
"""
include_paths = [os.path.dirname(config_path)]
if options.hosts_include:
include_paths.append[options.hosts_include]
for include in include.split():
for include_path in include_paths:
path = os.path.join(include_path, include)
if os.path.exists(path):
break
else:
raise HostConfigError(config_path, "Unable to find include {include} in include path: {include_path}".format(
include=include,
include_path=' '.join(include_paths),
))
if include.endswith('/'):
for name in os.listdir(path):
file_path = os.path.join(path, name)
if name.startswith('.'):
pass
elif not os.path.exists(file_path):
log.warn("%s: skip nonexistant include: %s", config_path, file_path)
else:
log.info("%s: include: %s", config_path, file_path)
yield open(file_path)
else:
log.info("%s: include: %s", config_path, path)
yield open(path)
def apply_hosts_config (options, path, name, config, defaults={}, parent=None):
"""
Load hosts from a configobj.Section (which can be the top-level ConfigObj).
options global options
path filesystem path of file (for errors)
name name of section/file
config configobj.Section
defaults hierarchial section defaults
parent optional parent section for included files
"""
# items in this section
section = dict(defaults)
for scalar in config.scalars:
section[scalar] = config[scalar]
if config.sections:
# this is a top-level section that includes hosts
# process includes?
includes = section.pop('include', '')
for file in apply_host_includes(options, path, includes):
# within our domain context
for host in apply_hosts_file(options, file, parent=name):
yield host
# recurse until we hit a scalar-only section representing a host
for section_name in config.sections:
log.debug("%s: %s: %s", path, name, section_name)
for host in apply_hosts_config(options, path, section_name, config[section_name], defaults=section, parent=name):
yield host
elif parent:
# this is a host section
log.debug("%s: %s@%s", path, name, parent)
for host in apply_host_config(options, path, parent, name, **section):
yield host
else:
raise HostConfigError(path, "No sections in config")
def apply_hosts_file (options, file, parent=None):
"""
Load Hosts from a file path.
path:str filesystem path
parent included from given name
Raises
HostConfigError
HostConfigObjError
"""
# use file basename as default domain
path = file.name
name = os.path.basename(path)
try:
config = configobj.ConfigObj(file,
raise_errors = True, # raise ConfigPObjError immediately
interpolation = 'Template',
encoding = options.hosts_charset,
)
except configobj.ConfigObjError as ex:
raise HostConfigObjError(path, ex)
return apply_hosts_config(options, path, name, config, parent=parent)
def apply_hosts (options, files):
"""
Load Hosts from files.
files:[str] list of filesystem paths
"""
for path in files:
try:
file = open(path)
except IOError as ex:
raise HostConfigError(path, ex.strerror)
for host in apply_hosts_file(options, file):
yield host
def apply (options, args):
"""
Load Hosts from arguments.
"""
try:
# load hosts from configs
hosts = list(apply_hosts(options, args))
except HostConfigObjError as error:
log.error("%s", error)
log.error("\t%s", error.line_contents)
sys.exit(2)
except HostConfigError as error:
log.error("%s", error)
sys.exit(2)
# stable ordering
hosts = sorted(hosts, key=Host.sort_key)
return hosts