--- a/pvl/dhcp/config.py Sun Mar 01 20:25:52 2015 +0200
+++ b/pvl/dhcp/config.py Sun Mar 01 22:26:29 2015 +0200
@@ -1,225 +1,211 @@
-"""
- Simple parser for ISC dhcpd config files.
-"""
+import logging; log = logging.getLogger('pvl.dhcp.config')
+import shlex
-import logging; log = logging.getLogger('pvl.dhcp.config')
+class DHCPConfigError(Exception):
+ def __init__ (self, parser, error, line=None):
+ self.parser = parser
-class DHCPConfigParser (object) :
+ self.name = parser.name
+ self.line = line
+ self.error = error
+
+ def __str__ (self):
+ return "{self.name}:{self.line}: {self.error}".format(self=self)
+
+def split (line) :
"""
- Simplistic parser for a dhcpd.leases file.
+ Split given line-data into raw tokens.
+
+ >>> list(split('foo'))
+ ['foo']
+ >>> list(split('foo bar'))
+ ['foo', 'bar']
+ >>> list(split('foo;'))
+ ['foo', ';']
+ >>> list(split('"foo"'))
+ ['foo']
+ >>> list(split('foo "asdf quux" bar'))
+ ['foo', 'asdf quux', 'bar']
+ >>> list(split('foo "asdf quux"'))
+ ['foo', 'asdf quux']
+ >>> list(split('# nevermind'))
+ []
+ >>> list(split(''))
+ []
+ """
+
+ if line is None:
+ raise TypeError(line)
+
+ lexer = shlex.shlex(line, posix=True)
+ lexer.commenters = '#'
+ lexer.wordchars += '-./'
+
+ while True:
+ item = lexer.get_token()
+
+ if item is None:
+ break
+
+ yield item
+
+class Block (object):
+ """
+ A block in a dhcp conf includes parameters and sub-blocks.
+ """
+
+ def __init__ (self, key, items=None, blocks=None):
+ """
+ key: tuple - name of block
+ """
+
+ self.key = key
+ self.items = items or [ ]
+ self.blocks = blocks or [ ]
+
+ def __str__ (self):
+ return ' '.join(self.key)
+
+ def __repr__ (self):
+ return "Block({self.key!r}, items={self.items!r}, blocks={self.blocks!r}".format(self=self)
+
+class DHCPConfigParser (object):
+ """
+ Simple parser for ISC dhcpd conf files.
+
+ Supports iterative parsing as required for following a dhcpd.leases file.
Doesn't implement the full spec, but a useful approximation.
"""
@classmethod
- def load (cls, file) :
- return cls().parse_file(file)
-
- def __init__ (self) :
- self.stack = []
- self.block = None
- self.items = []
- self.blocks = []
-
- @classmethod
- def split (cls, line) :
- """
- Split given line-data.
-
- >>> split = DHCPConfigParser.split
- >>> split('foo bar')
- ['foo', 'bar']
- >>> split('"foo"')
- ['foo']
- >>> split('foo "asdf quux" bar')
- ['foo', 'asdf quux', 'bar']
- >>> split('foo "asdf quux"')
- ['foo', 'asdf quux']
+ def load (cls, file, name=None):
"""
-
- # parse out one str
- if '"' in line :
- log.debug("%s", line)
-
- # crude
- pre, line = line.split('"', 1)
- data, post = line.rsplit('"', 1)
+ Parse an complete file, returning the top-level Block.
- return pre.split() + [data] + post.split()
- else :
- return line.split()
-
- @classmethod
- def lex (self, line) :
+ >>> DHCPConfigParser.load(['foo;', 'bar {', '\tasdf "quux";', '}'])
+ Block(None, items=[('foo', )], blocks=[Block(('bar', ), items=[('asdf', 'quux')], blocks=[])])
"""
- Yield tokens from the given lines.
+
+ if name is None:
+ name = file.name
- >>> lex = DHCPConfigParser.lex
- >>> list(lex('foo;'))
- [('item', ['foo'])]
- >>> list(item for line in ['foo {', ' bar;', '}'] for item in lex(line))
- [('open', ['foo']), ('item', ['bar']), ('close', None)]
+ parser = cls(name=name)
+ for lineno, line in enumerate(file, 1):
+ try:
+ for item in parser.parse_line(line):
+ log.debug("%s", item)
+ except DHCPConfigError as error:
+ error.line = lineno
+ raise
+
+ if parser.token:
+ raise DHCPConfError(parser, "Trailing data: {token}".format(token=token), line=lineno)
+
+ return parser.block
+
+ def __init__ (self, name=None):
+ self.name = name
+
+ # lexer state
+ self.token = []
+
+ # parser state
+ self.stack = []
+
+ # top-level block
+ self.block = Block(None)
+
+ def lex (self, line):
+ """
+ Lex one line of input into basic expressions:
+
+ open: [...] {
+ item: [..];
+ close: [] }
+
+ Yields (event, (...)) tokens.
+
+ Raises DHCPConfError.
"""
log.debug("%s", line)
- # comments?
- if '#' in line :
- line, comment = line.split('#', 1)
- else :
- comment = None
-
- # clean?
- line = line.strip()
+ for word in split(line):
+ if word == '{':
+ yield 'open', tuple(self.token)
+
+ elif word == ';':
+ yield 'item', tuple(self.token)
- # parse
- if not line :
- # ignore, empty/comment
- return
-
- elif line.startswith('uid') :
- # XXX: too hard to parse properly
- return
+ elif word == '}':
+ if self.token:
+ raise DHCPConfError(self, "Leading data on close: {token}".format(token=self.token))
- elif '{' in line :
- decl, line = line.split('{', 1)
+ yield 'close', None
- # we are in a new decl
- yield 'open', self.split(decl)
+ else:
+ self.token.append(word)
+ continue
+
+ self.token = [ ]
- elif ';' in line :
- param, line = line.split(';', 1)
-
- # a stanza
- yield 'item', self.split(param)
-
- elif '}' in line :
- close, line = line.split('}', 1)
-
- if close.strip() :
- log.warn("Predata on close: %s", close)
+ def parse (self, tokens):
+ """
+ Parse given tokens, yielding any complete Blocks.
- # end
- yield 'close', None
-
- else :
- log.warn("Unknown line: %s", line)
- return
-
- # got the whole line?
- if line.strip() :
- log.warn("Data remains: %s", line)
-
- def push_block (self, block) :
- """
- Open new block.
+ Note that the Blocks yielded may be within some other Block which is still incomplete.
"""
- self.stack.append((self.block, self.items, self.blocks))
-
- self.block = block
- self.items = []
- self.blocks = []
-
- def feed_item (self, item) :
- """
- Add item to block
- """
-
- self.items.append(item)
-
- def pop_block (self) :
- """
- Close block. Returns
- (block, [items])
- """
-
- assert self.block
-
- block = (self.block, self.items, self.blocks)
-
- self.block, self.items, self.blocks = self.stack.pop(-1)
-
- self.blocks.append(block)
-
- return block
+ for token, args in tokens:
+ if token == 'open':
+ block = Block(args)
- def parse_line (self, line) :
- """
- Parse given line, yielding any complete blocks that come out.
-
- Yields (block, [ lines ]) tuples.
-
- >>> parser = DHCPConfigParser()
- >>> list(parser.parse_lines(['foo {', ' bar;', ' quux asdf;', '}']))
- [(['foo'], [['bar'], ['quux', 'asdf']], [])]
+ log.debug("open block: %s > %s", self.block, block)
+
+ self.block.blocks.append(block)
+ self.block = block
+ self.stack.append(block)
+
+ # must be within block!
+ elif token == 'item' :
+ log.debug("block %s item: %s", self.block, args)
- >>> parser = DHCPConfigParser()
- >>> list(parser.parse_line('foo {'))
- []
- >>> list(parser.parse_lines([' bar;', ' quux asdf;']))
- []
- >>> list(parser.parse_line('}'))
- [(['foo'], [['bar'], ['quux', 'asdf']], [])]
- """
+ self.block.items.append(args)
- for token, args in self.lex(line) :
- #log.debug("%s: %s [block=%s]", token, args, self.block)
-
- if token == 'open' :
- # open new block
- block = args
-
- if self.block :
- log.debug("nested block: %s > %s", self.block, block)
- else :
- log.debug("open block: %s", block)
-
- self.push_block(block)
-
elif token == 'close' :
log.debug("close block: %s", self.block)
- # collected block items
- yield self.pop_block()
-
- # must be within block!
- elif token == 'item' :
- item = args
+ block = self.block
- log.debug("block %s item: %s", self.block, item)
- self.feed_item(item)
+ if self.stack:
+ self.block = self.stack.pop()
+ else:
+ raise DHCPConfigError(self, "Mismatched block close: {block}".format(block=block))
- else :
- # ???
- raise KeyError("Unknown token: {0}: {1}".format(token, args))
+ yield block
+
+ else:
+ raise ValueError(token, args)
+ def parse_line (self, line):
+ """
+ Lex and parse line tokens.
+ """
+
+ for block in self.parse(self.lex(line)):
+ yield block
+
def parse_lines (self, lines) :
"""
Trivial wrapper around parse to parse multiple lines.
"""
- for line in lines :
- for item in self.parse_line(line) :
- yield item
-
- def parse_file (self, file) :
- """
- Parse an entire file, returning (items, blocks) lists.
-
- >>> DHCPConfigParser().parse_file(['foo;', 'barfoo {', 'bar;', '}'])
- ([['foo']], [(['barfoo'], [['bar']], [])])
- """
-
- for line in file :
- for item in self.parse_line(line) :
- log.debug("%s", item)
-
- assert not self.block
-
- return self.items, self.blocks
-
+ for line in lines:
+ for block in self.parse_line(line) :
+ yield block
+
def build_field (value):
"""
Build a single field as part of a dhcp.conf line.
@@ -260,18 +246,18 @@
return build_line(item, end=';', **opts)
-def build_block (block, items, blocks=(), indent=0, comment=None):
+def build_block (block, indent=0, comment=None):
"""
- Build a complete block.
+ Build a complete Block, recursively, yielding output lines.
- >>> for line in build_block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')], comment="Testing"): print line
+ >>> for line in build_block(Block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')]), comment="Testing"): print line
# Testing
host foo {
hardware ethernet 00:11:22:33:44:55;
}
- >>> for line in build_block(('group', ), [('next-server', 'booter')], [ \
+ >>> for line in build_block(Block(('group', ), [('next-server', 'booter')], [ \
(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')], ()) \
- ]): print line
+ ])): print line
group {
next-server booter;
<BLANKLINE>
@@ -284,15 +270,15 @@
if comment:
yield build_line((), end="# {comment}".format(comment=comment), indent=indent)
- yield build_line(block, end=' {', indent=indent)
+ yield build_line(block.key, end=' {', indent=indent)
- for item in items:
+ for item in block.items:
yield build_item(item, indent=indent+1)
- for subblock, subitems, subblocks in blocks:
+ for subblock in block.blocks:
yield ''
- for line in build_block(subblock, subitems, subblocks, indent=indent+1):
+ for line in build_block(subblock, indent=indent+1):
yield line
yield build_line((), end='}', indent=indent)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/dhcp/tests.py Sun Mar 01 22:26:29 2015 +0200
@@ -0,0 +1,94 @@
+import itertools
+import unittest
+
+from pvl.dhcp import config
+from StringIO import StringIO
+
+class File(StringIO):
+ @classmethod
+ def lines (cls, *lines):
+ return cls('\n'.join(lines) + '\n')
+
+ def __init__(self, buffer, name='test.file'):
+ StringIO.__init__(self, buffer)
+ self.name = name
+
+class ConfigTest(unittest.TestCase):
+ def setUp(self):
+ self.parser = config.DHCPConfigParser(name='test')
+
+ def assertLexEqual(self, lexed, expected):
+ self.assertEqual(list(lexed), expected)
+
+ def assertBlockEqual(self, block, (key, items, blocks)):
+ self.assertEqual(block.key, key)
+ self.assertEqual(block.items, items)
+
+ for _block, expect_block in itertools.izip_longest(block.blocks, blocks):
+ self.assertBlockEqual(_block, expect_block)
+
+ def _testLex(self, lines, expected):
+ lexed = [item for line in lines for item in self.parser.lex(line)]
+
+ self.assertEqual(lexed, expected)
+
+ def _testParse(self, lines, expected):
+ for block, expect in itertools.izip_longest(self.parser.parse_lines(lines), expected):
+ self.assertIsNotNone(block, expect)
+ self.assertIsNotNone(expect, block)
+
+ self.assertBlockEqual(block, expect)
+
+ def testLexerEmpty(self):
+ self._testLex([''], [])
+
+ def testLexerSingleToken(self):
+ self._testLex(['foo;'], [
+ ('item', ('foo', )),
+ ])
+
+ def testLexerSingleTokenWhitespace(self):
+ self._testLex([' foo ;'], [
+ ('item', ('foo', )),
+ ])
+
+ def testLexerSingleLine(self):
+ self._testLex(['foo { bar "quux"; } # ignore'], [
+ ('open', ('foo', )),
+ ('item', ('bar', 'quux')),
+ ('close', None),
+ ])
+
+ def testLexerSingleTokenLines(self):
+ self._testLex(['foo {'], [('open', ('foo', ))])
+ self._testLex([' bar;'], [('item', ('bar', ))])
+ self._testLex(['}'], [('close', None)])
+
+ def testLexerSingleTokens(self):
+ self._testLex(['foo', ' { ', 'bar', '', '"quux"', ';', '}'], [
+ ('open', ('foo', )),
+ ('item', ('bar', 'quux')),
+ ('close', None),
+ ])
+
+ def testParse(self):
+ self._testParse(['foo {'], [])
+ self._testParse([' bar;', ' quux asdf;'], [])
+ self._testParse(['}'], [
+ (('foo', ), [('bar', ), ('quux', 'asdf')], []),
+ ])
+
+ def testParseConf(self):
+ self.assertBlockEqual(config.DHCPConfigParser.load(File("""
+group {
+ next-server boot.test;
+ filename "/debian/wheezy/pxelinux.0";
+
+ include "hosts/test.conf";
+}
+ """)), (('group', ), [
+ ('next-server', 'boot.test'),
+ ('filename', "/debian/wheezy/pxelinux.0"),
+ ('include', "hosts/test.conf"),
+ ], []))
+