aboutsummaryrefslogtreecommitdiff
path: root/rzc.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2025-06-06 22:27:49 +0000
committerRob Austein <sra@hactrn.net>2025-06-06 22:28:58 +0000
commit0e17c12b16a471c79333d9e684a9bd56a082615a (patch)
treedd0343c193327488f5201dff2254cdb3f4c8b779 /rzc.py
Initial public versionHEADtrunk
Diffstat (limited to 'rzc.py')
-rwxr-xr-xrzc.py130
1 files changed, 130 insertions, 0 deletions
diff --git a/rzc.py b/rzc.py
new file mode 100755
index 0000000..da1107d
--- /dev/null
+++ b/rzc.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python3
+
+"""
+Reverse Zone Compiler (document this...)
+"""
+
+from argparse import ArgumentParser, FileType, ArgumentDefaultsHelpFormatter
+from datetime import datetime, UTC
+from pathlib import Path
+from os import getenv
+from sys import argv
+
+from dns.rdatatype import A, AAAA, SOA, NS, PTR, CNAME
+from dns.rdataclass import IN
+from dns.node import NodeKind
+
+import dns.rdtypes.ANY.CNAME
+import dns.rdtypes.ANY.SOA
+import dns.rdtypes.ANY.PTR
+import dns.tsigkeyring
+import dns.reversename
+import dns.rdataclass
+import dns.rdatatype
+import dns.rdataset
+import dns.query
+import dns.rdata
+import dns.name
+import dns.zone
+
+import toml
+
+class Main:
+
+ def __init__(self):
+ self.jane = Path(argv[0]).resolve()
+
+ ap = ArgumentParser(formatter_class = ArgumentDefaultsHelpFormatter)
+ ap.add_argument("-c", "--config", type = FileType("r"),
+ default = getenv("RZC_CONFIG", f"{self.jane.stem}.toml"),
+ help = "TOML configuration file")
+ args = ap.parse_args()
+
+ self.cfg = toml.load(args.config, JinjaDict)
+ self.now = datetime.now(UTC)
+
+ self.output_dir = Path(self.cfg.output.directory)
+ self.cache_dir = Path(self.cfg.input.cache)
+
+ self.fetch_input_zones()
+ self.initialize_output_zones()
+ self.populate_output_zones()
+ self.write_output_zones()
+
+ def fetch_input_zones(self):
+ self.cache_dir.mkdir(parents = True, exist_ok = True)
+ self.forward = []
+
+ for iz in self.cfg.input.zone:
+ origin = dns.name.from_text(iz.name)
+ path = self.cache_dir / origin.to_text(omit_final_dot = True)
+ try:
+ with path.open("r") as f:
+ fz = dns.zone.from_file(f, origin = origin, relativize = False)
+ except:
+ fz = dns.zone.Zone(origin = origin, relativize = False)
+ try:
+ keyring = dns.tsigkeyring.from_text(iz.tsig) if iz.tsig else None
+ query, serial = dns.xfr.make_query(txn_manager = fz, keyring = keyring)
+ dns.query.inbound_xfr(where = iz.server, txn_manager = fz, query = query)
+ except Exception as e:
+ print(f"Unable to XFR {iz.name}: {e}")
+ continue
+ with path.open("w") as f:
+ fz.to_file(f, sorted = True, relativize = False)
+ print(f"Updated {iz.name}")
+ self.forward.append(fz)
+
+ def initialize_output_zones(self):
+ self.reverse = []
+
+ for oz in self.cfg.output.zones:
+ rz = dns.zone.Zone(dns.name.from_text(oz), rdclass = IN, relativize = False)
+ apex = rz.find_node(rz.origin, create = True)
+ apex.replace_rdataset(
+ dns.rdataset.from_rdata(
+ self.cfg.output.ttl,
+ dns.rdtypes.ANY.SOA.SOA(
+ IN, SOA,
+ dns.name.from_text(self.cfg.output.soa.mname),
+ dns.name.from_text(self.cfg.output.soa.rname),
+ int(self.now.timestamp()),
+ self.cfg.output.soa.refresh, self.cfg.output.soa.retry,
+ self.cfg.output.soa.expire, self.cfg.output.soa.minimum)))
+ apex.replace_rdataset(
+ dns.rdataset.from_text_list(
+ IN, NS, self.cfg.output.ttl, self.cfg.output.ns,
+ relativize = False, origin = dns.name.root))
+ rz.check_origin()
+ self.reverse.append(rz)
+
+ def populate_output_zones(self):
+ for fz in self.forward:
+ for qtype in (A, AAAA):
+ for name, ttl, addr in fz.iterate_rdatas(qtype):
+ name = name.derelativize(fz.origin)
+ rname = dns.reversename.from_address(addr.to_text())
+ rdata = dns.rdtypes.ANY.PTR.PTR(IN, PTR, name)
+ for rz in self.reverse:
+ if rname.is_subdomain(rz.origin):
+ rz.find_rdataset(rname, PTR, create = True).add(rdata, ttl)
+ break
+
+ def write_output_zones(self):
+ self.output_dir.mkdir(parents = True, exist_ok = True)
+ for rz in self.reverse:
+ path = self.output_dir / rz.origin.to_text(omit_final_dot = True)
+ with path.open("w") as f:
+ f.write(f";; Automatically generated {self.now} by {self.jane}. Do not edit.\n\n")
+ rz.to_file(f, sorted = True, relativize = False)
+ print(f"Wrote {path}")
+
+class JinjaDict(dict):
+ def __getattr__(self, name):
+ try:
+ return self[name]
+ except KeyError:
+ return None
+
+if __name__ == "__main__":
+ Main()