aboutsummaryrefslogtreecommitdiff
path: root/printer_browser.py
blob: a7888927ec96f7f211cbd1007d3adda693cf55d6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3

"""
Browse for printers on the local net, dump DNS records for inclusion
in a DNS zone in master file format.
"""


from argparse           import ArgumentParser, FileType, ArgumentDefaultsHelpFormatter
from zeroconf           import Zeroconf, ServiceBrowser, ServiceInfo
from socket             import inet_ntop, AF_INET, AF_INET6
from dns.rdataclass     import IN
from dns.rdatatype      import PTR, SRV, TXT
from dns.rrset          import RRset

class Listener:
    """
    mDNS browser listener.

    Receives service advertisements, retrieves the ServiceInfo record
    for each one and drops it into a Queue; get() hands the queued
    records back to the caller.
    """

    # Imported in the class body so the names are reachable from the
    # methods as self.Queue / self.Empty on either major Python version.
    try:
        from queue import Queue, Empty # Python 3
    except ImportError:
        from Queue import Queue, Empty # Python 2

    def __init__(self, args):
        """
        Set up an empty result queue.

        args must provide browse_timeout and query_timeout (the command
        line options describe both as seconds).
        """
        self.q = self.Queue()
        self.browse_timeout = args.browse_timeout
        # Scaled by 1000 before being handed to get_service_info().
        self.query_timeout  = args.query_timeout * 1000

    def add_service(self, z, type, name):
        "Look up the advertised service and queue its ServiceInfo, if any."
        printer = z.get_service_info(type, name, timeout = self.query_timeout)
        if printer is not None:
            self.q.put(printer)

    def remove_service(self, z, type, name):
        "Service withdrawals are ignored."

    def update_service(self, z, type, name):
        """
        Service updates are ignored.

        Present only to complete the add/remove/update listener
        interface; only the initial advertisement is recorded.
        """

    def get(self):
        """
        Iterator to retrieve ServiceInfo results.

        Yields queued records until none arrives within browse_timeout
        seconds, then stops.
        """
        while True:
            try:
                yield self.q.get(timeout = self.browse_timeout)
            except self.Empty:
                return

def dns_name(name, prefix = (), suffix = ()):
    """
    Strip .local, add optional prefix and suffix.

    name is converted with str() and parsed as a DNS name; prefix and
    suffix are tuples of labels placed before and after what is left
    of name once its trailing "local" and root labels are removed.

    Returns a dns.name.Name.  Raises ValueError if name does not end
    in ".local.".
    """
    from dns.name import Name, from_text
    labels = from_text(str(name)).labels
    # Checked with an explicit raise rather than assert: under
    # "python -O" an assert vanishes and a non-.local name would
    # silently lose its last two labels.
    if labels[-2:] != (b"local", b""):
        raise ValueError("not a .local name: {!r}".format(str(name)))
    return Name(prefix + labels[:-2] + suffix)

def rr(name, rdata):
    "Build an RRset owned by name holding just rdata, with rdata's class and type."
    rdclass, rdtype = rdata.rdclass, rdata.rdtype
    rrset = RRset(name, rdclass, rdtype)
    rrset.add(rdata)
    return rrset

def txt_rr(printer):
    """
    Regenerate ServiceInfo from property list and generate a TXT RR.

    The TXT data is taken from a fresh ServiceInfo built from
    printer.properties rather than from printer itself -- presumably
    so that edits to the property dict (main() may drop adminurl)
    show up in the generated record.
    """
    from dns.rdata import from_wire
    fresh = ServiceInfo(printer.type, printer.name, properties = printer.properties)
    owner = dns_name(printer.name)
    encoded = fresh.text
    rdata = from_wire(IN, TXT, encoded, 0, len(encoded))
    return rr(owner, rdata)

def srv_rr(printer):
    """
    Generate an SRV RR.

    Owner is the printer's service name, target is the printer's
    server name, both with .local stripped; priority, weight and port
    are copied from printer.
    """
    from dns.rdtypes.IN.SRV import SRV as SRV_RR
    owner = dns_name(printer.name)
    rdata = SRV_RR(
        IN,
        SRV,
        printer.priority,
        printer.weight,
        printer.port,
        dns_name(printer.server),
    )
    return rr(owner, rdata)

def ptr_rr(printer, *prefix):
    """
    Generate a PTR RR.

    Owner is the printer's service type (with any prefix labels put in
    front of it), target is the printer's service name; .local is
    stripped from both.
    """
    from dns.rdtypes.ANY.PTR import PTR as PTR_RR
    owner = dns_name(printer.type, prefix = prefix)
    target = dns_name(printer.name)
    return rr(owner, PTR_RR(IN, PTR, target))

def main():
    """
    Browse for printers and write their DNS records as master file text.

    Parses the command line, browses every requested mDNS type until
    no new result arrives within the browse timeout, groups the
    results by service name, and for each name writes one PTR RR per
    result plus an SRV and a TXT RR built from the first result.
    Unless --quiet is given, each group is preceded by ";;" comment
    lines giving the service name, address and server name.
    """
    ap = ArgumentParser(description = __doc__, formatter_class = ArgumentDefaultsHelpFormatter)
    ap.add_argument("-q", "--quiet",    action = "store_true",
                    help = "omit comments from generated master file text")
    ap.add_argument("-o", "--output",   type = FileType("w"),   default = "-",
                    help = "where to write master file text")
    ap.add_argument("--browse-timeout", type = int,             default = 5,
                    help = "timeout in seconds while browsing for printers")
    ap.add_argument("--query-timeout",  type = int,             default = 3,
                    help = "timeout in seconds to retrieve printer data")
    ap.add_argument("--keep-adminurl",  action = "store_true",
                    help = "keep adminurl property in generated DNS data")
    ap.add_argument("mdns_type",        nargs = "*",
                    default = ["_ipp._tcp.local.", "_universal._sub._ipp._tcp.local.", "_pdl-datastream._tcp.local."],
                    help = "mDNS types for which to browse")
    args = ap.parse_args()

    def write(s = ""):
        "Write str(s) plus a newline to the output file."
        args.output.write("{!s}\n".format(s))

    def text_address(packed):
        """
        Render a packed IPv4 (4 byte) or IPv6 (16 byte) address as text.

        The address only appears in a comment, so anything that is not
        a 4 or 16 byte value is shown as "?" instead of aborting the run.
        """
        family = {4: AF_INET, 16: AF_INET6}.get(len(packed or b""))
        if family is None:
            return "?"
        return inet_ntop(family, packed)

    printers = {}

    z = Zeroconf()
    try:
        listener = Listener(args)
        for mdns_type in args.mdns_type:
            ServiceBrowser(z, mdns_type, listener)
        # Group results by service name; the same name can be reported
        # once for each browsed type.
        for printer in listener.get():
            printers.setdefault(printer.name, []).append(printer)
    finally:
        # Shut zeroconf down even if browsing failed part way through.
        z.close()

    for group in printers.values():
        rrsets = [ptr_rr(p) for p in group]
        p = group[0]
        if not args.keep_adminurl:
            # Must happen before txt_rr(), which encodes p.properties.
            p.properties.pop(b"adminurl", None)
        rrsets.append(srv_rr(p))
        rrsets.append(txt_rr(p))

        if not args.quiet:
            write(";; Name: {!r}".format(p.name))
            write(";; Addr: {} ({})".format(text_address(p.address), p.server))
            write(";;")
        # Named rrset, not rr, so the module-level rr() helper is not shadowed.
        for rrset in rrsets:
            write(rrset)
        write()

# Run the browser only when executed as a script, not when imported.
if __name__ == "__main__":
    main()