diff options
Diffstat (limited to 'rzc.py')
-rwxr-xr-x | rzc.py | 130 |
1 files changed, 130 insertions, 0 deletions
@@ -0,0 +1,130 @@ +#!/usr/bin/env python3 + +""" +Reverse Zone Compiler (document this...) +""" + +from argparse import ArgumentParser, FileType, ArgumentDefaultsHelpFormatter +from datetime import datetime, UTC +from pathlib import Path +from os import getenv +from sys import argv + +from dns.rdatatype import A, AAAA, SOA, NS, PTR, CNAME +from dns.rdataclass import IN +from dns.node import NodeKind + +import dns.rdtypes.ANY.CNAME +import dns.rdtypes.ANY.SOA +import dns.rdtypes.ANY.PTR +import dns.tsigkeyring +import dns.reversename +import dns.rdataclass +import dns.rdatatype +import dns.rdataset +import dns.query +import dns.rdata +import dns.name +import dns.zone + +import toml + +class Main: + + def __init__(self): + self.jane = Path(argv[0]).resolve() + + ap = ArgumentParser(formatter_class = ArgumentDefaultsHelpFormatter) + ap.add_argument("-c", "--config", type = FileType("r"), + default = getenv("RZC_CONFIG", f"{self.jane.stem}.toml"), + help = "TOML configuration file") + args = ap.parse_args() + + self.cfg = toml.load(args.config, JinjaDict) + self.now = datetime.now(UTC) + + self.output_dir = Path(self.cfg.output.directory) + self.cache_dir = Path(self.cfg.input.cache) + + self.fetch_input_zones() + self.initialize_output_zones() + self.populate_output_zones() + self.write_output_zones() + + def fetch_input_zones(self): + self.cache_dir.mkdir(parents = True, exist_ok = True) + self.forward = [] + + for iz in self.cfg.input.zone: + origin = dns.name.from_text(iz.name) + path = self.cache_dir / origin.to_text(omit_final_dot = True) + try: + with path.open("r") as f: + fz = dns.zone.from_file(f, origin = origin, relativize = False) + except: + fz = dns.zone.Zone(origin = origin, relativize = False) + try: + keyring = dns.tsigkeyring.from_text(iz.tsig) if iz.tsig else None + query, serial = dns.xfr.make_query(txn_manager = fz, keyring = keyring) + dns.query.inbound_xfr(where = iz.server, txn_manager = fz, query = query) + except Exception as e: + print(f"Unable to XFR {iz.name}: {e}") + continue + with path.open("w") as f: + fz.to_file(f, sorted = True, relativize = False) + print(f"Updated {iz.name}") + self.forward.append(fz) + + def initialize_output_zones(self): + self.reverse = [] + + for oz in self.cfg.output.zones: + rz = dns.zone.Zone(dns.name.from_text(oz), rdclass = IN, relativize = False) + apex = rz.find_node(rz.origin, create = True) + apex.replace_rdataset( + dns.rdataset.from_rdata( + self.cfg.output.ttl, + dns.rdtypes.ANY.SOA.SOA( + IN, SOA, + dns.name.from_text(self.cfg.output.soa.mname), + dns.name.from_text(self.cfg.output.soa.rname), + int(self.now.timestamp()), + self.cfg.output.soa.refresh, self.cfg.output.soa.retry, + self.cfg.output.soa.expire, self.cfg.output.soa.minimum))) + apex.replace_rdataset( + dns.rdataset.from_text_list( + IN, NS, self.cfg.output.ttl, self.cfg.output.ns, + relativize = False, origin = dns.name.root)) + rz.check_origin() + self.reverse.append(rz) + + def populate_output_zones(self): + for fz in self.forward: + for qtype in (A, AAAA): + for name, ttl, addr in fz.iterate_rdatas(qtype): + name = name.derelativize(fz.origin) + rname = dns.reversename.from_address(addr.to_text()) + rdata = dns.rdtypes.ANY.PTR.PTR(IN, PTR, name) + for rz in self.reverse: + if rname.is_subdomain(rz.origin): + rz.find_rdataset(rname, PTR, create = True).add(rdata, ttl) + break + + def write_output_zones(self): + self.output_dir.mkdir(parents = True, exist_ok = True) + for rz in self.reverse: + path = self.output_dir / rz.origin.to_text(omit_final_dot = True) + with path.open("w") as f: + f.write(f";; Automatically generated {self.now} by {self.jane}. Do not edit.\n\n") + rz.to_file(f, sorted = True, relativize = False) + print(f"Wrote {path}") + +class JinjaDict(dict): + def __getattr__(self, name): + try: + return self[name] + except KeyError: + return None + +if __name__ == "__main__": + Main() |