#!/usr/bin/env python3

"""
Reverse Zone Compiler

Builds reverse-lookup (PTR) zone files from the A and AAAA records found
in a set of forward zones.

Processing steps, in order:

1. Read a TOML configuration file (``-c/--config``; the default comes from
   the ``RZC_CONFIG`` environment variable, falling back to
   ``<script stem>.toml``).
2. For each ``[[input.zone]]`` entry (``name``, ``server``, optional
   ``tsig``), load any cached copy from ``input.cache`` and refresh it
   with a zone transfer from ``server``.  Successfully transferred zones
   are written back to the cache.
3. Create one empty output zone per entry in ``output.zones``, each with
   an SOA built from ``output.soa`` (``mname``, ``rname``, ``refresh``,
   ``retry``, ``expire``, ``minimum``), NS records from ``output.ns``,
   and ``output.ttl`` as the TTL of both.
4. For every A/AAAA record in the forward zones, add a PTR record to the
   first output zone whose origin contains the reverse name.
5. Write each output zone to ``output.directory``.
"""

from argparse import ArgumentParser, FileType, ArgumentDefaultsHelpFormatter
from datetime import datetime, UTC
from pathlib import Path
from os import getenv
from sys import argv

from dns.rdatatype import A, AAAA, SOA, NS, PTR, CNAME
from dns.rdataclass import IN
from dns.node import NodeKind

import dns.rdtypes.ANY.CNAME
import dns.rdtypes.ANY.SOA
import dns.rdtypes.ANY.PTR
import dns.tsigkeyring
import dns.reversename
import dns.rdataclass
import dns.rdatatype
import dns.rdataset
import dns.query
import dns.rdata
import dns.name
import dns.zone
import dns.xfr  # used by fetch_input_zones(); don't rely on dns.query importing it

import toml


class Main:
    """
    Whole-program driver: constructing an instance runs the compiler.

    Attributes set by __init__():
      jane        resolved path of the running script (argv[0])
      cfg         parsed configuration, as nested JinjaDict objects
      now         timezone-aware UTC timestamp taken once at startup
      output_dir  directory the generated zone files are written to
      cache_dir   directory holding cached copies of the forward zones
      forward     forward zones that were transferred successfully
      reverse     output zones being generated
    """

    def __init__(self):
        """
        Parse the command line, load the configuration, and run every
        processing step in order.
        """

        self.jane = Path(argv[0]).resolve()

        ap = ArgumentParser(formatter_class = ArgumentDefaultsHelpFormatter)
        ap.add_argument("-c", "--config",
                        type = FileType("r"),
                        default = getenv("RZC_CONFIG", f"{self.jane.stem}.toml"),
                        help = "TOML configuration file")
        args = ap.parse_args()

        # FileType hands us an already-open file; close it once parsed.
        with args.config as f:
            self.cfg = toml.load(f, JinjaDict)

        self.now = datetime.now(UTC)
        self.output_dir = Path(self.cfg.output.directory)
        self.cache_dir = Path(self.cfg.input.cache)

        self.fetch_input_zones()
        self.initialize_output_zones()
        self.populate_output_zones()
        self.write_output_zones()

    def fetch_input_zones(self):
        """
        Transfer every configured forward zone into self.forward.

        A cached copy of each zone, if one can be read, seeds the
        transfer.  Zones whose transfer fails are reported on stdout and
        skipped; zones that transfer successfully are re-written to the
        cache.
        """

        self.cache_dir.mkdir(parents = True, exist_ok = True)
        self.forward = []

        for iz in self.cfg.input.zone:
            origin = dns.name.from_text(iz.name)
            path = self.cache_dir / origin.to_text(omit_final_dot = True)

            # Best-effort cache load: a missing or unreadable cache file
            # just means we start from an empty zone.  Catch Exception
            # rather than using a bare except so that KeyboardInterrupt
            # and SystemExit still propagate.
            try:
                with path.open("r") as f:
                    fz = dns.zone.from_file(f, origin = origin, relativize = False)
            except Exception:
                fz = dns.zone.Zone(origin = origin, relativize = False)

            # NOTE(review): a failed transfer drops the zone entirely,
            # even when a cached copy was loaded above -- confirm that
            # falling back to the cached data is not wanted here.
            try:
                keyring = dns.tsigkeyring.from_text(iz.tsig) if iz.tsig else None
                query, serial = dns.xfr.make_query(txn_manager = fz, keyring = keyring)
                dns.query.inbound_xfr(where = iz.server, txn_manager = fz, query = query)
            except Exception as e:
                print(f"Unable to XFR {iz.name}: {e}")
                continue

            with path.open("w") as f:
                fz.to_file(f, sorted = True, relativize = False)
            print(f"Updated {iz.name}")

            self.forward.append(fz)

    def initialize_output_zones(self):
        """
        Create one output zone per name in cfg.output.zones, each holding
        only its apex SOA and NS rdatasets, and collect them in
        self.reverse.
        """

        self.reverse = []

        for oz in self.cfg.output.zones:
            rz = dns.zone.Zone(dns.name.from_text(oz), rdclass = IN, relativize = False)
            apex = rz.find_node(rz.origin, create = True)

            # The SOA serial is the startup time as integer Unix seconds.
            apex.replace_rdataset(
                dns.rdataset.from_rdata(
                    self.cfg.output.ttl,
                    dns.rdtypes.ANY.SOA.SOA(
                        IN, SOA,
                        dns.name.from_text(self.cfg.output.soa.mname),
                        dns.name.from_text(self.cfg.output.soa.rname),
                        int(self.now.timestamp()),
                        self.cfg.output.soa.refresh,
                        self.cfg.output.soa.retry,
                        self.cfg.output.soa.expire,
                        self.cfg.output.soa.minimum)))

            apex.replace_rdataset(
                dns.rdataset.from_text_list(
                    IN, NS,
                    self.cfg.output.ttl,
                    self.cfg.output.ns,
                    relativize = False,
                    origin = dns.name.root))

            rz.check_origin()
            self.reverse.append(rz)

    def populate_output_zones(self):
        """
        Add a PTR record for every A and AAAA record in the forward zones.

        Each PTR goes into the first zone in self.reverse whose origin
        contains the reverse name, so the order of cfg.output.zones
        decides which zone wins when more than one would match.
        Addresses matching no output zone are silently ignored.
        """

        for fz in self.forward:
            for qtype in (A, AAAA):
                for name, ttl, addr in fz.iterate_rdatas(qtype):
                    name = name.derelativize(fz.origin)
                    rname = dns.reversename.from_address(addr.to_text())
                    rdata = dns.rdtypes.ANY.PTR.PTR(IN, PTR, name)
                    for rz in self.reverse:
                        if rname.is_subdomain(rz.origin):
                            rz.find_rdataset(rname, PTR, create = True).add(rdata, ttl)
                            break

    def write_output_zones(self):
        """
        Write each output zone to a file in self.output_dir named after
        the zone's origin (without the trailing dot), preceded by a
        "generated, do not edit" comment line.
        """

        self.output_dir.mkdir(parents = True, exist_ok = True)

        for rz in self.reverse:
            path = self.output_dir / rz.origin.to_text(omit_final_dot = True)
            with path.open("w") as f:
                f.write(f";; Automatically generated {self.now} by {self.jane}.  Do not edit.\n\n")
                rz.to_file(f, sorted = True, relativize = False)
            print(f"Wrote {path}")


class JinjaDict(dict):
    """
    dict subclass whose keys can also be read as attributes.

    Missing keys read as None instead of raising, so optional settings
    (for example a zone's tsig) can be tested directly for truthiness.
    """

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            return None


if __name__ == "__main__":
    Main()