initial code
authorTero Marttila <terom@fixme.fi>
Thu, 02 Apr 2009 17:47:43 +0300
changeset 0 257003279747
child 1 2223ade4f259
initial code
addr.py
conf.py
conf_dhcp.py
test_dhcp.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/addr.py	Thu Apr 02 17:47:43 2009 +0300
@@ -0,0 +1,30 @@
+"""
+    Used to define IP-address/subnet stuff
+"""
+
+import IPy
+
+class IP (IPy.IP, object) :
+    """
+        A literal IPv4 address
+    """
+    
+    def __init__ (self, address) :
+        """
+            Parse the given literal IP address in "a.b.c.d" form
+        """
+    
+        super(IP, self).__init__(address)
+
+class Network (IPy.IP, object) :
+    """
+        An IPv4 network (subnet)
+    """
+
+    def __init__ (self, prefix) :
+        """
+            Parse the given prefix in "a.b.c.d/l" form
+        """
+
+        super(Network, self).__init__(prefix)
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/conf.py	Thu Apr 02 17:47:43 2009 +0300
@@ -0,0 +1,114 @@
+"""
+    Generic configuration file output
+"""
+
+import os, tempfile, shutil
+
+class Object (object) :
+    """
+        An object that can be written to a ConfFile, as multiple lines of text.
+    """
+
+    def fmt_lines (self) :
+        """
+            Yield a series of lines to be output in the file
+        """
+        
+        abstract
+
+class File (object) :
+    """
+        A single configuration file on the filesystem.
+
+        Configuration files are 
+    """
+
+    def __init__ (self, name, path, backup_suffix='.bak', mode=0644) :
+        """
+            Initialize the config file, but don't open it yet
+
+            @param name the human-readable friendly name for the config file
+            @param path the full filesystem path for the file
+            @param backup_suffix rename the old file by adding this suffix when replacing it
+            @param mode the permission bits for the new file
+        """
+  
+        self.name = name
+        self.path = path
+        self.backup_suffix = backup_suffix
+        self.mode = mode
+    
+    def write_file (self, file, objects) :
+        """
+            Write out the given config objects into the given file
+        """
+
+        writer = Writer(file)
+        writer.write_objs(objects)
+
+    def write (self, objects) :
+        """
+            Write out a new config file with the given series of objects using the following procedure:
+                * lock the real file
+                * open a new temporary file, and write out the objects into that, closing it
+                * move the real file out of the way
+                * move the temporary file into the real file's place
+                * unlock the real file
+        """
+
+        # XXX: how to aquire the lock?
+        # XXX: this needs checking over
+
+        # open the new temporary file
+        # XXX: ensure fd is closed
+        tmp_fd, tmp_path = tempfile.mkstemp(prefix=self.name)
+
+        # ...as a file
+        tmp_file = os.fdopen(tmp_fd, "w")
+        
+        # fix the permissions
+        os.chmod(tmp_path, self.mode)
+
+        # write it
+        self.write_file(tmp_file, objects)
+
+        # close it
+        tmp_file.close()
+        del writer
+        del tmp_file
+        
+        # move the old file out of the way
+        os.rename(self.path, self.path + self.backup_suffix)
+
+        # move the new file in
+        shutil.move(tmp_path, self.path)
+
+class Writer (object) :
+    """
+        A conf.Writer is used to write out a new conf.File (as a temporary file)
+    """
+
+    def __init__ (self, file) :
+        """
+            @param file the temporary file object
+        """
+
+        self.file = file
+    
+    def write_obj (self, obj) :
+        """
+            Write a single object to the file
+        """
+        
+        # just write out all the lines
+        self.file.writelines(obj.fmt_lines())
+
+    def write_objs (self, objs) :
+        """
+            Write a series of objects to the file
+        """
+        
+        # just write each in turn
+        for obj in objs :
+            self.write_obj(obj)
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/conf_dhcp.py	Thu Apr 02 17:47:43 2009 +0300
@@ -0,0 +1,227 @@
+"""
+    Configuration file output for the ISC DHCP server
+"""
+
+import conf
+
+import itertools
+
+class ConfDHCP (conf.File) :
+    def __init__ (self, name="dhcpd.conf", path="/etc/dhcp3/dhcpd.conf") :
+        """
+            Initialize the dhcpd config file, but don't open it yet
+
+            @see conf.ConfFile.__init__
+        """
+        
+        super(ConfDHCP, self).__init__(name, path)
+
+class Statement (conf.Object) :
+    """
+      A statement is a single line in the config file
+    """
+
+    def __init__ (self, name, *args) :
+        """
+            The statement will be formatted like this:
+                <name> [ <arg> [ ... ] ] ";"
+        """
+
+        self.name = name
+        self.args = args
+    
+    def _fmt_arg (self, arg) :
+        """
+            Formats a arg for use in output, the following types are supported:
+
+                list/tuple/iter:    results in a comma-and-space separated list of formatted values
+                unicode:            results in an encoded str
+                str:                results in the string itself, quoted if needed
+                other:              attempt to convert to a str, and then format that
+        """
+        
+        # format lists specially
+        # XXX: iterators?
+        if isinstance(arg, (list, tuple)) :
+            # recurse as a comma-and-space separated list
+            return ', '.join(self._fmt_arg(a) for a in arg)
+
+        elif isinstance(arg, Literal) :
+            # use what it specifies
+            return arg.fmt_arg()
+
+        elif isinstance(arg, unicode) :
+            # recurse with the str version
+            # XXX: what encoding to use?
+            return self._fmt_arg(arg.encode('utf8'))
+
+        elif isinstance(arg, str) :
+            # XXX: quoting
+            return arg
+        
+        else :
+            # try and use it as a string
+            return self._fmt_arg(str(arg))
+    
+    def _fmt_data (self) :
+        """
+            Formats the statement name/params as a single line
+        """
+
+        return "%s%s" % (self.name, (' ' + ' '.join(self._fmt_arg(a) for a in self.args)) if self.args else '')
+
+class Literal (Statement) :
+    """
+        A literal is something that goes into the config file as-is, with no formatting or escaping applied.
+    """
+
+    def __init__ (self, literal) :
+        self.literal = literal
+    
+    def fmt_arg (self) :
+        return self.literal
+
+    def fmt_lines (self) :
+        yield self.literal
+
+class Parameter (Statement) :
+    """
+        A parameter is a single statement that configures the behaviour of something.
+
+        Parameters have a name, and optionally, a number of arguments, and are formatted as statements terminated with
+        a semicolon.
+    """
+    
+    def fmt_lines (self) :
+        """
+            Yields a single ;-terminated line
+        """
+
+        yield "%s;" % self._fmt_data()
+
+class Declaration (Statement) :
+    """
+        A declaration begins like a statement (with name and args), but then contains a block of any number of
+        parameters followed by any number of nested declarations.
+
+        <name> [ <args> [ ... ] ] {
+            [ <parameters> ]
+            [ <declarations> ]
+        }
+        
+    """
+
+    def __init__ (self, name, args=[], params=[], decls=[]) :
+        """
+            The name/args will be formatted as in Statement, but params should be an iterable of Parameters, and decls
+            an iterable of Declarations.
+        """
+        
+        # init the statement bit
+        Statement.__init__(self, name, *args)
+
+        # store the iterables
+        self.params = params
+        self.decls = decls
+
+    def fmt_lines (self) :
+        """
+            Yields a header line, a series of indented body lines, and the footer line
+        """
+        
+        # the header to open the block
+        yield "%s {" % self._fmt_data()
+        
+        # then output each content line
+        for stmt in itertools.chain(self.params, self.decls) :
+            # ..indented
+            for line in stmt.fmt_lines() :
+                yield "\t%s" % line
+        
+        # and then close the block
+        yield "}"
+
+class SharedNetwork (Declaration) :
+    """
+        A shared-network declaration is used to define a set of subnets that share the same physical network,
+        optionally with some shared params.
+
+        shared-network <name> {
+            [ parameters ]
+            [ declarations ]
+        }
+    """
+
+    def __init__ (self, name, params=[], decls=[]) :
+        """
+            @param name the name of the shared-subnet
+            @param params optional parameters
+            @param decls the iterable of subnets or other declarations in the shared network
+        """
+
+        super(SharedNetwork, self).__init__("shared-network", [name], params, decls)
+
+class Subnet (Declaration) :
+    """
+        A subnet is used to provide the information about a subnet required to identify whether or not an IP address is
+        on that subnet, and may also be used to specify parameters/declarations for that subnet.
+        
+        subnet <subnet-number> netmask <netmask> {
+            [ parameters ]
+            [ declarations ]
+        }
+    """
+
+    def __init__ (self, network, params=[], decls=[]) :
+        """
+            @param network the addr.Network for the subnet
+            @param params optional parameters
+            @param decls optional decls, e.g. subnets
+        """
+
+        super(Subnet, self).__init__("subnet", [network.net(), "netmask", network.netmask()], params, decls)
+
+class Group (Declaration) :
+    """
+        A group is simply used to apply a set of parameters to a set of declarations.
+
+        group {
+            [ parameters ]
+            [ declarations ]
+        }
+    """
+
+    def __init__ (self, params=[], decls=[]) :
+        super(Group, self).__init__("group", [], params, decls)
+
+
+class Host (Declaration) :
+    """
+        A host is used to match a request against specific host, and then apply settings for that host.
+
+        The "hostname" is the DHCP name to identify the host. 
+
+        If no dhcp-client-identifier option is specified in the parameters, then the host is matched using the
+        "hardware" parameter.
+
+        host <hostname> {
+            [ parameters ]
+            [ declarations ]
+        }
+    """
+
+    def __init__ (self, hostname, params=[], decls=[]) :
+        super(Host, self).__init__("host", [hostname], params, decls)
+
+class Option (Parameter) :
+    """
+        A generic 'option' parameter for a dhcpd.conf file
+    """
+
+    def __init__ (self, name, *args) :
+        """
+            Formatted as a Satement with a name of "option <name>".
+        """
+
+        super(Option, self).__init__("option %s" % name, *args)
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test_dhcp.py	Thu Apr 02 17:47:43 2009 +0300
@@ -0,0 +1,103 @@
+"""
+    Test conf_dhcp
+"""
+
+import conf_dhcp as dhcpc, conf, addr
+
+import unittest
+from cStringIO import StringIO
+
+class TestConfDHCP (unittest.TestCase) :
+    def assert_stmt (self, stmt, line) :
+        """
+            Formats the given Statement, and compares the output against the given line
+        """
+        
+        self.assertEqual(stmt._fmt_data(), line)
+    
+    def test_statement (self) :
+        self.assert_stmt(dhcpc.Statement("stmt0"),                           "stmt0")
+        self.assert_stmt(dhcpc.Statement("stmt3", [ "this", "that" ]),       "stmt3 this, that")
+        self.assert_stmt(dhcpc.Statement("stmt4", dhcpc.Literal("...")),     "stmt4 ...")
+        self.assert_stmt(dhcpc.Statement("stmt1", u"quux"),                  "stmt1 quux")
+        self.assert_stmt(dhcpc.Statement("stmt1", "bar"),                    "stmt1 bar")
+        self.assert_stmt(dhcpc.Statement("stmt2", 1),                        "stmt2 1")
+ 
+    def assert_obj (self, obj, lines) :
+        """
+            Formats the given conf.Object and compares the output against the given lines
+        """
+
+        self.assertEqual(list(obj.fmt_lines()), lines)
+    
+    def test_literal (self) :
+        self.assert_obj(dhcpc.Literal("///"),                               [ "///" ])
+
+    def test_parameter (self) :
+        self.assert_obj(dhcpc.Parameter("param0", "this", 13, "that"),      [ "param0 this 13 that;" ])
+    
+    def test_declaration (self) :
+        self.assert_obj(dhcpc.Declaration("decl0", ["arg0", "arg1"], [
+            dhcpc.Parameter("param0")
+        ], [
+            dhcpc.Declaration("decl0.0", params=[
+                dhcpc.Parameter("param0.0.1", "value")
+            ])
+        ]),  [
+            
+            "decl0 arg0 arg1 {",
+            "\tparam0;",
+            "\tdecl0.0 {",
+            "\t\tparam0.0.1 value;",
+            "\t}",
+            "}",
+        ])
+    
+    def test_shared_network (self) :
+        self.assert_obj(dhcpc.SharedNetwork("net0", params=[
+            dhcpc.Parameter("param0")
+        ]), [
+            "shared-network net0 {",
+            "\tparam0;",
+            "}"
+        ])
+
+    def test_subnet (self) :
+        self.assert_obj(dhcpc.Subnet(addr.Network("194.197.235.0/24"), params=[
+            dhcpc.Parameter("param0")
+        ]), [
+            "subnet 194.197.235.0 netmask 255.255.255.0 {",
+            "\tparam0;",
+            "}"
+        ])
+
+    def test_group (self) :
+        self.assert_obj(dhcpc.Group(decls=[
+            dhcpc.Declaration("decl0.0", params=[
+                dhcpc.Parameter("param0.0.1", "value")
+            ])
+        ]), [
+            "group {",
+            "\tdecl0.0 {",
+            "\t\tparam0.0.1 value;",
+            "\t}",
+            "}"
+        ])
+    
+    def test_host (self) :
+        self.assert_obj(dhcpc.Host("test-hostname", params=[
+            dhcpc.Parameter("param0")
+        ]), [
+            "host test-hostname {",
+            "\tparam0;",
+            "}"
+        ])
+
+    def test_option (self) :
+        self.assert_obj(dhcpc.Option("foo", "example.com"), [
+            "option foo example.com;",
+        ])
+    
+if __name__ == '__main__' :
+    unittest.main()
+