aboutsummaryrefslogtreecommitdiff
path: root/rzc.py
blob: da1107df4f57849f5434d546737b7fbd3d7c9199 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3

"""
Reverse Zone Compiler (document this...)
"""

from argparse import ArgumentParser, FileType, ArgumentDefaultsHelpFormatter
from datetime import datetime, UTC
from pathlib import Path
from os import getenv
from sys import argv

from dns.rdatatype import A, AAAA, SOA, NS, PTR, CNAME
from dns.rdataclass import IN
from dns.node import NodeKind

import dns.rdtypes.ANY.CNAME
import dns.rdtypes.ANY.SOA
import dns.rdtypes.ANY.PTR
import dns.tsigkeyring
import dns.reversename
import dns.rdataclass
import dns.rdatatype
import dns.rdataset
import dns.query
import dns.rdata
import dns.name
import dns.zone

import toml

class Main:

    def __init__(self):
        self.jane = Path(argv[0]).resolve()

        ap = ArgumentParser(formatter_class = ArgumentDefaultsHelpFormatter)
        ap.add_argument("-c", "--config", type = FileType("r"),
                        default = getenv("RZC_CONFIG", f"{self.jane.stem}.toml"),
                        help = "TOML configuration file")
        args = ap.parse_args()

        self.cfg = toml.load(args.config, JinjaDict)
        self.now = datetime.now(UTC)

        self.output_dir = Path(self.cfg.output.directory)
        self.cache_dir  = Path(self.cfg.input.cache)

        self.fetch_input_zones()
        self.initialize_output_zones()
        self.populate_output_zones()
        self.write_output_zones()

    def fetch_input_zones(self):
        self.cache_dir.mkdir(parents = True, exist_ok = True)
        self.forward = []

        for iz in self.cfg.input.zone:
            origin = dns.name.from_text(iz.name)
            path = self.cache_dir / origin.to_text(omit_final_dot = True)
            try:
                with path.open("r") as f:
                    fz = dns.zone.from_file(f, origin = origin, relativize = False)
            except:
                fz = dns.zone.Zone(origin = origin, relativize = False)
            try:
                keyring = dns.tsigkeyring.from_text(iz.tsig) if iz.tsig else None
                query, serial = dns.xfr.make_query(txn_manager = fz, keyring = keyring)
                dns.query.inbound_xfr(where = iz.server, txn_manager = fz, query = query)
            except Exception as e:
                print(f"Unable to XFR {iz.name}: {e}")
                continue
            with path.open("w") as f:
                fz.to_file(f, sorted = True, relativize = False)
            print(f"Updated {iz.name}")
            self.forward.append(fz)

    def initialize_output_zones(self):
        self.reverse = []

        for oz in self.cfg.output.zones:
            rz = dns.zone.Zone(dns.name.from_text(oz), rdclass = IN, relativize = False)
            apex = rz.find_node(rz.origin, create = True)
            apex.replace_rdataset(
                dns.rdataset.from_rdata(
                    self.cfg.output.ttl,
                    dns.rdtypes.ANY.SOA.SOA(
                        IN, SOA,
                        dns.name.from_text(self.cfg.output.soa.mname),
                        dns.name.from_text(self.cfg.output.soa.rname),
                        int(self.now.timestamp()),
                        self.cfg.output.soa.refresh, self.cfg.output.soa.retry,
                        self.cfg.output.soa.expire, self.cfg.output.soa.minimum)))
            apex.replace_rdataset(
                dns.rdataset.from_text_list(
                    IN, NS, self.cfg.output.ttl, self.cfg.output.ns,
                    relativize = False, origin = dns.name.root))
            rz.check_origin()
            self.reverse.append(rz)

    def populate_output_zones(self):
        for fz in self.forward:
            for qtype in (A, AAAA):
                for name, ttl, addr in fz.iterate_rdatas(qtype):
                    name = name.derelativize(fz.origin)
                    rname = dns.reversename.from_address(addr.to_text())
                    rdata = dns.rdtypes.ANY.PTR.PTR(IN, PTR, name)
                    for rz in self.reverse:
                        if rname.is_subdomain(rz.origin):
                            rz.find_rdataset(rname, PTR, create = True).add(rdata, ttl)
                            break

    def write_output_zones(self):
        self.output_dir.mkdir(parents = True, exist_ok = True)
        for rz in self.reverse:
            path = self.output_dir / rz.origin.to_text(omit_final_dot = True)
            with path.open("w") as f:
                f.write(f";; Automatically generated {self.now} by {self.jane}.  Do not edit.\n\n")
                rz.to_file(f, sorted = True, relativize = False)
            print(f"Wrote {path}")

class JinjaDict(dict):
    """
    Dictionary whose keys can also be read as attributes.

    Reading an attribute that is neither a real attribute nor a key
    yields None instead of raising AttributeError.
    """

    def __getattr__(self, name):
        # Only invoked after normal attribute lookup has failed, so the
        # dict methods themselves (get, items, ...) are never shadowed
        # by keys of the same name.
        return self.get(name)

# Script entry point: constructing Main runs the whole program, since its
# __init__ parses the arguments and then executes every stage in order.
if __name__ == "__main__":
    Main()