terom@666: import logging; log = logging.getLogger('pvl.dhcp.config') terom@666: import shlex terom@673: import string terom@673: terom@673: # simplified model of lexer chars terom@679: TOKEN_START = string.ascii_letters + string.digits + '-' terom@679: TOKEN = TOKEN_START + '_' terom@679: terom@679: # be far more lenient when parsing terom@679: TOKEN_EXTRA = TOKEN + ':.' terom@679: terom@679: UNQUOTED_CONTEXT = set(( terom@679: ('host', ), terom@679: ('fixed-address', ), terom@679: ('next-server', ), terom@679: ('hardware', 'ethernet'), terom@679: )) terom@255: terom@666: class DHCPConfigError(Exception): terom@666: def __init__ (self, parser, error, line=None): terom@666: self.parser = parser terom@255: terom@666: self.name = parser.name terom@666: self.line = line terom@666: self.error = error terom@666: terom@666: def __str__ (self): terom@666: return "{self.name}:{self.line}: {self.error}".format(self=self) terom@666: terom@666: def split (line) : terom@255: """ terom@666: Split given line-data into raw tokens. terom@666: terom@666: >>> list(split('foo')) terom@666: ['foo'] terom@666: >>> list(split('foo bar')) terom@666: ['foo', 'bar'] terom@666: >>> list(split('foo;')) terom@666: ['foo', ';'] terom@666: >>> list(split('"foo"')) terom@666: ['foo'] terom@666: >>> list(split('foo "asdf quux" bar')) terom@666: ['foo', 'asdf quux', 'bar'] terom@666: >>> list(split('foo "asdf quux"')) terom@666: ['foo', 'asdf quux'] terom@666: >>> list(split('# nevermind')) terom@666: [] terom@666: >>> list(split('')) terom@666: [] terom@673: >>> list(split('next-server foo')) terom@673: ['next-server', 'foo'] terom@673: >>> list(split('include "foo/bar.conf";')) terom@673: ['include', 'foo/bar.conf', ';'] terom@666: """ terom@666: terom@666: if line is None: terom@666: raise TypeError(line) terom@666: terom@666: lexer = shlex.shlex(line, posix=True) terom@666: lexer.commenters = '#' terom@679: lexer.wordchars = TOKEN_EXTRA terom@666: terom@666: while True: terom@666: item = lexer.get_token() terom@666: terom@666: if item is None: terom@666: break terom@666: terom@666: yield item terom@666: tero@695: class Field (object): tero@695: """ tero@695: Pre-quoted fields for use in DHCP confs. tero@695: """ tero@695: tero@695: def __init__(self, token): tero@695: self.token = token tero@695: tero@695: def __str__(self): tero@695: return self.token tero@695: tero@699: class String (Field): tero@699: """ tero@699: A quoted string tero@699: """ tero@699: tero@699: def __init__(self, string): tero@699: self.string = string tero@699: tero@699: def __str__(self): tero@699: # TODO: escape tero@699: return '"{self.string}"'.format(self=self) tero@699: terom@679: def quote (value, context=None): terom@673: """ terom@673: Build a single field as part of a dhcp.conf line. terom@673: terom@673: >>> print quote('foo') terom@673: foo terom@673: >>> print quote('foo bar') terom@673: "foo bar" terom@673: >>> print quote('foo/bar.conf') terom@673: "foo/bar.conf" terom@673: >>> print quote(5) terom@673: 5 terom@673: >>> print quote('5') terom@679: 5 terom@679: >>> print quote('foo.bar') terom@679: "foo.bar" terom@679: >>> print quote('foo.bar', context=('host', )) terom@679: foo.bar terom@679: >>> print quote('192.0.2.1', context=('fixed-address', )) terom@679: 192.0.2.1 terom@679: >>> print quote('00:11:22:33:44:55', context=('hardware', 'ethernet')) terom@679: 00:11:22:33:44:55 tero@695: >>> print quote(Field('1:00:11:22:33:44:55')) tero@695: 1:00:11:22:33:44:55 tero@699: >>> print quote(String('foobar')) tero@699: "foobar" terom@673: """ terom@673: tero@695: if isinstance(value, Field): tero@695: return str(value) tero@695: elif isinstance(value, int): terom@673: return str(value) terom@679: elif context in UNQUOTED_CONTEXT: terom@679: return str(value) terom@679: elif value[0] in TOKEN_START and all(c in TOKEN for c in value): terom@673: return str(value) terom@673: else: terom@673: # quoted terom@673: return '"{value}"'.format(value=value) terom@673: terom@666: class Block (object): terom@666: """ terom@666: A block in a dhcp conf includes parameters and sub-blocks. terom@666: """ terom@666: terom@668: def __init__ (self, key, items=None, blocks=None, comment=None): terom@666: """ terom@666: key: tuple - name of block terom@666: """ terom@666: terom@666: self.key = key terom@666: self.items = items or [ ] terom@666: self.blocks = blocks or [ ] terom@668: self.comment = comment terom@666: terom@666: def __str__ (self): tero@707: if self.key: tero@707: return ' '.join(self.key) tero@707: else: tero@707: # XXX: Item.__str__ tero@707: return '; '.join(' '.join(str(x) for x in item) for item in self.items) terom@666: terom@666: def __repr__ (self): terom@682: return "Block({self.key!r}, items={self.items!r}, blocks={self.blocks!r})".format(self=self) terom@666: terom@666: class DHCPConfigParser (object): terom@666: """ terom@666: Simple parser for ISC dhcpd conf files. terom@666: terom@666: Supports iterative parsing as required for following a dhcpd.leases file. terom@255: terom@255: Doesn't implement the full spec, but a useful approximation. terom@255: """ terom@255: terom@256: @classmethod terom@666: def load (cls, file, name=None): terom@255: """ terom@666: Parse an complete file, returning the top-level Block. terom@255: terom@673: >>> DHCPConfigParser.load(['foo;', 'bar {', '\tasdf "quux";', '}'], name='test') terom@682: Block(None, items=[('foo',)], blocks=[Block(('bar',), items=[('asdf', 'quux')], blocks=[])]) terom@255: """ terom@666: terom@666: if name is None: terom@666: name = file.name terom@255: terom@666: parser = cls(name=name) terom@255: terom@666: for lineno, line in enumerate(file, 1): terom@666: try: terom@666: for item in parser.parse_line(line): terom@666: log.debug("%s", item) terom@666: except DHCPConfigError as error: terom@666: error.line = lineno terom@666: raise terom@666: terom@666: if parser.token: terom@682: raise DHCPConfError(parser, "Trailing data: {token}".format(token=parser.token), line=lineno) terom@682: terom@682: if parser.stack: terom@682: raise DHCPConfError(parser, "Unterminated block: {stack}".format(stack=parser.stack), line=lineno) terom@666: terom@666: return parser.block terom@666: terom@666: def __init__ (self, name=None): terom@666: self.name = name terom@666: terom@666: # lexer state terom@666: self.token = [] terom@666: terom@666: # parser state terom@666: self.stack = [] terom@666: terom@666: # top-level block terom@666: self.block = Block(None) terom@666: terom@666: def lex (self, line): terom@666: """ terom@666: Lex one line of input into basic expressions: terom@666: terom@666: open: [...] { terom@666: item: [..]; terom@666: close: [] } terom@666: terom@666: Yields (event, (...)) tokens. terom@666: terom@666: Raises DHCPConfError. terom@255: """ terom@255: terom@255: log.debug("%s", line) terom@255: terom@666: for word in split(line): terom@666: if word == '{': terom@666: yield 'open', tuple(self.token) terom@666: terom@666: elif word == ';': terom@666: yield 'item', tuple(self.token) terom@255: terom@666: elif word == '}': terom@666: if self.token: terom@666: raise DHCPConfError(self, "Leading data on close: {token}".format(token=self.token)) terom@255: terom@666: yield 'close', None terom@255: terom@666: else: terom@666: self.token.append(word) terom@666: continue terom@666: terom@666: self.token = [ ] terom@255: terom@666: def parse (self, tokens): terom@666: """ terom@666: Parse given tokens, yielding any complete Blocks. terom@255: terom@666: Note that the Blocks yielded may be within some other Block which is still incomplete. terom@255: """ terom@256: terom@666: for token, args in tokens: terom@666: if token == 'open': terom@666: block = Block(args) terom@256: terom@666: log.debug("open block: %s > %s", self.block, block) terom@666: terom@682: self.stack.append(self.block) terom@666: self.block.blocks.append(block) terom@666: self.block = block terom@666: terom@666: # must be within block! terom@666: elif token == 'item' : terom@666: log.debug("block %s item: %s", self.block, args) terom@255: terom@666: self.block.items.append(args) terom@255: terom@255: elif token == 'close' : terom@255: log.debug("close block: %s", self.block) terom@255: terom@666: block = self.block terom@255: terom@666: if self.stack: terom@666: self.block = self.stack.pop() terom@666: else: terom@666: raise DHCPConfigError(self, "Mismatched block close: {block}".format(block=block)) terom@255: terom@666: yield block terom@666: terom@666: else: terom@666: raise ValueError(token, args) terom@255: terom@666: def parse_line (self, line): terom@666: """ terom@666: Lex and parse line tokens. terom@666: """ terom@666: terom@666: for block in self.parse(self.lex(line)): terom@666: yield block terom@666: terom@255: def parse_lines (self, lines) : terom@255: """ terom@255: Trivial wrapper around parse to parse multiple lines. terom@255: """ terom@255: terom@666: for line in lines: terom@666: for block in self.parse_line(line) : terom@666: yield block terom@666: terom@679: def build_line (item, end, indent=0, context=None): tero@476: """ tero@476: Build a structured line. tero@476: terom@679: >>> print build_line(('foo', ), ';') tero@476: foo; terom@679: >>> print build_line(('host', 'foo'), ' {') tero@476: host foo { terom@679: >>> print build_line(('foo', 'bar quux'), ';', indent=1) tero@476: foo "bar quux"; tero@476: """ tero@476: terom@679: return ' '*indent + ' '.join(quote(field, context=context) for field in item) + end tero@476: tero@476: def build_item (item, **opts): tero@476: """ tero@476: Build a single parameter line. tero@476: terom@679: >>> print build_item(('foo', )) tero@476: foo; tero@476: """ tero@476: terom@679: return build_line(item, end=';', terom@679: context = item[:-1], terom@679: **opts terom@679: ) tero@476: terom@668: def build_block (block, indent=0): tero@476: """ terom@666: Build a complete Block, recursively, yielding output lines. tero@476: terom@681: >>> for line in build_block(Block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')], comment="Testing")): print line tero@476: # Testing tero@476: host foo { tero@476: hardware ethernet 00:11:22:33:44:55; tero@476: } terom@666: >>> for line in build_block(Block(('group', ), [('next-server', 'booter')], [ \ terom@681: Block(('host', 'foo'), [('hardware', 'ethernet', '00:11:22:33:44:55')]) \ terom@666: ])): print line tero@476: group { tero@476: next-server booter; tero@476: tero@476: host foo { tero@476: hardware ethernet 00:11:22:33:44:55; tero@476: } tero@476: } tero@476: """ tero@476: terom@668: if block.comment: terom@668: yield build_line((), end="# {comment}".format(comment=block.comment), indent=indent) tero@685: tero@685: if block.key: tero@685: yield build_line(block.key, end=' {', indent=indent, context=block.key[0:1]) tero@685: indent += 1 tero@476: terom@666: for item in block.items: tero@685: yield build_item(item, indent=indent) tero@476: terom@666: for subblock in block.blocks: tero@685: if block.items: tero@685: yield '' tero@476: tero@685: for line in build_block(subblock, indent=indent): tero@476: yield line tero@685: tero@685: if block.key: tero@685: indent -= 1 tero@685: yield build_line((), end='}', indent=indent)