aboutsummaryrefslogtreecommitdiff
path: root/printer_browser.py
blob: c5ceddcf6758fef44b037d301d52dc4fc6927495 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3

"""
Browse for printers on the local net, dump DNS records for inclusion
in a DNS zone in master file format.
"""


from argparse           import ArgumentParser, FileType, ArgumentDefaultsHelpFormatter
from zeroconf           import Zeroconf, ServiceBrowser, ServiceInfo
from socket             import inet_ntop, AF_INET, AF_INET6
from dns.rdataclass     import IN
from dns.rdatatype      import PTR, SRV, TXT
from dns.rrset          import RRset

class Listener:
    """
    Service listener for ServiceBrowser.

    For every service that is announced, the matching ServiceInfo is
    looked up and, if one was obtained, placed on an internal queue.
    The queued results are collected by iterating over get().
    """

    # The queue module is spelled differently on Python 2; the imported
    # names end up as class attributes (self.Queue, self.Empty).
    try:
        from queue import Queue, Empty # Python 3
    except ImportError:
        from Queue import Queue, Empty # Python 2

    def __init__(self, args):
        """
        Set up the result queue and timeouts.

        args must provide browse_timeout and query_timeout; the
        command line help describes both as seconds.
        """
        self.q = self.Queue()
        self.browse_timeout = args.browse_timeout
        # Scaled by 1000 before being handed to get_service_info()
        # (seconds -> milliseconds).
        self.query_timeout  = args.query_timeout * 1000

    def add_service(self, z, type, name):
        """Look up the announced service and queue its ServiceInfo, if any."""
        info = z.get_service_info(type, name, timeout = self.query_timeout)
        if info is None:
            # Lookup produced nothing, so there is nothing to queue.
            return
        self.q.put(info)

    def remove_service(self, z, type, name):
        """Service removals are ignored."""

    def update_service(self, z, type, name):
        """Service updates are ignored."""

    def get(self):
        """
        Generator yielding queued ServiceInfo results.

        Iteration ends as soon as no new result shows up within
        browse_timeout.
        """
        try:
            while True:
                yield self.q.get(timeout = self.browse_timeout)
        except self.Empty:
            return

def dns_name(name, prefix = (), suffix = ()):
    """
    Strip .local, add optional prefix and suffix.

    name is converted with str() and parsed as a DNS name; its final
    two labels must be "local" and the root label.  The remaining
    labels are returned as a dns.name.Name with the prefix labels in
    front and the suffix labels behind.

    prefix and suffix are tuples of labels (both default to empty).

    Raises ValueError if name is not directly under "local.".
    """
    from dns.name import Name, from_text
    labels = from_text(str(name)).labels
    # Explicit check rather than assert: assert statements are removed
    # under "python -O", which would let a non-.local name through and
    # silently chop off its last two labels.
    if labels[-2:] != (b"local", b""):
        raise ValueError("name is not under .local: {!r}".format(name))
    return Name(prefix + labels[:-2] + suffix)

def rr(name, rdata):
    """
    Build an RRset owned by name that holds the single rdata.

    The class and type of the RRset are taken from the rdata itself.
    """
    rrset = RRset(name, rdata.rdclass, rdata.rdtype)
    rrset.add(rdata)
    return rrset

def txt_rr(printer):
    """
    Generate a TXT RR for printer.

    A fresh ServiceInfo is built from the printer's type, name and
    property dictionary, and its text attribute is parsed as the wire
    format of the TXT rdata.  (Presumably this is what makes edits to
    printer.properties show up in the generated record.)
    """
    from dns.rdata import from_wire
    rebuilt = ServiceInfo(printer.type, printer.name, properties = printer.properties)
    owner = dns_name(printer.name)
    wire = rebuilt.text
    rdata = from_wire(IN, TXT, wire, 0, len(wire))
    return rr(owner, rdata)

def srv_rr(printer):
    """
    Generate an SRV RR for printer.

    The record is owned by the printer's name and carries its
    priority, weight, port and server; both names go through
    dns_name() first.
    """
    from dns.rdtypes.IN.SRV import SRV as SRV_RR
    owner = dns_name(printer.name)
    rdata = SRV_RR(IN, SRV, printer.priority, printer.weight, printer.port, dns_name(printer.server))
    return rr(owner, rdata)

def ptr_rr(printer, *prefix):
    """
    Generate a PTR RR for printer.

    The record is owned by the printer's service type and points at
    the printer's name; both go through dns_name() first.  Any extra
    positional arguments are used as labels in front of the owner name.
    """
    from dns.rdtypes.ANY.PTR import PTR as PTR_RR
    owner = dns_name(printer.type, prefix = prefix)
    target = dns_name(printer.name)
    return rr(owner, PTR_RR(IN, PTR, target))

def main():
    """
    Command line entry point.

    Browses for the requested mDNS service types, groups the
    ServiceInfo results by service name and writes PTR, SRV and TXT
    records for each name to the output file as master file text.
    Unless --quiet is given, each group is preceded by comment lines
    showing the name and addresses.
    """
    ap = ArgumentParser(description = __doc__, formatter_class = ArgumentDefaultsHelpFormatter)
    ap.add_argument("-q", "--quiet",    action = "store_true",
                    help = "omit comments from generated master file text")
    ap.add_argument("-o", "--output",   type = FileType("w"),   default = "-",
                    help = "where to write master file text")
    ap.add_argument("--browse-timeout", type = int,             default = 5,
                    help = "timeout in seconds while browsing for printers")
    ap.add_argument("--query-timeout",  type = int,             default = 3,
                    help = "timeout in seconds to retrieve printer data")
    ap.add_argument("--keep-adminurl",  action = "store_true",
                    help = "keep adminurl property in generated DNS data")
    ap.add_argument("mdns_type",        nargs = "*",
                    default = ["_ipp._tcp.local.", "_universal._sub._ipp._tcp.local.", "_pdl-datastream._tcp.local."],
                    help = "mDNS types for which to browse")
    args = ap.parse_args()

    def write(s = ""):
        "Write one line of text to the output file."
        args.output.write("{!s}\n".format(s))

    # Service name -> list of ServiceInfo results seen under that name
    # (one per service type the name was found under).
    printers = {}

    z = Zeroconf()
    try:
        listener = Listener(args)
        for mdns_type in args.mdns_type:
            ServiceBrowser(z, mdns_type, listener)
        for printer in listener.get():
            printers.setdefault(printer.name, []).append(printer)
    finally:
        # Shut Zeroconf down even if browsing is interrupted or fails.
        z.close()

    for infos in printers.values():
        # One PTR per result, but only a single SRV and TXT per name,
        # taken from the first result.
        records = [ptr_rr(p) for p in infos]
        p = infos[0]
        if not args.keep_adminurl:
            # Must happen before txt_rr(), which encodes p.properties.
            p.properties.pop(b"adminurl", None)
        records.append(srv_rr(p))
        records.append(txt_rr(p))

        if not args.quiet:
            write(";; Name: {!r}".format(p.name))
            for a in p.addresses:
                # Packed addresses: 4 bytes means IPv4, 16 bytes means IPv6.
                write(";; Addr: {} ({})".format(inet_ntop({4:AF_INET, 16:AF_INET6}[len(a)], a), p.server))
            write(";;")
        for record in records:
            write(record)
        write()

# Run main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()