pvl.dhcp.config: refactor DHCPConfigParser to use shlex and yield Block objects, change build_block() to use Block; tests
author Tero Marttila <terom@paivola.fi>
Sun, 01 Mar 2015 22:26:29 +0200
changeset 666 a8ddcbe894ff
parent 665 f0a516b2d3d3
child 667 2d5750797b8a
pvl.dhcp.config: refactor DHCPConfigParser to use shlex and yield Block objects, change build_block() to use Block; tests
pvl/dhcp/config.py
pvl/dhcp/tests.py
--- a/pvl/dhcp/config.py	Sun Mar 01 20:25:52 2015 +0200
+++ b/pvl/dhcp/config.py	Sun Mar 01 22:26:29 2015 +0200
@@ -1,225 +1,211 @@
-"""
-    Simple parser for ISC dhcpd config files.
-"""
+import logging; log = logging.getLogger('pvl.dhcp.config')
+import shlex
 
-import logging; log = logging.getLogger('pvl.dhcp.config')
+class DHCPConfigError(Exception):
+    def __init__ (self, parser, error, line=None):
+        self.parser = parser
 
-class DHCPConfigParser (object) :
+        self.name = parser.name
+        self.line = line
+        self.error = error
+
+    def __str__ (self):
+        return "{self.name}:{self.line}: {self.error}".format(self=self)
+
+def split (line) :
     """
-        Simplistic parser for a dhcpd.leases file.
+        Split given line-data into raw tokens.
+        
+        >>> list(split('foo'))
+        ['foo']
+        >>> list(split('foo bar'))
+        ['foo', 'bar']
+        >>> list(split('foo;'))
+        ['foo', ';']
+        >>> list(split('"foo"'))
+        ['foo']
+        >>> list(split('foo "asdf quux" bar'))
+        ['foo', 'asdf quux', 'bar']
+        >>> list(split('foo "asdf quux"'))
+        ['foo', 'asdf quux']
+        >>> list(split('# nevermind'))
+        []
+        >>> list(split(''))
+        []
+    """
+
+    if line is None:
+        raise TypeError(line)
+
+    lexer = shlex.shlex(line, posix=True)
+    lexer.commenters = '#'
+    lexer.wordchars += '-./'
+
+    while True:
+        item = lexer.get_token()
+
+        if item is None:
+            break
+
+        yield item
+
+class Block (object):
+    """
+        A block in a dhcp conf includes parameters and sub-blocks.
+    """
+
+    def __init__ (self, key, items=None, blocks=None):
+        """
+            key: tuple      - name of block
+        """
+
+        self.key = key
+        self.items = items or [ ]
+        self.blocks = blocks or [ ]
+
+    def __str__ (self):
+        return ' '.join(self.key)
+
+    def __repr__ (self):
+        return "Block({self.key!r}, items={self.items!r}, blocks={self.blocks!r}".format(self=self)
+
+class DHCPConfigParser (object):
+    """
+        Simple parser for ISC dhcpd conf files.
+
+        Supports iterative parsing as required for following a dhcpd.leases file.
 
         Doesn't implement the full spec, but a useful approximation.
     """
 
     @classmethod
-    def load (cls, file) :
-        return cls().parse_file(file)
-
-    def __init__ (self) :
-        self.stack = []
-        self.block = None
-        self.items = []
-        self.blocks = []
-    
-    @classmethod
-    def split (cls, line) :
-        """
-            Split given line-data.
-            
-            >>> split = DHCPConfigParser.split
-            >>> split('foo bar')
-            ['foo', 'bar']
-            >>> split('"foo"')
-            ['foo']
-            >>> split('foo "asdf quux" bar')
-            ['foo', 'asdf quux', 'bar']
-            >>> split('foo "asdf quux"')
-            ['foo', 'asdf quux']
+    def load (cls, file, name=None):
         """
-
-        # parse out one str
-        if '"' in line :
-            log.debug("%s", line)
-
-            # crude
-            pre, line = line.split('"', 1)
-            data, post = line.rsplit('"', 1)
+            Parse a complete file, returning the top-level Block.
 
-            return pre.split() + [data] + post.split()
-        else :
-            return line.split()
-
-    @classmethod
-    def lex (self, line) :
+            >>> DHCPConfigParser.load(['foo;', 'bar {', '\tasdf "quux";', '}'])
+            Block(None, items=[('foo', )], blocks=[Block(('bar', ), items=[('asdf', 'quux')], blocks=[])])
         """
-            Yield tokens from the given lines.
+        
+        if name is None:
+            name = file.name
 
-            >>> lex = DHCPConfigParser.lex
-            >>> list(lex('foo;'))
-            [('item', ['foo'])]
-            >>> list(item for line in ['foo {', ' bar;', '}'] for item in lex(line))
-            [('open', ['foo']), ('item', ['bar']), ('close', None)]
+        parser = cls(name=name)
 
+        for lineno, line in enumerate(file, 1):
+            try:
+                for item in parser.parse_line(line):
+                    log.debug("%s", item)
+            except DHCPConfigError as error:
+                error.line = lineno
+                raise
+
+        if parser.token:
+            raise DHCPConfError(parser, "Trailing data: {token}".format(token=token), line=lineno)
+
+        return parser.block 
+
+    def __init__ (self, name=None):
+        self.name = name
+
+        # lexer state
+        self.token = []
+        
+        # parser state
+        self.stack = []
+        
+        # top-level block
+        self.block = Block(None)
+
+    def lex (self, line):
+        """
+            Lex one line of input into basic expressions:
+
+                open:   [...] {
+                item:       [..];
+                close:  [] }
+
+            Yields (event, (...)) tokens.
+
+            Raises DHCPConfigError.
         """
 
         log.debug("%s", line)
 
-        # comments?
-        if '#' in line :
-            line, comment = line.split('#', 1)
-        else :
-            comment = None
-
-        # clean?
-        line = line.strip()
+        for word in split(line):
+            if word == '{':
+                yield 'open', tuple(self.token)
+            
+            elif word == ';':
+                yield 'item', tuple(self.token)
 
-        # parse
-        if not line :
-            # ignore, empty/comment
-            return
-        
-        elif line.startswith('uid') :
-            # XXX: too hard to parse properly
-            return
+            elif word == '}':
+                if self.token:
+                    raise DHCPConfError(self, "Leading data on close: {token}".format(token=self.token))
 
-        elif '{' in line :
-            decl, line = line.split('{', 1)
+                yield 'close', None
 
-            # we are in a new decl
-            yield 'open', self.split(decl)
+            else:
+                self.token.append(word)
+                continue
+
+            self.token = [ ]
        
-        elif ';' in line :
-            param, line = line.split(';', 1)
-            
-            # a stanza
-            yield 'item', self.split(param)
-        
-        elif '}' in line :
-            close, line = line.split('}', 1)
-
-            if close.strip() :
-                log.warn("Predata on close: %s", close)
+    def parse (self, tokens):
+        """
+            Parse given tokens, yielding any complete Blocks.
 
-            # end
-            yield 'close', None
-    
-        else :
-            log.warn("Unknown line: %s", line)
-            return
-
-        # got the whole line?
-        if line.strip() :
-            log.warn("Data remains: %s", line)
-
-    def push_block (self, block) :
-        """
-            Open new block.
+            Note that the Blocks yielded may be within some other Block which is still incomplete.
         """
 
-        self.stack.append((self.block, self.items, self.blocks))
-
-        self.block = block
-        self.items = []
-        self.blocks = []
-
-    def feed_item (self, item) :
-        """
-            Add item to block
-        """
-
-        self.items.append(item)
-
-    def pop_block (self) :
-        """
-            Close block. Returns
-                (block, [items])
-        """
-
-        assert self.block
-
-        block = (self.block, self.items, self.blocks)
-
-        self.block, self.items, self.blocks = self.stack.pop(-1)
-
-        self.blocks.append(block)
-
-        return block
+        for token, args in tokens:
+            if token == 'open':
+                block = Block(args)
 
-    def parse_line (self, line) :
-        """
-            Parse given line, yielding any complete blocks that come out.
-
-            Yields (block, [ lines ]) tuples.
-
-            >>> parser = DHCPConfigParser()
-            >>> list(parser.parse_lines(['foo {', ' bar;', ' quux asdf;', '}']))
-            [(['foo'], [['bar'], ['quux', 'asdf']], [])]
+                log.debug("open block: %s > %s", self.block, block)
+                
+                self.block.blocks.append(block)
+                self.block = block
+                self.stack.append(block)
+            
+            # must be within block!
+            elif token == 'item' :
+                log.debug("block %s item: %s", self.block, args)
 
-            >>> parser = DHCPConfigParser()
-            >>> list(parser.parse_line('foo {'))
-            []
-            >>> list(parser.parse_lines([' bar;', ' quux asdf;']))
-            []
-            >>> list(parser.parse_line('}'))
-            [(['foo'], [['bar'], ['quux', 'asdf']], [])]
-        """
+                self.block.items.append(args)
 
-        for token, args in self.lex(line) :
-            #log.debug("%s: %s [block=%s]", token, args, self.block)
-
-            if token == 'open' :
-                # open new block
-                block = args
-
-                if self.block :
-                    log.debug("nested block: %s > %s", self.block, block)
-                else :
-                    log.debug("open block: %s", block)
-
-                self.push_block(block)
-            
             elif token == 'close' :
                 log.debug("close block: %s", self.block)
 
-                # collected block items
-                yield self.pop_block()
-
-            # must be within block!
-            elif token == 'item' :
-                item = args
+                block = self.block
 
-                log.debug("block %s item: %s", self.block, item)
-                self.feed_item(item)
+                if self.stack:
+                    self.block = self.stack.pop()
+                else:
+                    raise DHCPConfigError(self, "Mismatched block close: {block}".format(block=block))
 
-            else :
-                # ???
-                raise KeyError("Unknown token: {0}: {1}".format(token, args))
+                yield block
+
+            else:
+                raise ValueError(token, args)
     
+    def parse_line (self, line):
+        """
+            Lex and parse line tokens.
+        """
+
+        for block in self.parse(self.lex(line)):
+            yield block
+
     def parse_lines (self, lines) :
         """
             Trivial wrapper around parse to parse multiple lines.
         """
 
-        for line in lines :
-            for item in self.parse_line(line) :
-                yield item
-
-    def parse_file (self, file) :
-        """
-            Parse an entire file, returning (items, blocks) lists.
-
-            >>> DHCPConfigParser().parse_file(['foo;', 'barfoo {', 'bar;', '}'])
-            ([['foo']], [(['barfoo'], [['bar']], [])])
-        """
-
-        for line in file :
-            for item in self.parse_line(line) :
-                log.debug("%s", item)
-
-        assert not self.block
-
-        return self.items, self.blocks
-
+        for line in lines:
+            for block in self.parse_line(line) :
+                yield block
+    
 def build_field (value):
     """
         Build a single field as part of a dhcp.conf line.
@@ -260,18 +246,18 @@
 
     return build_line(item, end=';', **opts)
 
-def build_block (block, items, blocks=(), indent=0, comment=None):
+def build_block (block, indent=0, comment=None):
     """
-        Build a complete block.
+        Build a complete Block, recursively, yielding output lines.
 
-        >>> for line in build_block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')], comment="Testing"): print line
+        >>> for line in build_block(Block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')]), comment="Testing"): print line
         # Testing
         host foo {
             hardware ethernet 00:11:22:33:44:55;
         }
-        >>> for line in build_block(('group', ), [('next-server', 'booter')], [ \
+        >>> for line in build_block(Block(('group', ), [('next-server', 'booter')], [ \
                     (('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')], ()) \
-                ]): print line
+                ])): print line
         group {
             next-server booter;
         <BLANKLINE>
@@ -284,15 +270,15 @@
     if comment:
         yield build_line((), end="# {comment}".format(comment=comment), indent=indent)
 
-    yield build_line(block, end=' {', indent=indent)
+    yield build_line(block.key, end=' {', indent=indent)
     
-    for item in items:
+    for item in block.items:
         yield build_item(item, indent=indent+1)
 
-    for subblock, subitems, subblocks in blocks:
+    for subblock in block.blocks:
         yield ''
 
-        for line in build_block(subblock, subitems, subblocks, indent=indent+1):
+        for line in build_block(subblock, indent=indent+1):
             yield line
 
     yield build_line((), end='}', indent=indent)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/dhcp/tests.py	Sun Mar 01 22:26:29 2015 +0200
@@ -0,0 +1,94 @@
+import itertools
+import unittest
+
+from pvl.dhcp import config
+from StringIO import StringIO
+
+class File(StringIO):
+    @classmethod
+    def lines (cls, *lines):
+        return cls('\n'.join(lines) + '\n')
+
+    def __init__(self, buffer, name='test.file'):
+        StringIO.__init__(self, buffer)
+        self.name = name
+
+class ConfigTest(unittest.TestCase):
+    def setUp(self):
+        self.parser = config.DHCPConfigParser(name='test')
+
+    def assertLexEqual(self, lexed, expected):
+        self.assertEqual(list(lexed), expected)
+
+    def assertBlockEqual(self, block, (key, items, blocks)):
+        self.assertEqual(block.key, key)
+        self.assertEqual(block.items, items)
+
+        for _block, expect_block in itertools.izip_longest(block.blocks, blocks):
+            self.assertBlockEqual(_block, expect_block)
+
+    def _testLex(self, lines, expected):
+        lexed = [item for line in lines for item in self.parser.lex(line)]
+        
+        self.assertEqual(lexed, expected)
+
+    def _testParse(self, lines, expected):
+        for block, expect in itertools.izip_longest(self.parser.parse_lines(lines), expected):
+            self.assertIsNotNone(block, expect)
+            self.assertIsNotNone(expect, block)
+
+            self.assertBlockEqual(block, expect)
+
+    def testLexerEmpty(self):
+        self._testLex([''], [])
+    
+    def testLexerSingleToken(self):
+        self._testLex(['foo;'], [
+            ('item', ('foo', )),
+        ])
+
+    def testLexerSingleTokenWhitespace(self):
+        self._testLex([' foo ;'], [
+            ('item', ('foo', )),
+        ])
+
+    def testLexerSingleLine(self):
+        self._testLex(['foo { bar "quux"; } # ignore'], [
+            ('open', ('foo', )),
+            ('item', ('bar', 'quux')),
+            ('close', None),
+        ])
+
+    def testLexerSingleTokenLines(self):
+        self._testLex(['foo {'], [('open', ('foo', ))])
+        self._testLex([' bar;'], [('item', ('bar', ))])
+        self._testLex(['}'], [('close', None)])
+
+    def testLexerSingleTokens(self):
+        self._testLex(['foo', '  {  ', 'bar', '', '"quux"', ';', '}'], [
+            ('open', ('foo', )),
+            ('item', ('bar', 'quux')),
+            ('close', None),
+        ])
+    
+    def testParse(self):
+        self._testParse(['foo {'], [])
+        self._testParse([' bar;', ' quux asdf;'], [])
+        self._testParse(['}'], [
+            (('foo', ), [('bar', ), ('quux', 'asdf')], []),
+        ])
+
+    def testParseConf(self):
+        self.assertBlockEqual(config.DHCPConfigParser.load(File("""
+group {
+    next-server boot.test;
+    filename "/debian/wheezy/pxelinux.0";
+
+    include "hosts/test.conf";
+}
+        """)), (('group', ), [
+            ('next-server', 'boot.test'),
+            ('filename', "/debian/wheezy/pxelinux.0"),
+            ('include', "hosts/test.conf"),
+        ], []))
+