tero@639: import itertools tero@639: import re tero@639: import unittest tero@639: tero@641: from pvl.dns import zone, process tero@639: from StringIO import StringIO tero@639: tero@639: class File(StringIO): tero@639: @classmethod tero@639: def lines (cls, *lines): tero@639: return cls('\n'.join(lines) + '\n') tero@639: tero@639: def __init__(self, buffer, name='test.file'): tero@639: StringIO.__init__(self, buffer) tero@639: self.name = name tero@639: tero@639: class TestMixin(object): tero@639: def assertEqualWhitespace(self, value, expected): tero@639: # normalize tero@639: value = re.sub(r'\s+', ' ', value) tero@639: tero@639: self.assertEqual(value, expected) tero@639: tero@639: def assertZoneLineEqual(self, zl, expected): tero@639: self.assertEqual(unicode(zl), expected) tero@639: tero@639: def assertZoneLinesEqual(self, zls, expected): tero@639: for zl, expect in itertools.izip_longest(zls, expected): tero@639: self.assertZoneLineEqual(zl, expect) tero@639: tero@641: def assertZoneItemEqual(self, zr, expect): tero@639: self.assertEqualWhitespace(unicode(zr), expect) tero@639: tero@641: def assertZoneEqual(self, zrs, expected): tero@639: for zr, expect in itertools.izip_longest(zrs, expected): tero@641: self.assertZoneItemEqual(zr, expect) tero@641: tero@641: assertZoneRecordEqual = assertZoneItemEqual tero@641: assertZoneRecordsEqual = assertZoneEqual tero@639: tero@639: class ZoneLineTest(TestMixin, unittest.TestCase): tero@639: def testZoneLine(self): tero@639: self.assertEqual(unicode(zone.ZoneLine(['foo', 'A', '192.0.2.1'])), "foo\tA\t192.0.2.1") tero@639: tero@639: def testZoneLineParse(self): tero@639: self.assertZoneLinesEqual(zone.ZoneLine.parse(File("foo A 192.0.2.1")), ["foo\tA\t192.0.2.1"]) tero@639: tero@639: def testZoneLineParseWhitespace(self): tero@639: self.assertZoneLinesEqual(zone.ZoneLine.parse(File("foo \t A\t192.0.2.1")), ["foo\tA\t192.0.2.1"]) tero@639: tero@639: def testZoneLineParseComment(self): tero@639: self.assertZoneLinesEqual(zone.ZoneLine.parse(File("foo A 192.0.2.1 ; bar")), ["foo\tA\t192.0.2.1\t; bar"]) tero@639: tero@639: def testZoneLineParseQuoteSimple(self): tero@639: self.assertZoneLinesEqual(zone.ZoneLine.parse(File("foo TXT \"asdf quux\"")), ["foo\tTXT\tasdf quux"]) tero@639: tero@639: def testZoneLineParseQuoteTrailing(self): tero@639: self.assertZoneLinesEqual(zone.ZoneLine.parse(File("foo TXT \"asdf quux\" ok ; yay")), ["foo\tTXT\tasdf quux\tok\t; yay"]) tero@639: tero@639: def testZoneLineParseIndent(self): tero@639: self.assertZoneLinesEqual(zone.ZoneLine.parse(File(" A 192.0.2.2")), ["\tA\t192.0.2.2"]) tero@639: tero@639: def testZoneLineParseMultilineSingle(self): tero@639: self.assertZoneLinesEqual(zone.ZoneLine.parse(File("@ SOA ( 1 2 3 4 5 )")), ["@\tSOA\t1\t2\t3\t4\t5"]) tero@639: tero@639: def testZoneLineParseMultiline(self): tero@639: self.assertZoneLinesEqual(zone.ZoneLine.parse(File.lines("@ SOA (", "\t1\t", "2", "\t3\t\t4", "\t5 )")), ["@\tSOA\t1\t2\t3\t4\t5"]) tero@639: tero@639: def testZoneLineLoad(self): tero@639: self.assertZoneRecordsEqual( tero@639: zone.ZoneLine.load(File.lines( tero@639: "$TTL 3600", tero@639: " ", tero@639: "@ NS ns1", tero@639: " NS ns2", tero@639: "$ORIGIN asdf", # note lack of . tero@639: "foo A 192.0.2.1", tero@639: "$ORIGIN quux.test.", # note lack of . tero@639: "bar A 192.0.2.2", tero@639: )), [ tero@639: "$TTL 3600", tero@639: "@ NS ns1", tero@639: " NS ns2", tero@639: "$ORIGIN asdf", tero@639: "foo A 192.0.2.1", tero@639: "$ORIGIN quux.test.", tero@639: "bar A 192.0.2.2", tero@639: ] tero@639: ) tero@639: tero@639: class ZoneDirectiveTest(TestMixin, unittest.TestCase): tero@639: def testZoneDirective(self): tero@639: self.assertEqual(unicode(zone.ZoneDirective('ORIGIN', ['foo.'])), "$ORIGIN\tfoo.") tero@639: tero@639: def testZoneDirectiveBuild(self): tero@639: self.assertEqual(unicode(zone.ZoneDirective.build('ORIGIN', 'foo.')), "$ORIGIN\tfoo.") tero@639: tero@639: def testZoneDirectiveParse(self): tero@639: self.assertEqual(unicode(zone.ZoneDirective.parse(['$ORIGIN', 'foo.'])), "$ORIGIN\tfoo.") tero@639: tero@639: def testZoneDirectiveParseUpper(self): tero@639: self.assertEqual(unicode(zone.ZoneDirective.parse(['$include', 'foo.zone'])), "$INCLUDE\tfoo.zone") tero@639: tero@639: def testZoneDirectiveComment(self): tero@639: self.assertEqual(unicode(zone.ZoneDirective('ORIGIN', ['foo.'], comment="bar")), "$ORIGIN\tfoo.\t; bar") tero@639: tero@639: class ZoneRecordTest(TestMixin, unittest.TestCase): tero@639: def testZoneRecordShort(self): tero@639: rr = zone.ZoneRecord('test', None, None, 'A', ['192.0.2.1']) tero@639: tero@639: self.assertZoneRecordEqual((rr), 'test A 192.0.2.1') tero@639: tero@639: def testZoneRecordImplicit(self): tero@639: rr = zone.ZoneRecord(None, None, None, 'A', ['192.0.2.1']) tero@639: tero@639: self.assertZoneRecordEqual((rr), ' A 192.0.2.1') tero@639: tero@639: def testZoneRecordFull(self): tero@639: rr = zone.ZoneRecord('test', 60, 'IN', 'A', ['192.0.2.1']) tero@639: tero@639: self.assertZoneRecordEqual((rr), 'test 60 IN A 192.0.2.1') tero@639: tero@639: def testZoneRecordComment(self): tero@639: rr = zone.ZoneRecord('test', None, None, 'A', ['192.0.2.1'], comment='Testing') tero@639: tero@639: self.assertZoneRecordEqual((rr), 'test A 192.0.2.1 ; Testing') tero@639: tero@639: def testZoneRecordA(self): tero@639: self.assertZoneRecordEqual((zone.ZoneRecord.A('test', '192.0.2.1')), "test A 192.0.2.1") tero@639: tero@639: def testZoneRecordAAAA(self): tero@639: self.assertZoneRecordEqual((zone.ZoneRecord.AAAA('test', '2001:db8::c000:201')), "test AAAA 2001:db8::c000:201") tero@639: tero@639: def testZoneRecordCNAME(self): tero@639: self.assertZoneRecordEqual((zone.ZoneRecord.CNAME('test', 'test.example.net.')), "test CNAME test.example.net.") tero@639: tero@639: def testZoneRecordTXT(self): tero@639: self.assertZoneRecordEqual((zone.ZoneRecord.TXT('test', u"Foo Bar")), u"test TXT \"Foo Bar\"") tero@639: tero@639: def testZoneRecordPTR(self): tero@639: self.assertZoneRecordEqual(zone.ZoneRecord.PTR('1', 'test.example.com.'), "1 PTR test.example.com.") tero@639: tero@639: def testZoneRecordMX(self): tero@639: self.assertZoneRecordEqual(zone.ZoneRecord.MX('@', 10, 'mail1'), "@ MX 10 mail1") tero@639: tero@639: def testZoneRecordBuildTTL(self): tero@639: self.assertZoneRecordEqual(zone.ZoneRecord.A('test', '192.0.2.1', ttl=60), "test 60 A 192.0.2.1") tero@639: tero@639: def testZoneRecordBuildZeroTTL(self): tero@639: self.assertZoneRecordEqual(zone.ZoneRecord.A('test', '192.0.2.1', ttl=0), "test 0 A 192.0.2.1") tero@639: tero@639: def testZoneRecordBuildImplicit(self): tero@639: self.assertZoneRecordEqual(zone.ZoneRecord.A(None, '192.0.2.1'), " A 192.0.2.1") tero@639: tero@639: def testZoneRecordBuildCls(self): tero@639: self.assertZoneRecordEqual(zone.ZoneRecord.A('foo', '192.0.2.1', cls='in'), "foo IN A 192.0.2.1") tero@639: tero@639: def testZoneRecordParse(self): tero@639: self.assertZoneRecordEqual(zone.ZoneRecord.parse(['test', 'A', '192.0.2.1']), "test A 192.0.2.1") tero@639: self.assertZoneRecordEqual(zone.ZoneRecord.parse(['', 'A', '192.0.2.1']), " A 192.0.2.1") tero@639: self.assertZoneRecordEqual(zone.ZoneRecord.parse(['test', '60', 'A', '192.0.2.1']), "test 60 A 192.0.2.1") tero@639: self.assertZoneRecordEqual(zone.ZoneRecord.parse(['test', '60', 'in', 'A', '192.0.2.1']), "test 60 IN A 192.0.2.1") tero@639: tero@639: def testZoneRecordParseError(self): tero@639: with self.assertRaises(zone.ZoneLineError): tero@639: self.assertZoneRecordEqual(zone.ZoneRecord.parse(['test', 'A']), None) tero@639: tero@639: def testZoneRecordLoad(self): tero@639: self.assertZoneRecordsEqual( tero@639: zone.ZoneRecord.load(File.lines( tero@639: "$TTL 3600", tero@639: " ", tero@639: "@ NS ns1", tero@639: " NS ns2", tero@639: "$ORIGIN asdf", # relative tero@639: "foo A 192.0.2.1", tero@639: "$ORIGIN quux.test.", # absolute tero@639: "bar A 192.0.2.2", tero@639: ), 'test'), [ tero@639: "@ 3600 NS ns1", tero@639: "@ 3600 NS ns2", tero@639: "foo 3600 A 192.0.2.1", # asdf.test tero@639: "bar 3600 A 192.0.2.2", # quux.test tero@639: ] tero@639: ) tero@641: tero@641: class TestProcessZone(TestMixin, unittest.TestCase): tero@641: def testZoneRecordLoad(self): tero@641: rrs = zone.ZoneLine.load(File(""" tero@641: $TTL 3600 tero@641: @ SOA foo.test. hostmaster.test. ( tero@641: 0 ; serial tero@641: 1d ; refresh tero@641: 5m ; retry tero@641: 10d ; expiry tero@641: 300 ; negative tero@641: ) tero@641: tero@641: NS foo tero@641: NS bar tero@641: tero@641: foo A 192.0.2.1 tero@641: bar A 192.0.2.2 tero@641: tero@641: $INCLUDE "includes/test" tero@641: """)) tero@716: tero@716: include_trace = [ ] tero@641: rrs = list(process.zone_serial(rrs, 1337)) tero@716: rrs = list(process.zone_includes(rrs, '...', include_trace)) tero@641: tero@641: self.assertZoneEqual(rrs, [ tero@641: "$TTL 3600", tero@641: "@ SOA foo.test. hostmaster.test. 1337 1d 5m 10d 300", tero@641: " NS foo", tero@641: " NS bar", tero@641: "foo A 192.0.2.1", tero@641: "bar A 192.0.2.2", tero@647: "$INCLUDE .../includes/test", tero@641: ]) tero@716: self.assertEqual(include_trace, [ tero@716: '.../includes/test', tero@716: ]) tero@641: