123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- #!/usr/bin/env python3
- """
- Browse for printers on the local net, dump DNS records for inclusion
- in a DNS zone in master file format.
- """
- from argparse import ArgumentParser, FileType, ArgumentDefaultsHelpFormatter
- from zeroconf import Zeroconf, ServiceBrowser, ServiceInfo
- from socket import inet_ntop, AF_INET, AF_INET6
- from dns.rdataclass import IN
- from dns.rdatatype import PTR, SRV, TXT
- from dns.rrset import RRset
- class Listener:
- """
- mDNS browser, receives service advertisements, retrieves
- ServiceInfo records and drops them into a Queue for pickup.
- """
- try:
- from queue import Queue, Empty # Python 3
- except ImportError:
- from Queue import Queue, Empty # Python 2
- def __init__(self, args):
- self.q = self.Queue()
- self.browse_timeout = args.browse_timeout
- self.query_timeout = args.query_timeout * 1000
- def add_service(self, z, type, name):
- printer = z.get_service_info(type, name, timeout = self.query_timeout)
- if printer is not None:
- self.q.put(printer)
- def remove_service(self, z, type, name):
- pass
- def get(self):
- "Iterator to retrieve ServiceInfo results"
- while True:
- try:
- yield self.q.get(timeout = self.browse_timeout)
- except self.Empty:
- return
- def dns_name(name, prefix = (), suffix = ()):
- "Strip .local, add optional prefix and suffix."
- from dns.name import Name, from_text
- name = from_text(str(name))
- assert name.labels[-2:] == (b"local", b"")
- return Name(prefix + name.labels[:-2] + suffix)
- def rr(name, rdata):
- r = RRset(name, rdata.rdclass, rdata.rdtype)
- r.add(rdata)
- return r
- def txt_rr(printer):
- "Regenerate ServiceInfo from property list and generate a TXT RR."
- from dns.rdata import from_wire
- si = ServiceInfo(printer.type, printer.name, properties = printer.properties)
- return rr(dns_name(printer.name), from_wire(IN, TXT, si.text, 0, len(si.text)))
- def srv_rr(printer):
- "Generate an SRV RR."
- from dns.rdtypes.IN.SRV import SRV as SRV_RR
- return rr(dns_name(printer.name), SRV_RR(IN, SRV, printer.priority, printer.weight, printer.port, dns_name(printer.server)))
- def ptr_rr(printer, *prefix):
- "Generate a PTR RR."
- from dns.rdtypes.ANY.PTR import PTR as PTR_RR
- return rr(dns_name(printer.type, prefix = prefix), PTR_RR(IN, PTR, dns_name(printer.name)))
- def main():
- ap = ArgumentParser(description = __doc__, formatter_class = ArgumentDefaultsHelpFormatter)
- ap.add_argument("-q", "--quiet", action = "store_true",
- help = "omit comments from generated master file text")
- ap.add_argument("-o", "--output", type = FileType("w"), default = "-",
- help = "where to write master file text")
- ap.add_argument("--browse-timeout", type = int, default = 5,
- help = "timeout in seconds while browsing for printers")
- ap.add_argument("--query-timeout", type = int, default = 3,
- help = "timeout in seconds to retrieve printer data")
- ap.add_argument("--keep-adminurl", action = "store_true",
- help = "keep adminurl property in generated DNS data")
- ap.add_argument("mdns_type", nargs = "*",
- default = ["_ipp._tcp.local.", "_universal._sub._ipp._tcp.local.", "_pdl-datastream._tcp.local."],
- help = "mDNS types for which to browse")
- args = ap.parse_args()
- def write(s = ""):
- args.output.write("{!s}\n".format(s))
- printers = {}
- z = Zeroconf()
- listener = Listener(args)
- for mdns_type in args.mdns_type:
- ServiceBrowser(z, mdns_type, listener)
- for printer in listener.get():
- try:
- printers[printer.name].append(printer)
- except KeyError:
- printers[printer.name] = [printer]
- z.close()
- for printer in printers.values():
- rrs = [ptr_rr(p) for p in printer]
- p = printer[0]
- if not args.keep_adminurl:
- p.properties.pop(b"adminurl", None)
- rrs.append(srv_rr(p))
- rrs.append(txt_rr(p))
- if not args.quiet:
- addr = inet_ntop({4:AF_INET, 6:AF_INET6}[len(p.address)], p.address)
- write(";; Name: {!r}".format(p.name))
- write(";; Addr: {} ({})".format(addr, p.server))
- write(";;")
- for rr in rrs:
- write(rr)
- write()
- if __name__ == "__main__":
- main()
|