--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/dns/tests.py Fri Feb 27 14:06:03 2015 +0200
@@ -0,0 +1,182 @@
+import itertools
+import re
+import unittest
+
+from pvl.dns import zone
+from StringIO import StringIO
+
+class File(StringIO):
+ @classmethod
+ def lines (cls, *lines):
+ return cls('\n'.join(lines) + '\n')
+
+ def __init__(self, buffer, name='test.file'):
+ StringIO.__init__(self, buffer)
+ self.name = name
+
+class TestMixin(object):
+ def assertEqualWhitespace(self, value, expected):
+ # normalize
+ value = re.sub(r'\s+', ' ', value)
+
+ self.assertEqual(value, expected)
+
+ def assertZoneLineEqual(self, zl, expected):
+ self.assertEqual(unicode(zl), expected)
+
+ def assertZoneLinesEqual(self, zls, expected):
+ for zl, expect in itertools.izip_longest(zls, expected):
+ self.assertZoneLineEqual(zl, expect)
+
+ def assertZoneRecordEqual(self, zr, expect):
+ self.assertEqualWhitespace(unicode(zr), expect)
+
+ def assertZoneRecordsEqual(self, zrs, expected):
+ for zr, expect in itertools.izip_longest(zrs, expected):
+ self.assertZoneRecordEqual(zr, expect)
+
+class ZoneLineTest(TestMixin, unittest.TestCase):
+ def testZoneLine(self):
+ self.assertEqual(unicode(zone.ZoneLine(['foo', 'A', '192.0.2.1'])), "foo\tA\t192.0.2.1")
+
+ def testZoneLineParse(self):
+ self.assertZoneLinesEqual(zone.ZoneLine.parse(File("foo A 192.0.2.1")), ["foo\tA\t192.0.2.1"])
+
+ def testZoneLineParseWhitespace(self):
+ self.assertZoneLinesEqual(zone.ZoneLine.parse(File("foo \t A\t192.0.2.1")), ["foo\tA\t192.0.2.1"])
+
+ def testZoneLineParseComment(self):
+ self.assertZoneLinesEqual(zone.ZoneLine.parse(File("foo A 192.0.2.1 ; bar")), ["foo\tA\t192.0.2.1\t; bar"])
+
+ def testZoneLineParseQuoteSimple(self):
+ self.assertZoneLinesEqual(zone.ZoneLine.parse(File("foo TXT \"asdf quux\"")), ["foo\tTXT\tasdf quux"])
+
+ def testZoneLineParseQuoteTrailing(self):
+ self.assertZoneLinesEqual(zone.ZoneLine.parse(File("foo TXT \"asdf quux\" ok ; yay")), ["foo\tTXT\tasdf quux\tok\t; yay"])
+
+ def testZoneLineParseIndent(self):
+ self.assertZoneLinesEqual(zone.ZoneLine.parse(File(" A 192.0.2.2")), ["\tA\t192.0.2.2"])
+
+ def testZoneLineParseMultilineSingle(self):
+ self.assertZoneLinesEqual(zone.ZoneLine.parse(File("@ SOA ( 1 2 3 4 5 )")), ["@\tSOA\t1\t2\t3\t4\t5"])
+
+ def testZoneLineParseMultiline(self):
+ self.assertZoneLinesEqual(zone.ZoneLine.parse(File.lines("@ SOA (", "\t1\t", "2", "\t3\t\t4", "\t5 )")), ["@\tSOA\t1\t2\t3\t4\t5"])
+
+ def testZoneLineLoad(self):
+ self.assertZoneRecordsEqual(
+ zone.ZoneLine.load(File.lines(
+ "$TTL 3600",
+ " ",
+ "@ NS ns1",
+ " NS ns2",
+ "$ORIGIN asdf", # note lack of .
+ "foo A 192.0.2.1",
+ "$ORIGIN quux.test.", # note lack of .
+ "bar A 192.0.2.2",
+ )), [
+ "$TTL 3600",
+ "@ NS ns1",
+ " NS ns2",
+ "$ORIGIN asdf",
+ "foo A 192.0.2.1",
+ "$ORIGIN quux.test.",
+ "bar A 192.0.2.2",
+ ]
+ )
+
+class ZoneDirectiveTest(TestMixin, unittest.TestCase):
+ def testZoneDirective(self):
+ self.assertEqual(unicode(zone.ZoneDirective('ORIGIN', ['foo.'])), "$ORIGIN\tfoo.")
+
+ def testZoneDirectiveBuild(self):
+ self.assertEqual(unicode(zone.ZoneDirective.build('ORIGIN', 'foo.')), "$ORIGIN\tfoo.")
+
+ def testZoneDirectiveParse(self):
+ self.assertEqual(unicode(zone.ZoneDirective.parse(['$ORIGIN', 'foo.'])), "$ORIGIN\tfoo.")
+
+ def testZoneDirectiveParseUpper(self):
+ self.assertEqual(unicode(zone.ZoneDirective.parse(['$include', 'foo.zone'])), "$INCLUDE\tfoo.zone")
+
+ def testZoneDirectiveComment(self):
+ self.assertEqual(unicode(zone.ZoneDirective('ORIGIN', ['foo.'], comment="bar")), "$ORIGIN\tfoo.\t; bar")
+
+class ZoneRecordTest(TestMixin, unittest.TestCase):
+ def testZoneRecordShort(self):
+ rr = zone.ZoneRecord('test', None, None, 'A', ['192.0.2.1'])
+
+ self.assertZoneRecordEqual((rr), 'test A 192.0.2.1')
+
+ def testZoneRecordImplicit(self):
+ rr = zone.ZoneRecord(None, None, None, 'A', ['192.0.2.1'])
+
+ self.assertZoneRecordEqual((rr), ' A 192.0.2.1')
+
+ def testZoneRecordFull(self):
+ rr = zone.ZoneRecord('test', 60, 'IN', 'A', ['192.0.2.1'])
+
+ self.assertZoneRecordEqual((rr), 'test 60 IN A 192.0.2.1')
+
+ def testZoneRecordComment(self):
+ rr = zone.ZoneRecord('test', None, None, 'A', ['192.0.2.1'], comment='Testing')
+
+ self.assertZoneRecordEqual((rr), 'test A 192.0.2.1 ; Testing')
+
+ def testZoneRecordA(self):
+ self.assertZoneRecordEqual((zone.ZoneRecord.A('test', '192.0.2.1')), "test A 192.0.2.1")
+
+ def testZoneRecordAAAA(self):
+ self.assertZoneRecordEqual((zone.ZoneRecord.AAAA('test', '2001:db8::c000:201')), "test AAAA 2001:db8::c000:201")
+
+ def testZoneRecordCNAME(self):
+ self.assertZoneRecordEqual((zone.ZoneRecord.CNAME('test', 'test.example.net.')), "test CNAME test.example.net.")
+
+ def testZoneRecordTXT(self):
+ self.assertZoneRecordEqual((zone.ZoneRecord.TXT('test', u"Foo Bar")), u"test TXT \"Foo Bar\"")
+
+ def testZoneRecordPTR(self):
+ self.assertZoneRecordEqual(zone.ZoneRecord.PTR('1', 'test.example.com.'), "1 PTR test.example.com.")
+
+ def testZoneRecordMX(self):
+ self.assertZoneRecordEqual(zone.ZoneRecord.MX('@', 10, 'mail1'), "@ MX 10 mail1")
+
+ def testZoneRecordBuildTTL(self):
+ self.assertZoneRecordEqual(zone.ZoneRecord.A('test', '192.0.2.1', ttl=60), "test 60 A 192.0.2.1")
+
+ def testZoneRecordBuildZeroTTL(self):
+ self.assertZoneRecordEqual(zone.ZoneRecord.A('test', '192.0.2.1', ttl=0), "test 0 A 192.0.2.1")
+
+ def testZoneRecordBuildImplicit(self):
+ self.assertZoneRecordEqual(zone.ZoneRecord.A(None, '192.0.2.1'), " A 192.0.2.1")
+
+ def testZoneRecordBuildCls(self):
+ self.assertZoneRecordEqual(zone.ZoneRecord.A('foo', '192.0.2.1', cls='in'), "foo IN A 192.0.2.1")
+
+ def testZoneRecordParse(self):
+ self.assertZoneRecordEqual(zone.ZoneRecord.parse(['test', 'A', '192.0.2.1']), "test A 192.0.2.1")
+ self.assertZoneRecordEqual(zone.ZoneRecord.parse(['', 'A', '192.0.2.1']), " A 192.0.2.1")
+ self.assertZoneRecordEqual(zone.ZoneRecord.parse(['test', '60', 'A', '192.0.2.1']), "test 60 A 192.0.2.1")
+ self.assertZoneRecordEqual(zone.ZoneRecord.parse(['test', '60', 'in', 'A', '192.0.2.1']), "test 60 IN A 192.0.2.1")
+
+ def testZoneRecordParseError(self):
+ with self.assertRaises(zone.ZoneLineError):
+ self.assertZoneRecordEqual(zone.ZoneRecord.parse(['test', 'A']), None)
+
+ def testZoneRecordLoad(self):
+ self.assertZoneRecordsEqual(
+ zone.ZoneRecord.load(File.lines(
+ "$TTL 3600",
+ " ",
+ "@ NS ns1",
+ " NS ns2",
+ "$ORIGIN asdf", # relative
+ "foo A 192.0.2.1",
+ "$ORIGIN quux.test.", # absolute
+ "bar A 192.0.2.2",
+ ), 'test'), [
+ "@ 3600 NS ns1",
+ "@ 3600 NS ns2",
+ "foo 3600 A 192.0.2.1", # asdf.test
+ "bar 3600 A 192.0.2.2", # quux.test
+ ]
+ )
--- a/pvl/dns/zone.py Fri Feb 27 14:05:39 2015 +0200
+++ b/pvl/dns/zone.py Fri Feb 27 14:06:03 2015 +0200
@@ -6,9 +6,11 @@
import codecs
import datetime
+import ipaddr
import logging
import math
import os.path
+import pvl.dns.labels
log = logging.getLogger('pvl.dns.zone')
@@ -23,35 +25,19 @@
def __init__ (self, line, msg, *args, **kwargs) :
super(ZoneLineError, self).__init__("%s: %s" % (line, msg.format(*args, **kwargs)))
-def process_generate (line, origin, parts) :
+def zone_quote (field):
"""
- Process a
- $GENERATE <start>-<stop>[/<step>] lhs [ttl] [class] type rhs [comment]
- directive into a series of ZoneResource's.
+ Quote a value for inclusion into TXT record.
- Raises ZoneLineError
+ >>> print zone_quote("foo")
+ "foo"
+ >>> print zone_quote("foo\\bar")
+ "foo\\bar"
+ >>> print zone_quote("foo \"bar\" quux")
+ "foo \"bar\" quux"
"""
- parts = list(parts)
-
- try:
- range = parse_generate_range(parts.pop(0))
-
- lhs_func = parse_generate_field(parts.pop(0), line=line)
- rhs_func = parse_generate_field(parts.pop(-1), line=line)
- except ValueError as error:
- raise ZoneLineError(line, "{error}", error=error)
-
- body = parts
-
- for i in range :
- # build
- parts = [lhs_func(i)] + body + [rhs_func(i)]
-
- log.debug(" %03d: %r", i, parts)
-
- # parse
- yield ZoneRecord.parse(line, parts=parts, origin=origin)
+ return u'"' + field.replace('\\', '\\\\').replace('"', '\\"') + u'"'
class ZoneLine (object) :
@@ -60,87 +46,26 @@
"""
@classmethod
- def load (cls, file, ttl=None, origin=None, expand_generate=False, expand_include=False, **opts) :
+ def load (cls, file) :
"""
- Parse ZoneLine, ZoneRecord items from the given zonefile.
+ Load (ZoneDirective or ZoneRecord) items from the given zonefile.
+
+ Tracks $ORIGIN and $TTL state for ZoneRecords/ZoneDirectives.
"""
- name = None
-
- for line in cls.parse(file, **opts) :
- if not line.parts :
- log.debug("%s: skip empty line", line)
-
- elif line.line.startswith('$') :
- # control record
- directive = ZoneDirective.parse(line,
- origin = origin,
- comment = line.comment,
- )
-
- if directive.directive == 'ORIGIN' :
- # update
- origin, = directive.arguments
-
- log.info("%s: origin: %s", line, origin)
-
- yield line, None
-
- elif directive.directive == 'TTL' :
- ttl, = directive.arguments
-
- log.info("%s: ttl: %s", line, ttl)
-
- yield line, None
-
- elif directive.directive == 'GENERATE' :
- if expand_generate :
- # process...
- log.info("%s: generate: %s", line, directive.arguments)
-
- for record in process_generate(line, origin, directive.arguments) :
- yield line, record
- else :
- yield line, None
+ for line in cls.parse(file) :
+ if line.parts[0].startswith('$'):
+ # control directive
+ line_type = ZoneDirective
- elif directive.directive == 'INCLUDE' :
- if expand_include :
- include, = directive.arguments
-
- path = os.path.join(os.path.dirname(file.name), include)
-
- log.info("%s: include: %s: %s", line, include, path)
-
- for record in cls.load(open(path)) :
- yield line, record
- else :
- yield line, None
-
- else :
- log.warn("%s: skip unknown control record: %r", line, directive)
- yield line, None
-
else :
- # normal record?
- record = ZoneRecord.parse(line,
- name = name,
- origin = origin,
- ttl = ttl,
- comment = line.comment,
- )
+ # normal record
+ line_type = ZoneRecord
- if record :
- yield line, record
-
- # keep name across lines
- name = record.name
-
- else :
- # unknown
- log.warning("%s: skip unknown line: %s", line, line.line)
-
- yield line, None
-
+ yield line_type.parse(line.parts,
+ line = line,
+ comment = line.comment,
+ )
@classmethod
def parse (cls, file, filename=None):
@@ -163,6 +88,10 @@
indent = raw_line.startswith(' ') or raw_line.startswith('\t')
line = raw_line.strip()
+ if not line:
+ # empty line
+ continue
+
# parse comment
if ';' in line:
line, comment = line.split(';', 1)
@@ -175,12 +104,17 @@
log.debug("%s:%d: indent=%r, line=%r, comment=%r", filename, lineno, indent, line, comment)
# split (quoted) fields
+ if indent and not multiline_start:
+ parts = ['']
+ else:
+ parts = []
+
if '"' in line :
pre, data, post = line.split('"', 2)
- parts = pre.split() + [data] + post.split()
+ parts.extend(pre.split() + [data] + post.split())
else :
- parts = line.split()
+ parts.extend(line.split())
# handle multi-line statements...
if '(' in parts :
@@ -188,7 +122,7 @@
log.debug("%s:%d: Start of multi-line statement: %s", filename, lineno, line)
- multiline_start = (lineno, indent, comment)
+ multiline_start = (lineno, comment)
multiline_line = raw_line
multiline_parts = []
@@ -204,7 +138,7 @@
log.debug("%s:%d: End of multi-line statement: %s", filename, lineno, line)
- lineno, indent, comment = multiline_start
+ lineno, comment = multiline_start
raw_line = multiline_line
parts = multiline_parts
@@ -214,40 +148,39 @@
if multiline_start:
pass
else:
- yield ZoneLine(filename, lineno, raw_line, indent, parts, comment)
-
- file = None
- lineno = None
+ yield ZoneLine(parts,
+ file = filename,
+ lineno = lineno,
+ line = raw_line,
+ comment = comment,
+ )
- # data
- indent = None # was the line indented?
- parts = None # split line fields
+ def __init__ (self, parts, file=None, lineno=None, line=None, comment=None) :
+ """
+ parts : [str] - list of parsed (quoted) fields
+ if the line has a leading indent, the first field should be an empty string
+ """
- # optional
- comment = None
-
- def __init__ (self, file, lineno, line, indent, parts, comment=None) :
+ self.parts = parts
+
# source
self.file = file
self.lineno = lineno
self.line = line
-
- # parse data
- self.indent = indent
- self.parts = parts
-
- # metadata
+
+ # meta
self.comment = comment
def __unicode__ (self) :
- return u"{indent}{parts}".format(
- indent = u"\t" if self.indent else '',
+ return u"{parts}{comment}".format(
parts = u'\t'.join(self.parts),
+ comment = u'\t; ' + self.comment if self.comment is not None else '',
)
def __repr__ (self) :
return "{file}:{lineno}".format(file=self.file, lineno=self.lineno)
+
class ZoneDirective (object) :
"""
An $DIRECTIVE in a zonefile.
@@ -262,35 +195,46 @@
arguments = None
@classmethod
- def parse (cls, line, **opts) :
- # control record
- args = list(line.parts)
- directive = args[0][1:]
- arguments = args[1:]
+ def parse (cls, parts, **opts) :
+ """
+ Parse from ZoneLine.parts
+ """
- return cls.build(line, directive, *arguments, **opts)
+ directive = parts[0][1:].upper()
+ arguments = parts[1:]
+
+ return cls(directive, arguments, **opts)
@classmethod
- def build (cls, line, directive, *arguments, **opts) :
- return cls(directive, arguments,
- line = line,
- **opts
- )
+ def build (cls, directive, *arguments, **opts):
+ """
+ Build directive from optional parts
+ """
- def __init__ (self, directive, arguments, line=None, origin=None, comment=None) :
+ return cls(directive, arguments, **opts)
+
+ def __init__ (self, directive, arguments, comment=None, line=None, origin=None):
+ """
+ directive - uppercase directive name, withtout leading $
+ arguments [str] - list of directive arguments
+ comment - optional trailing comment
+ line - optional associated ZoneLine
+ origin - context origin
+ """
+
self.directive = directive
self.arguments = arguments
+ self.comment = comment
self.line = line
self.origin = origin
- self.comment = comment
- def __unicode__ (self) :
+ def __unicode__ (self):
"""
Construct a zonefile-format line...
"""
- if self.comment :
+ if self.comment:
comment = '\t; ' + self.comment
else :
comment = ''
@@ -303,131 +247,270 @@
def __repr__ (self) :
return '%s(%s)' % (self.__class__.__name__, ', '.join(repr(arg) for arg in (
- (self.directive) + tuple(self.arguments)
+ (self.directive, ) + tuple(self.arguments)
)))
+def process_generate (line, range, parts, **opts) :
+ """
+ Parse a
+ $GENERATE <start>-<stop>[/<step>] lhs [ttl] [class] type rhs [comment]
+ directive and yield expanded ZoneRecords.
+
+ Raises ZoneLineError
+ """
+
+
+ try:
+ range = pvl.dns.generate.parse_generate_range(range)
+
+ lhs_func = pvl.dns.generate.parse_generate_field(parts[0], line=line)
+ body = parts[1:-1]
+ rhs_func = pvl.dns.generate.parse_generate_field(parts[-1], line=line)
+
+ except ValueError as error:
+ raise ZoneLineError(line, "{error}", error=error)
+
+ log.info("%s: generate %s: %s ... %s", line, range, lhs_func, rhs_func)
+
+ for i in range :
+ # build
+ parts = [lhs_func(i)] + body + [rhs_func(i)]
+
+ log.debug(" %03d: %r", i, parts)
+
+ yield ZoneRecord.parse(parts, line=line, **opts)
+
+def process_include (line, origin, include_filename, include_origin=None, **opts):
+ """
+ Parse a
+ $INCLUDE filename [ origin ]
+ directive and yield expanded ZoneRecords.
+
+ Raises ZoneLineError.
+ """
+
+ path = os.path.join(os.path.dirname(file.name), include_include)
+
+ if include_origin:
+ origin = pvl.dns.labels.join(include_origin, origin)
+
+ for record in ZoneRecord.load(open(path), origin, **opts) :
+ yield line, record
+
class ZoneRecord (object) :
"""
A record from a zonefile.
"""
-
- # context
- line = None # the underlying line
- origin = None # possible $ORIGIN context
-
- # record fields
- name = None
- ttl = None # optional
- cls = None # optional
- type = None
- data = None # list of data fields
-
- @classmethod
- def load (cls, file, **opts) :
- """
- Yield ZoneRecords from a file.
- """
-
- for line, record in ZoneLine.load(file, **opts) :
- if record :
- yield record
- else :
- log.warn("%s: unparsed line: %s", file.name, line)
@classmethod
- def parse (cls, line, name=None, parts=None, ttl=None, **opts) :
+ def load (cls, file, origin,
+ ttl=None,
+ ):
"""
- Build a ZoneRecord from a ZoneLine.
+ Yield ZoneRecords from a file. Processes any ZoneDirectives.
+ """
+
+ name = None
- name - default for name, if continuing previous line
+ for line in ZoneLine.parse(file):
+ if line.parts[0].startswith('$'):
+ directive = ZoneDirective.parse(line.parts,
+ origin = origin,
+ line = line,
+ comment = line.comment,
+ )
+
+ log.debug("%s: %s", line, directive)
+
+ if directive.directive == 'ORIGIN':
+ directive_origin, = directive.arguments
+
+ log.info("%s: $ORIGIN %s -> %s", file, origin, directive_origin)
+
+ origin = pvl.dns.labels.join(origin, directive_origin)
+
+ elif directive.directive == 'TTL' :
+ directive_ttl, = directive.arguments
+
+ log.info("%s: $TTL %d -> %s", file, ttl, directive_ttl)
+
+ ttl = int(directive_ttl)
+
+ elif directive.directive == 'GENERATE' :
+ for record in process_generate(line, directive.arguments,
+ name = name,
+ ttl = ttl,
+ origin = origin,
+ ) :
+ yield record
+
+ elif directive.directive == 'INCLUDE' :
+ for record in process_include(line, origin,
+ ttl = ttl,
+ *directive.arguments
+ ):
+ yield record
+
+ else :
+ log.warn("%s: skip unknown control record: %r", line, directive)
+ yield line, None
+
+ else:
+ record = ZoneRecord.parse(line.parts,
+ name = name,
+ ttl = ttl,
+ comment = line.comment,
+ line = line,
+ origin = origin,
+ )
+
+ log.debug("%s: %s", line, record)
+
+ # keep name across lines
+ name = record.name
+
+ yield record
+
+ @classmethod
+ def parse (cls, parts, name=None, ttl=None, line=None, **opts) :
+ """
+ Build a ZoneRecord from ZoneLine.parts.
+
+ name - context for name, if continuing previous line
+ ttl - context for ttl, if using $TTL
Return: (name, ZoneRecord)
"""
-
- if parts is None :
- parts = list(line.parts)
-
- if not parts :
- # skip
- return
- if line.indent :
- # indented lines keep name from previous record
- pass
+ parts = list(parts)
- else :
- name = parts.pop(0)
+ # first field is either name or leading whitespace
+ leading = parts.pop(0)
+ if leading:
+ name = leading
+
if len(parts) < 2 :
- raise ZoneLineError(line, "Too few parts to parse: {0!r}", line.data)
+ raise ZoneLineError(line, "Too few parts to parse: {parts!r}", parts=parts)
# parse ttl/cls/type
_cls = None
if parts and parts[0][0].isdigit() :
- ttl = parts.pop(0)
+ ttl = int(parts.pop(0))
if parts and parts[0].upper() in ('IN', 'CH') :
- _cls = parts.pop(0)
+ _cls = parts.pop(0).upper()
# always have type
- type = parts.pop(0)
+ type = parts.pop(0).upper()
# remaining parts are data
data = parts
log.debug(" ttl=%r, cls=%r, type=%r, data=%r", ttl, _cls, type, data)
- return cls.build(line, name, ttl, _cls, type, data, **opts)
+ return cls(name, ttl, _cls, type, data, line=line, **opts)
@classmethod
- def build (cls, line, name, ttl, _cls, type, data, **opts) :
- return cls(name, type, data,
- ttl = ttl,
- cls = _cls,
- line = line,
- **opts
- )
+ def build (_cls, name, type, *data, **opts):
+ """
+ Simple interface to build ZoneRecord from required parts. All optional fields must be given as keyword arguments.
- @classmethod
- def A (cls, name, ip4, **opts) :
- return cls(str(name), 'A', [str(ip4)], **opts)
+ Normalizes all fields to strs.
+ """
+
+ # keyword-only arguments
+ ttl = opts.pop('ttl', None)
+ cls = opts.pop('cls', None)
- @classmethod
- def AAAA (cls, name, ip6, **opts) :
- return cls(str(name), 'AAAA', [str(ip6)], **opts)
+ if name:
+ name = str(name)
+ else:
+ name = None
+
+ if ttl or ttl == 0:
+ ttl = int(ttl)
+ else:
+ ttl = None
+
+ if cls:
+ cls = cls.upper()
+ else:
+ cls = None
+
+ type = type.upper()
+
+ data = [unicode(item) for item in data]
+
+ return _cls(name, ttl, cls, type, data, **opts)
@classmethod
- def CNAME (cls, name, host, **opts) :
- return cls(str(name), 'CNAME', [str(host)], **opts)
+ def A (_cls, name, ip4, **opts):
+ """
+ Build from ipaddr.IPv4Address.
+ """
+
+ return _cls.build(name, 'A', ipaddr.IPv4Address(ip4), **opts)
@classmethod
- def TXT (cls, name, text, **opts) :
- return cls(str(name), 'TXT',
- [u'"{0}"'.format(text.replace('"', '\\"'))],
- **opts
- )
+ def AAAA (_cls, name, ip6, **opts):
+ """
+ Build from ipaddr.IPv6Address.
+ """
+
+ return _cls.build(name, 'AAAA', ipaddr.IPv6Address(ip6), **opts)
@classmethod
- def PTR (cls, name, ptr, **opts) :
- return cls(str(name), 'PTR', [str(ptr)], **opts)
+ def CNAME (_cls, name, alias, **opts):
+ return _cls.build(name, 'CNAME', alias, **opts)
@classmethod
- def MX (cls, name, priority, mx, **opts) :
- return cls(str(name), 'MX', [int(priority), str(mx)], **opts)
+ def TXT (_cls, name, text, **opts):
+ """
+ Build from quoted (unicode) value.
+ """
- def __init__ (self, name, type, data, ttl=None, cls=None, line=None, origin=None, comment=None) :
+ return _cls.build(name, 'TXT', zone_quote(unicode(text)), **opts)
+
+ @classmethod
+ def PTR (_cls, name, ptr, **opts):
+ return _cls.build(name, 'PTR', ptr, **opts)
+
+ @classmethod
+ def MX (_cls, name, priority, mx, **opts):
+ """
+ priority : int
+ mx : str - hostname
+ """
+
+ return _cls.build(name, 'MX', int(priority), str(mx), **opts)
+
+ def __init__ (self, name, ttl, cls, type, data, comment=None, origin=None, line=None):
+ """
+ Using strict field ordering.
+
+ name - local label with respect to $ORIGIN
+ may also be @ to refer to $ORIGIN
+ ttl - int TTL for record, default to implict $TTL
+ cls - uppercase class for record, default to IN
+ type - uppercase type for record, required
+ data [...] - list of data fields, interpretation varies by type
+ comment - optional comment to include in zone
+ origin - track implicit $ORIGIN or XXX: previous record state
+ line - associated ZoneLine
+ """
+
self.name = name
+ self.ttl = ttl
+ self.cls = cls
self.type = type
self.data = data
- self.ttl = ttl
- self.cls = cls
-
+ self.comment = comment
+ self.origin = origin
self.line = line
- self.origin = origin
- self.comment = comment
def __unicode__ (self) :
"""
@@ -440,9 +523,9 @@
comment = ''
return u"{name:25} {ttl:4} {cls:2} {type:5} {data}{comment}".format(
- name = self.name or '',
- ttl = self.ttl or '',
- cls = self.cls or '',
+ name = '' if self.name is None else self.name,
+ ttl = '' if self.ttl is None else self.ttl,
+ cls = '' if self.cls is None else self.cls,
type = self.type,
data = ' '.join(unicode(data) for data in self.data),
comment = comment,
@@ -450,7 +533,7 @@
def __repr__ (self) :
return '%s(%s)' % (self.__class__.__name__, ', '.join(repr(arg) for arg in (
- self.name, self.type, self.data
+ self.name, self.ttl, self.cls, self.type, self.data
)))
class SOA (ZoneRecord) :