#!/usr/bin/env python3 """ Browse for printers on the local net, dump DNS records for inclusion in a DNS zone in master file format. """ from argparse import ArgumentParser, FileType, ArgumentDefaultsHelpFormatter from zeroconf import Zeroconf, ServiceBrowser, ServiceInfo from socket import inet_ntop, AF_INET, AF_INET6 from dns.rdataclass import IN from dns.rdatatype import PTR, SRV, TXT from dns.rrset import RRset class Listener: """ mDNS browser, receives service advertisements, retrieves ServiceInfo records and drops them into a Queue for pickup. """ try: from queue import Queue, Empty # Python 3 except ImportError: from Queue import Queue, Empty # Python 2 def __init__(self, args): self.q = self.Queue() self.browse_timeout = args.browse_timeout self.query_timeout = args.query_timeout * 1000 def add_service(self, z, type, name): printer = z.get_service_info(type, name, timeout = self.query_timeout) if printer is not None: self.q.put(printer) def remove_service(self, z, type, name): pass def get(self): "Iterator to retrieve ServiceInfo results" while True: try: yield self.q.get(timeout = self.browse_timeout) except self.Empty: return def dns_name(name, prefix = (), suffix = ()): "Strip .local, add optional prefix and suffix." from dns.name import Name, from_text name = from_text(str(name)) assert name.labels[-2:] == (b"local", b"") return Name(prefix + name.labels[:-2] + suffix) def rr(name, rdata): r = RRset(name, rdata.rdclass, rdata.rdtype) r.add(rdata) return r def txt_rr(printer): "Regenerate ServiceInfo from property list and generate a TXT RR." from dns.rdata import from_wire si = ServiceInfo(printer.type, printer.name, properties = printer.properties) return rr(dns_name(printer.name), from_wire(IN, TXT, si.text, 0, len(si.text))) def srv_rr(printer): "Generate an SRV RR." from dns.rdtypes.IN.SRV import SRV as SRV_RR return rr(dns_name(printer.name), SRV_RR(IN, SRV, printer.priority, printer.weight, printer.port, dns_name(printer.server))) def ptr_rr(printer, *prefix): "Generate a PTR RR." from dns.rdtypes.ANY.PTR import PTR as PTR_RR return rr(dns_name(printer.type, prefix = prefix), PTR_RR(IN, PTR, dns_name(printer.name))) def main(): ap = ArgumentParser(description = __doc__, formatter_class = ArgumentDefaultsHelpFormatter) ap.add_argument("-q", "--quiet", action = "store_true", help = "omit comments from generated master file text") ap.add_argument("-o", "--output", type = FileType("w"), default = "-", help = "where to write master file text") ap.add_argument("--browse-timeout", type = int, default = 5, help = "timeout in seconds while browsing for printers") ap.add_argument("--query-timeout", type = int, default = 3, help = "timeout in seconds to retrieve printer data") ap.add_argument("--keep-adminurl", action = "store_true", help = "keep adminurl property in generated DNS data") ap.add_argument("mdns_type", nargs = "*", default = ["_ipp._tcp.local.", "_universal._sub._ipp._tcp.local.", "_pdl-datastream._tcp.local."], help = "mDNS types for which to browse") args = ap.parse_args() def write(s = ""): args.output.write("{!s}\n".format(s)) printers = {} z = Zeroconf() listener = Listener(args) for mdns_type in args.mdns_type: ServiceBrowser(z, mdns_type, listener) for printer in listener.get(): try: printers[printer.name].append(printer) except KeyError: printers[printer.name] = [printer] z.close() for printer in printers.values(): rrs = [ptr_rr(p) for p in printer] p = printer[0] if not args.keep_adminurl: p.properties.pop(b"adminurl", None) rrs.append(srv_rr(p)) rrs.append(txt_rr(p)) if not args.quiet: addr = inet_ntop({4:AF_INET, 6:AF_INET6}[len(p.address)], p.address) write(";; Name: {!r}".format(p.name)) write(";; Addr: {} ({})".format(addr, p.server)) write(";;") for rr in rrs: write(rr) write() if __name__ == "__main__": main()