1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#!/usr/bin/env python3
"""
Browse for printers on the local net, dump DNS records for inclusion
in a DNS zone in master file format.
"""
from argparse import ArgumentParser, FileType, ArgumentDefaultsHelpFormatter
from zeroconf import Zeroconf, ServiceBrowser, ServiceInfo
from socket import inet_ntop, AF_INET, AF_INET6
from dns.rdataclass import IN
from dns.rdatatype import PTR, SRV, TXT
from dns.rrset import RRset
class Listener:
"""
mDNS browser, receives service advertisements, retrieves
ServiceInfo records and drops them into a Queue for pickup.
"""
try:
from queue import Queue, Empty # Python 3
except ImportError:
from Queue import Queue, Empty # Python 2
def __init__(self, args):
self.q = self.Queue()
self.browse_timeout = args.browse_timeout
self.query_timeout = args.query_timeout * 1000
def add_service(self, z, type, name):
printer = z.get_service_info(type, name, timeout = self.query_timeout)
if printer is not None:
self.q.put(printer)
def remove_service(self, z, type, name):
pass
def get(self):
"Iterator to retrieve ServiceInfo results"
while True:
try:
yield self.q.get(timeout = self.browse_timeout)
except self.Empty:
return
def dns_name(name, prefix = (), suffix = ()):
"Strip .local, add optional prefix and suffix."
from dns.name import Name, from_text
name = from_text(str(name))
assert name.labels[-2:] == (b"local", b"")
return Name(prefix + name.labels[:-2] + suffix)
def rr(name, rdata):
r = RRset(name, rdata.rdclass, rdata.rdtype)
r.add(rdata)
return r
def txt_rr(printer):
"Regenerate ServiceInfo from property list and generate a TXT RR."
from dns.rdata import from_wire
si = ServiceInfo(printer.type, printer.name, properties = printer.properties)
return rr(dns_name(printer.name), from_wire(IN, TXT, si.text, 0, len(si.text)))
def srv_rr(printer):
"Generate an SRV RR."
from dns.rdtypes.IN.SRV import SRV as SRV_RR
return rr(dns_name(printer.name), SRV_RR(IN, SRV, printer.priority, printer.weight, printer.port, dns_name(printer.server)))
def ptr_rr(printer, *prefix):
"Generate a PTR RR."
from dns.rdtypes.ANY.PTR import PTR as PTR_RR
return rr(dns_name(printer.type, prefix = prefix), PTR_RR(IN, PTR, dns_name(printer.name)))
def main():
ap = ArgumentParser(description = __doc__, formatter_class = ArgumentDefaultsHelpFormatter)
ap.add_argument("-q", "--quiet", action = "store_true",
help = "omit comments from generated master file text")
ap.add_argument("-o", "--output", type = FileType("w"), default = "-",
help = "where to write master file text")
ap.add_argument("--browse-timeout", type = int, default = 5,
help = "timeout in seconds while browsing for printers")
ap.add_argument("--query-timeout", type = int, default = 3,
help = "timeout in seconds to retrieve printer data")
ap.add_argument("--keep-adminurl", action = "store_true",
help = "keep adminurl property in generated DNS data")
ap.add_argument("mdns_type", nargs = "*",
default = ["_ipp._tcp.local.", "_universal._sub._ipp._tcp.local.", "_pdl-datastream._tcp.local."],
help = "mDNS types for which to browse")
args = ap.parse_args()
def write(s = ""):
args.output.write("{!s}\n".format(s))
printers = {}
z = Zeroconf()
listener = Listener(args)
for mdns_type in args.mdns_type:
ServiceBrowser(z, mdns_type, listener)
for printer in listener.get():
try:
printers[printer.name].append(printer)
except KeyError:
printers[printer.name] = [printer]
z.close()
for printer in printers.values():
rrs = [ptr_rr(p) for p in printer]
p = printer[0]
if not args.keep_adminurl:
p.properties.pop(b"adminurl", None)
rrs.append(srv_rr(p))
rrs.append(txt_rr(p))
if not args.quiet:
addr = inet_ntop({4:AF_INET, 6:AF_INET6}[len(p.address)], p.address)
write(";; Name: {!r}".format(p.name))
write(";; Addr: {} ({})".format(addr, p.server))
write(";;")
for rr in rrs:
write(rr)
write()
if __name__ == "__main__":
main()
|