author Tero Marttila <terom@paivola.fi>
Tue, 10 Mar 2015 00:11:43 +0200
changeset 739 5149c39f3dfc
parent 707 13283078a929
permissions -rw-r--r--
pvl.hosts: improve HostExtension support enough to move boot= into pvl.hosts.dhcp
import logging; log = logging.getLogger('pvl.dhcp.config')
import shlex
import string

# simplified model of lexer chars
# TOKEN_START: characters that may begin a bare (unquoted) word.
TOKEN_START = string.ascii_letters + string.digits + '-'
# TOKEN: characters allowed anywhere in a bare word when building output.
# NOTE(review): this definition was missing from this copy of the file; the
# extra '_' is an assumption - confirm against the upstream source.
TOKEN = TOKEN_START + '_'

# be far more lenient when parsing
# NOTE(review): this definition was missing from this copy of the file; ':' and
# '.' are assumed so that MAC and dotted addresses lex as single words - confirm.
TOKEN_EXTRA = TOKEN + ':.'

# Line contexts (leading keywords) whose values are always written unquoted.
UNQUOTED_CONTEXT = set((
    ('host', ),
    ('fixed-address', ),
    ('next-server', ),
    ('hardware', 'ethernet'),
))

class DHCPConfigError(Exception):
    """
        Error found while lexing/parsing a DHCP configuration.

        parser  - the parser object that hit the error; its .name is copied to self.name
        error   - description of the problem
        line    - line number of the problem, or None if not (yet) known

        The string form is "<name>:<line>: <error>".
    """

    def __init__ (self, parser, error, line=None):
        self.parser = parser

        # snapshot the parser's name so the message does not depend on later parser changes
        self.name, self.line, self.error = parser.name, line, error

    def __str__ (self):
        template = "{self.name}:{self.line}: {self.error}"

        return template.format(self=self)

def split (line) :
    """
        Split given line-data into raw tokens.

        Bare words are runs of TOKEN_EXTRA characters, quoted strings are returned without their quotes,
        and any other non-whitespace character (such as ';', '{' or '}') is returned as a token of its own.
        Everything after a '#' is dropped as a comment.

            line    - str: the text to tokenize

        Yields each token as a str.

        This is a generator, so the TypeError for a None line is raised once iteration starts. shlex itself
        raises ValueError for malformed input such as an unterminated quote.

        >>> list(split('foo'))
        ['foo']
        >>> list(split('foo bar'))
        ['foo', 'bar']
        >>> list(split('foo;'))
        ['foo', ';']
        >>> list(split('"foo"'))
        ['foo']
        >>> list(split('foo "asdf quux" bar'))
        ['foo', 'asdf quux', 'bar']
        >>> list(split('foo "asdf quux"'))
        ['foo', 'asdf quux']
        >>> list(split('# nevermind'))
        []
        >>> list(split(''))
        []
        >>> list(split('next-server foo'))
        ['next-server', 'foo']
        >>> list(split('include "foo/bar.conf";'))
        ['include', 'foo/bar.conf', ';']
    """

    # shlex falls back to reading sys.stdin when given None, so reject it explicitly
    if line is None:
        raise TypeError(line)

    lexer = shlex.shlex(line, posix=True)
    lexer.commenters = '#'
    lexer.wordchars = TOKEN_EXTRA

    while True:
        item = lexer.get_token()

        # in posix mode, end of input is signalled by None
        if item is None:
            break

        yield item

class Field (object):
    """
        Pre-quoted fields for use in DHCP confs.

        Wraps a token that is already in its final textual form; str() returns it unchanged, and quote()
        emits Field instances via str() without any further quoting.

            token   - str: the exact text to emit
    """

    def __init__(self, token):
        self.token = token

    def __str__(self):
        return self.token

class String (Field):
    """
        A quoted string

        str() always wraps the value in double quotes, even when it would be a valid bare word. Backslashes
        and double quotes inside the value are backslash-escaped so that the result stays a single string.

            string  - str: the unquoted value

        >>> print(String('foobar'))
        "foobar"
    """

    def __init__(self, string):
        self.string = string

    def __str__(self):
        # escape backslashes first, so the backslashes added for quotes are not doubled again
        escaped = self.string.replace('\\', '\\\\').replace('"', '\\"')

        return '"{escaped}"'.format(escaped=escaped)

def quote (value, context=None):
    """
        Build a single field as part of a dhcp.conf line.

            value   - Field, int or str to render
            context - tuple of leading keywords for the line; values in an UNQUOTED_CONTEXT are never quoted

        Returns the field as a str. Field instances and ints are emitted as-is, as are values in an unquoted
        context. Other values are emitted bare if they look like a plain token, and otherwise wrapped in
        double quotes with any embedded backslashes/double quotes backslash-escaped. An empty value outside
        of an unquoted context is emitted as an empty quoted string.

        >>> print(quote('foo'))
        foo
        >>> print(quote('foo bar'))
        "foo bar"
        >>> print(quote('foo/bar.conf'))
        "foo/bar.conf"
        >>> print(quote(5))
        5
        >>> print(quote('5'))
        5
        >>> print(quote('foo.bar'))
        "foo.bar"
        >>> print(quote('foo.bar', context=('host', )))
        foo.bar
        >>> print(quote('', context=('fixed-address', )))
        <BLANKLINE>
        >>> print(quote(''))
        ""
        >>> print(quote('00:11:22:33:44:55', context=('hardware', 'ethernet')))
        00:11:22:33:44:55
        >>> print(quote(Field('1:00:11:22:33:44:55')))
        1:00:11:22:33:44:55
        >>> print(quote(String('foobar')))
        "foobar"
    """

    if isinstance(value, Field):
        return str(value)
    elif isinstance(value, int):
        return str(value)
    elif context in UNQUOTED_CONTEXT:
        return str(value)
    elif value and value[0] in TOKEN_START and all(c in TOKEN for c in value):
        # the truthiness check keeps an empty string from raising IndexError on value[0]
        return str(value)
    else:
        # quoted
        text = '{value}'.format(value=value)

        # escape backslashes first, so the backslashes added for quotes are not doubled again
        text = text.replace('\\', '\\\\').replace('"', '\\"')

        return '"' + text + '"'

class Block (object):
    """
        A block in a dhcp conf includes parameters and sub-blocks.
    """

    def __init__ (self, key, items=None, blocks=None, comment=None):
        """
            key: tuple      - name of block, or None for the top-level (anonymous) block
            items: list     - parameter lines within the block, each a tuple of fields
            blocks: list    - nested Blocks
            comment: str    - optional comment for the block
        """

        self.key = key
        # a missing (or empty) list is replaced by a fresh one, so instances never share a default list
        self.items = items or [ ]
        self.blocks = blocks or [ ]
        self.comment = comment

    def __str__ (self):
        """
            Short description: the block's key, or for a block without a key, its items joined by '; '.
        """

        if self.key:
            return ' '.join(self.key)
        else:
            # XXX: Item.__str__
            return '; '.join(' '.join(str(x) for x in item) for item in self.items)

    def __repr__ (self):
        return "Block({self.key!r}, items={self.items!r}, blocks={self.blocks!r})".format(self=self)

class DHCPConfigParser (object):
    """
        Simple parser for ISC dhcpd conf files.

        Supports iterative parsing as required for following a dhcpd.leases file.

        Doesn't implement the full spec, but a useful approximation.
    """

    @classmethod
    def load (cls, file, name=None):
        """
            Parse an complete file, returning the top-level Block.

                file    - iterable of lines (e.g. an open file)
                name    - name to use in error messages; defaults to file.name

            Raises DHCPConfigError, with the line number filled in, on invalid input, on trailing data
            without a terminating ';' and on blocks left open at the end of input.

            >>> DHCPConfigParser.load(['foo;', 'bar {', '\tasdf "quux";', '}'], name='test')
            Block(None, items=[('foo',)], blocks=[Block(('bar',), items=[('asdf', 'quux')], blocks=[])])
        """

        if name is None:
            name = file.name

        parser = cls(name=name)

        for lineno, line in enumerate(file, 1):
            try:
                for item in parser.parse_line(line):
                    log.debug("%s", item)
            except DHCPConfigError as error:
                # the lexer/parser do not track line numbers, so fill it in here
                error.line = lineno
                raise

        if parser.token:
            raise DHCPConfigError(parser, "Trailing data: {token}".format(token=parser.token), line=lineno)

        if parser.stack:
            raise DHCPConfigError(parser, "Unterminated block: {stack}".format(stack=parser.stack), line=lineno)

        return parser.block

    def __init__ (self, name=None):
        """
            name    - name of the input, used in error messages
        """

        self.name = name

        # lexer state: words of the current, not yet terminated, expression
        self.token = []
        # parser state: enclosing blocks of the current block, outermost first
        self.stack = []
        # top-level block
        self.block = Block(None)

    def lex (self, line):
        """
            Lex one line of input into basic expressions:

                open:   [...] {
                item:       [..];
                close:  [] }

            Yields (event, (...)) tokens.

            Words of an expression that is not terminated on this line are kept in self.token, and continue
            on the next line.

            Raises DHCPConfigError.
        """

        log.debug("%s", line)

        for word in split(line):
            if word == '{':
                yield 'open', tuple(self.token)

            elif word == ';':
                yield 'item', tuple(self.token)

            elif word == '}':
                if self.token:
                    raise DHCPConfigError(self, "Leading data on close: {token}".format(token=self.token))

                yield 'close', None

            else:
                # plain word: accumulate until the expression is terminated
                self.token.append(word)
                continue

            # expression terminated, start collecting the next one
            self.token = [ ]

    def parse (self, tokens):
        """
            Parse given tokens, yielding any complete Blocks.

            Note that the Blocks yielded may be within some other Block which is still incomplete.

            Raises DHCPConfigError on a block close without a matching open.
        """

        for token, args in tokens:
            if token == 'open':
                block = Block(args)

                log.debug("open block: %s > %s", self.block, block)

                # attach to the enclosing block, and descend into the new one
                self.block.blocks.append(block)
                self.stack.append(self.block)
                self.block = block

            # must be within block!
            elif token == 'item' :
                log.debug("block %s item: %s", self.block, args)

                self.block.items.append(args)

            elif token == 'close' :
                log.debug("close block: %s", self.block)

                block = self.block

                if self.stack:
                    self.block = self.stack.pop()
                else:
                    # closing the top-level block
                    raise DHCPConfigError(self, "Mismatched block close: {block}".format(block=block))

                yield block

            else:
                raise ValueError(token, args)

    def parse_line (self, line):
        """
            Lex and parse line tokens.

            Yields any Blocks completed on this line.
        """

        for block in self.parse(self.lex(line)):
            yield block

    def parse_lines (self, lines) :
        """
            Trivial wrapper around parse to parse multiple lines.

            Yields any Blocks completed within the given lines.
        """

        for line in lines:
            for block in self.parse_line(line) :
                yield block

def build_line (item, end, indent=0, context=None):
    """
        Build a structured line.

            item    - sequence of fields, each rendered using quote() and joined by single spaces
            end     - str appended directly after the fields (e.g. ';' or ' {')
            indent  - nesting level; each level indents by four spaces
            context - passed through to quote() for every field

        Returns the line as a str, without a trailing newline.

        >>> print(build_line(('foo', ), ';'))
        foo;
        >>> print(build_line(('host', 'foo'), ' {'))
        host foo {
        >>> print(build_line(('foo', 'bar quux'), ';', indent=1))
            foo "bar quux";
    """

    return '    '*indent + ' '.join(quote(field, context=context) for field in item) + end

def build_item (item, **opts):
    """
        Build a single parameter line.

            item    - sequence of fields; all but the last are used as the quoting context
            **opts  - further keyword arguments for build_line(), e.g. indent

        Returns the line as a str, terminated by ';'.

        >>> print(build_item(('foo', )))
        foo;
    """

    return build_line(item, end=';',
        context     = item[:-1],
        **opts
    )

def build_block (block, indent=0):
    """
        Build a complete Block, recursively, yielding output lines.

            block   - Block to render; a block without a key is rendered without the enclosing braces
            indent  - nesting level of the block itself

        Yields each output line as a str, without trailing newlines. If the block has items, an empty line
        is yielded before each of its sub-blocks.

        >>> for line in build_block(Block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')], comment="Testing")): print(line)
        # Testing
        host foo {
            hardware ethernet 00:11:22:33:44:55;
        }
        >>> host = Block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')])
        >>> group = Block(('group', ), [('next-server', 'booter')], [host])
        >>> for line in build_block(group): print(line)
        group {
            next-server booter;
        <BLANKLINE>
            host foo {
                hardware ethernet 00:11:22:33:44:55;
            }
        }
    """

    if block.comment:
        # no fields, so the comment text itself is the whole line
        yield build_line((), end="# {comment}".format(comment=block.comment), indent=indent)

    if block.key:
        # only the first word of the key is used as quoting context for the opening line
        yield build_line(block.key, end=' {', indent=indent, context=block.key[0:1])

        indent += 1

    for item in block.items:
        yield build_item(item, indent=indent)

    for subblock in block.blocks:
        if block.items:
            yield ''

        for line in build_block(subblock, indent=indent):
            yield line

    if block.key:
        indent -= 1

        yield build_line((), end='}', indent=indent)