pvl/dhcp/config.py
author Tero Marttila <tero.marttila@aalto.fi>
Mon, 02 Mar 2015 12:59:17 +0200
changeset 685 668f934bb958
parent 682 60dbd952a15e
child 695 c60924eca185
permissions -rw-r--r--
pvl.dhcp.config: fix build_block() to handle top-level config Blocks
import logging; log = logging.getLogger('pvl.dhcp.config')
import shlex
import string

# simplified model of lexer chars
TOKEN_START = string.ascii_letters + string.digits + '-'
TOKEN = TOKEN_START + '_'

# be far more lenient when parsing
TOKEN_EXTRA = TOKEN + ':.'

UNQUOTED_CONTEXT = set((
    ('host', ),
    ('fixed-address', ),
    ('next-server', ),
    ('hardware', 'ethernet'),
))

class DHCPConfigError(Exception):
    def __init__ (self, parser, error, line=None):
        self.parser = parser

        self.name = parser.name
        self.line = line
        self.error = error

    def __str__ (self):
        return "{self.name}:{self.line}: {self.error}".format(self=self)

def split (line) :
    """
        Split given line-data into raw tokens.
        
        >>> list(split('foo'))
        ['foo']
        >>> list(split('foo bar'))
        ['foo', 'bar']
        >>> list(split('foo;'))
        ['foo', ';']
        >>> list(split('"foo"'))
        ['foo']
        >>> list(split('foo "asdf quux" bar'))
        ['foo', 'asdf quux', 'bar']
        >>> list(split('foo "asdf quux"'))
        ['foo', 'asdf quux']
        >>> list(split('# nevermind'))
        []
        >>> list(split(''))
        []
        >>> list(split('next-server foo'))
        ['next-server', 'foo']
        >>> list(split('include "foo/bar.conf";'))
        ['include', 'foo/bar.conf', ';']
    """

    if line is None:
        raise TypeError(line)

    lexer = shlex.shlex(line, posix=True)
    lexer.commenters = '#'
    lexer.wordchars = TOKEN_EXTRA

    while True:
        item = lexer.get_token()

        if item is None:
            break

        yield item

def quote (value, context=None):
    """
        Build a single field as part of a dhcp.conf line.

        >>> print quote('foo')
        foo
        >>> print quote('foo bar')
        "foo bar"
        >>> print quote('foo/bar.conf')
        "foo/bar.conf"
        >>> print quote(5)
        5
        >>> print quote('5')
        5
        >>> print quote('foo.bar')
        "foo.bar"
        >>> print quote('foo.bar', context=('host', ))
        foo.bar
        >>> print quote('192.0.2.1', context=('fixed-address', ))
        192.0.2.1
        >>> print quote('00:11:22:33:44:55', context=('hardware', 'ethernet'))
        00:11:22:33:44:55
    """

    if isinstance(value, int):
        return str(value)
    elif context in UNQUOTED_CONTEXT:
        return str(value)
    elif value[0] in TOKEN_START and all(c in TOKEN for c in value):
        return str(value)
    else:
        # quoted
        return '"{value}"'.format(value=value)

class Block (object):
    """
        A block in a dhcp conf includes parameters and sub-blocks.
    """

    def __init__ (self, key, items=None, blocks=None, comment=None):
        """
            key: tuple      - name of block
        """

        self.key = key
        self.items = items or [ ]
        self.blocks = blocks or [ ]
        self.comment = comment

    def __str__ (self):
        return ' '.join(self.key)

    def __repr__ (self):
        return "Block({self.key!r}, items={self.items!r}, blocks={self.blocks!r})".format(self=self)

class DHCPConfigParser (object):
    """
        Simple parser for ISC dhcpd conf files.

        Supports iterative parsing as required for following a dhcpd.leases file.

        Doesn't implement the full spec, but a useful approximation.
    """

    @classmethod
    def load (cls, file, name=None):
        """
            Parse an complete file, returning the top-level Block.

            >>> DHCPConfigParser.load(['foo;', 'bar {', '\tasdf "quux";', '}'], name='test')
            Block(None, items=[('foo',)], blocks=[Block(('bar',), items=[('asdf', 'quux')], blocks=[])])
        """
        
        if name is None:
            name = file.name

        parser = cls(name=name)

        for lineno, line in enumerate(file, 1):
            try:
                for item in parser.parse_line(line):
                    log.debug("%s", item)
            except DHCPConfigError as error:
                error.line = lineno
                raise

        if parser.token:
            raise DHCPConfError(parser, "Trailing data: {token}".format(token=parser.token), line=lineno)
        
        if parser.stack:
            raise DHCPConfError(parser, "Unterminated block: {stack}".format(stack=parser.stack), line=lineno)

        return parser.block 

    def __init__ (self, name=None):
        self.name = name

        # lexer state
        self.token = []
        
        # parser state
        self.stack = []
        
        # top-level block
        self.block = Block(None)

    def lex (self, line):
        """
            Lex one line of input into basic expressions:

                open:   [...] {
                item:       [..];
                close:  [] }

            Yields (event, (...)) tokens.

            Raises DHCPConfError.
        """

        log.debug("%s", line)

        for word in split(line):
            if word == '{':
                yield 'open', tuple(self.token)
            
            elif word == ';':
                yield 'item', tuple(self.token)

            elif word == '}':
                if self.token:
                    raise DHCPConfError(self, "Leading data on close: {token}".format(token=self.token))

                yield 'close', None

            else:
                self.token.append(word)
                continue

            self.token = [ ]
       
    def parse (self, tokens):
        """
            Parse given tokens, yielding any complete Blocks.

            Note that the Blocks yielded may be within some other Block which is still incomplete.
        """

        for token, args in tokens:
            if token == 'open':
                block = Block(args)

                log.debug("open block: %s > %s", self.block, block)
                
                self.stack.append(self.block)
                self.block.blocks.append(block)
                self.block = block
            
            # must be within block!
            elif token == 'item' :
                log.debug("block %s item: %s", self.block, args)

                self.block.items.append(args)

            elif token == 'close' :
                log.debug("close block: %s", self.block)

                block = self.block

                if self.stack:
                    self.block = self.stack.pop()
                else:
                    raise DHCPConfigError(self, "Mismatched block close: {block}".format(block=block))

                yield block

            else:
                raise ValueError(token, args)
    
    def parse_line (self, line):
        """
            Lex and parse line tokens.
        """

        for block in self.parse(self.lex(line)):
            yield block

    def parse_lines (self, lines) :
        """
            Trivial wrapper around parse to parse multiple lines.
        """

        for line in lines:
            for block in self.parse_line(line) :
                yield block
    
def build_line (item, end, indent=0, context=None):
    """
        Build a structured line.

        >>> print build_line(('foo', ), ';')
        foo;
        >>> print build_line(('host', 'foo'), ' {')
        host foo {
        >>> print build_line(('foo', 'bar quux'), ';', indent=1)
            foo "bar quux";
    """

    return '    '*indent + ' '.join(quote(field, context=context) for field in item) + end

def build_item (item, **opts):
    """
        Build a single parameter line.

        >>> print build_item(('foo', ))
        foo;
    """

    return build_line(item, end=';',
        context     = item[:-1],
        **opts
    )

def build_block (block, indent=0):
    """
        Build a complete Block, recursively, yielding output lines.

        >>> for line in build_block(Block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')], comment="Testing")): print line
        # Testing
        host foo {
            hardware ethernet 00:11:22:33:44:55;
        }
        >>> for line in build_block(Block(('group', ), [('next-server', 'booter')], [ \
                    Block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')]) \
                ])): print line
        group {
            next-server booter;
        <BLANKLINE>
            host foo {
                hardware ethernet 00:11:22:33:44:55;
            }
        }
    """

    if block.comment:
        yield build_line((), end="# {comment}".format(comment=block.comment), indent=indent)
    
    if block.key:
        yield build_line(block.key, end=' {', indent=indent, context=block.key[0:1])
        indent += 1
    
    for item in block.items:
        yield build_item(item, indent=indent)

    for subblock in block.blocks:
        if block.items:
            yield ''

        for line in build_block(subblock, indent=indent):
            yield line
    
    if block.key:
        indent -= 1
        yield build_line((), end='}', indent=indent)