pvl/dhcp/config.py
author Tero Marttila <terom@paivola.fi>
Sun, 01 Mar 2015 22:26:29 +0200
changeset 666 a8ddcbe894ff
parent 478 517c359a683b
child 668 794f943c835d
permissions -rw-r--r--
pvl.dhcp.config: refactor DHCPConfigParser to use shlex and yield Block objects, change build_block() to use Block; tests
import logging; log = logging.getLogger('pvl.dhcp.config')
import shlex

class DHCPConfigError(Exception):
    """
        Error in dhcp conf input, tagged with the name of the parser that hit it.

            parser  - the parser object; its .name is copied at construction time
            error   - description of the problem
            line    - line number, or None if not (yet) known

        Formats as "<name>:<line>: <error>".
    """

    def __init__ (self, parser, error, line=None):
        self.parser = parser
        self.name = parser.name

        self.error = error
        self.line = line

    def __str__ (self):
        return "{0}:{1}: {2}".format(self.name, self.line, self.error)

def split (line) :
    """
        Tokenize one line of dhcp conf syntax, generating the raw tokens in order.

        Quoted strings become a single token without the quotes, '#' starts a comment,
        and '-', '.' and '/' count as word characters in addition to the shlex defaults.
        Any other punctuation (such as ';', '{' and '}') comes out as a token of its own.

        NOTE(review): ':' is not a word character here, so values such as MAC addresses
        or times are split into several tokens - confirm that this is intended.

        >>> list(split('foo'))
        ['foo']
        >>> list(split('foo bar'))
        ['foo', 'bar']
        >>> list(split('foo;'))
        ['foo', ';']
        >>> list(split('"foo"'))
        ['foo']
        >>> list(split('foo "asdf quux" bar'))
        ['foo', 'asdf quux', 'bar']
        >>> list(split('foo "asdf quux"'))
        ['foo', 'asdf quux']
        >>> list(split('# nevermind'))
        []
        >>> list(split(''))
        []
    """

    # shlex.shlex(None) would read from stdin instead of failing
    if line is None:
        raise TypeError(line)

    lexer = shlex.shlex(line, posix=True)
    lexer.wordchars += '-./'
    lexer.commenters = '#'

    # in posix mode get_token() returns None once the input is exhausted
    for token in iter(lexer.get_token, None):
        yield token

class Block (object):
    """
        A block in a dhcp conf includes parameters and sub-blocks.

            key     - tuple of words naming the block, e.g. ('host', 'foo');
                      None for the anonymous top-level block
            items   - list of parameter tuples
            blocks  - list of nested Blocks
    """

    def __init__ (self, key, items=None, blocks=None):
        """
            key: tuple      - name of block, or None for the top-level block
            items: list     - initial parameters; a fresh list is used if omitted/empty
            blocks: list    - initial sub-blocks; a fresh list is used if omitted/empty
        """

        self.key = key
        self.items = items or [ ]
        self.blocks = blocks or [ ]

    def __str__ (self):
        """
            The block name as written in the config; empty for the top-level block.
        """

        # the top-level block has no name to join
        if self.key is None:
            return ''

        return ' '.join(self.key)

    def __repr__ (self):
        return "Block({self.key!r}, items={self.items!r}, blocks={self.blocks!r})".format(self=self)

class DHCPConfigParser (object):
    """
        Simple parser for ISC dhcpd conf files.

        Supports iterative parsing as required for following a dhcpd.leases file.

        Doesn't implement the full spec, but a useful approximation.

        State:
            token   - words collected for the expression currently being lexed
            stack   - enclosing Blocks of the current block, outermost first
            block   - the Block currently being filled; initially the top-level Block(None)
    """

    @classmethod
    def load (cls, file, name=None):
        """
            Parse a complete file, returning the top-level Block.

                file    - iterable of lines
                name    - name used in error messages; defaults to file.name, if it has one

            Raises DHCPConfigError (with .line set) on invalid input, including an incomplete
            expression or an unclosed block at the end of the input.

            >>> top = DHCPConfigParser.load(['foo;', 'bar {', '    asdf "quux";', '}'])
            >>> top.key, top.items
            (None, [('foo',)])
            >>> [(block.key, block.items, block.blocks) for block in top.blocks]
            [(('bar',), [('asdf', 'quux')], [])]
        """

        if name is None:
            # plain iterables of lines (lists, generators) have no .name
            name = getattr(file, 'name', None)

        parser = cls(name=name)
        lineno = 0

        for lineno, line in enumerate(file, 1):
            try:
                for item in parser.parse_line(line):
                    log.debug("%s", item)
            except DHCPConfigError as error:
                # the lexer/parser do not know which line they are on
                error.line = lineno
                raise

        if parser.token:
            raise DHCPConfigError(parser, "Trailing data: {token}".format(token=parser.token), line=lineno)

        if parser.stack:
            # parser.block is still a nested block, not the top-level one
            raise DHCPConfigError(parser, "Unclosed block: {block}".format(block=parser.block), line=lineno)

        return parser.block

    def __init__ (self, name=None):
        """
            name    - name of the input, used in error messages
        """

        self.name = name

        # lexer state
        self.token = []

        # parser state
        self.stack = []

        # top-level block
        self.block = Block(None)

    def lex (self, line):
        """
            Lex one line of input into basic expressions:

                open:   [...] {
                item:       [..];
                close:  [] }

            Yields (event, (...)) tokens.

            Words of an expression that is not terminated on this line are kept in self.token
            and continued by the next call.

            Raises DHCPConfigError.
        """

        log.debug("%s", line)

        for word in split(line):
            if word == '{':
                yield 'open', tuple(self.token)

            elif word == ';':
                yield 'item', tuple(self.token)

            elif word == '}':
                if self.token:
                    raise DHCPConfigError(self, "Leading data on close: {token}".format(token=self.token))

                yield 'close', None

            else:
                self.token.append(word)
                continue

            # expression complete, start collecting the next one
            self.token = [ ]

    def parse (self, tokens):
        """
            Parse given tokens, yielding any complete Blocks.

            Note that the Blocks yielded may be within some other Block which is still incomplete.

            Raises DHCPConfigError on a close without a matching open.
        """

        for token, args in tokens:
            if token == 'open':
                block = Block(args)

                log.debug("open block: %s > %s", self.block, block)

                self.block.blocks.append(block)

                # remember the parent, so that the matching close can return to it
                self.stack.append(self.block)
                self.block = block

            # belongs to whichever block is currently open, including the top-level one
            elif token == 'item' :
                log.debug("block %s item: %s", self.block, args)

                self.block.items.append(args)

            elif token == 'close' :
                if not self.stack:
                    # only the anonymous top-level block is open, and it has no opening brace
                    raise DHCPConfigError(self, "Mismatched block close")

                block = self.block

                log.debug("close block: %s", block)

                self.block = self.stack.pop()

                yield block

            else:
                raise ValueError(token, args)

    def parse_line (self, line):
        """
            Lex and parse line tokens.
        """

        for block in self.parse(self.lex(line)):
            yield block

    def parse_lines (self, lines) :
        """
            Trivial wrapper around parse to parse multiple lines.
        """

        for line in lines:
            for block in self.parse_line(line) :
                yield block
    
def build_field (value):
    """
        Build a single field as part of a dhcp.conf line.

        A value containing whitespace is wrapped in double quotes. An empty value is
        written as "" so that the field does not silently vanish from the output line.
        Any other value is passed through unchanged.

        NOTE: double quotes inside the value are not escaped.

        >>> print(build_field('foo'))
        foo
        >>> print(build_field('foo bar'))
        "foo bar"
        >>> print(build_field(''))
        ""
    """

    if value == '' or any(c.isspace() for c in value):
        # quoted
        return '"{value}"'.format(value=value)

    return value

def build_line (item, end, indent=0):
    """
        Format one output line: indentation, the space-separated fields of item, then end.

            item    - sequence of field values, each formatted with build_field()
            end     - text appended directly after the last field
            indent  - nesting level; each level is four spaces

        >>> print(build_line(['foo'], ';'))
        foo;
        >>> print(build_line(['host', 'foo'], ' {'))
        host foo {
        >>> print(build_line(['foo', 'bar quux'], ';', indent=1))
            foo "bar quux";
    """

    prefix = '    ' * indent
    fields = [build_field(field) for field in item]

    return prefix + ' '.join(fields) + end

def build_item (item, **opts):
    """
        Format a parameter as a single ';'-terminated line.

        Any keyword options are passed on to build_line().

        >>> print(build_item(['foo']))
        foo;
    """

    terminator = ';'

    return build_line(item, end=terminator, **opts)

def build_block (block, indent=0, comment=None):
    """
        Build a complete Block, recursively, yielding output lines.

            block   - the Block to format
            indent  - nesting level of the block's own lines
            comment - optional text for a '#' comment line preceding the block

        A Block with key None (the top-level block of a parsed file) has no header or
        braces of its own: its items and sub-blocks are emitted directly at the given indent.

        >>> for line in build_block(Block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')]), comment="Testing"): print(line)
        # Testing
        host foo {
            hardware ethernet 00:11:22:33:44:55;
        }
        >>> for line in build_block(Block(('group', ), [('next-server', 'booter')], [ \
                    Block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')]) \
                ])): print(line)
        group {
            next-server booter;
        <BLANKLINE>
            host foo {
                hardware ethernet 00:11:22:33:44:55;
            }
        }
        >>> for line in build_block(Block(None, [('foo', )], [Block(('bar', ), [('asdf', 'quux')])])): print(line)
        foo;
        <BLANKLINE>
        bar {
            asdf quux;
        }
    """

    if comment:
        yield build_line((), end="# {comment}".format(comment=comment), indent=indent)

    # the top-level block has no name, and thus no "name {" ... "}" lines to wrap it
    anonymous = block.key is None

    if anonymous:
        inner = indent
    else:
        yield build_line(block.key, end=' {', indent=indent)
        inner = indent + 1

    for item in block.items:
        yield build_item(item, indent=inner)

    # sub-blocks are separated from whatever precedes them by a blank line;
    # at the top level there is nothing to separate the first one from unless there were items
    separate = (not anonymous) or bool(block.items)

    for subblock in block.blocks:
        if separate:
            yield ''

        separate = True

        for line in build_block(subblock, indent=inner):
            yield line

    if not anonymous:
        yield build_line((), end='}', indent=indent)