aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2010-06-04 03:12:08 +0000
committerRob Austein <sra@hactrn.net>2010-06-04 03:12:08 +0000
commit1b8d6a4ba743d14fe449b1e43a8f3ac066903989 (patch)
tree1286002e5554638c329f134e6eb9777c8d9708da
parent72a37fa08a82f972f23b817f7a4671765def710b (diff)
Initial version of asynchronous DNS code
svn path=/rpkid/rpki/adns.py; revision=3269
-rw-r--r--rpkid/rpki/adns.py316
1 files changed, 316 insertions, 0 deletions
diff --git a/rpkid/rpki/adns.py b/rpkid/rpki/adns.py
new file mode 100644
index 00000000..273abf73
--- /dev/null
+++ b/rpkid/rpki/adns.py
@@ -0,0 +1,316 @@
+"""
+Basic asynchronous DNS code, using asyncore and Bob Halley's excellent
+dnspython package.
+
+$Id$
+
+Copyright (C) 2010 Internet Systems Consortium ("ISC")
+
+Permission to use, copy, modify, and distribute this software for any
+purpose with or without fee is hereby granted, provided that the above
+copyright notice and this permission notice appear in all copies.
+
+THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+PERFORMANCE OF THIS SOFTWARE.
+
+Portions copyright (C) 2003-2007, 2009, 2010 Nominum, Inc.
+
+Permission to use, copy, modify, and distribute this software and its
+documentation for any purpose with or without fee is hereby granted,
+provided that the above copyright notice and this permission notice
+appear in all copies.
+
+THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+"""
+
+import asyncore, socket, time, sys
+import dns.resolver, dns.rdatatype, dns.rdataclass, dns.name, dns.message
+import dns.inet, dns.exception, dns.query, dns.rcode, dns.ipv4, dns.ipv6
+import rpki.async, rpki.sundial, rpki.log
+
+## @var resolver
+# Resolver object, shared by everything using this module
+
+resolver = dns.resolver.Resolver()
+if resolver.cache is None:
+ resolver.cache = dns.resolver.Cache()
+
+## @var nscache
+# Nameserver map, maps values from resolver.nameservers to (af,
+# address) pairs. The latter turns out to be a more useful form for
+# us to use internally, because it simplifies the checks we need to
+# make upon packet receiption.
+
+nscache = {}
+
+class dispatcher(asyncore.dispatcher):
+ """
+ Basic UDP socket reader for use with asyncore.
+ """
+
+ def __init__(self, cb, eb, af, bufsize = 65535):
+ asyncore.dispatcher.__init__(self)
+ self.cb = cb
+ self.eb = eb
+ self.af = af
+ self.bufsize = bufsize
+ self.create_socket(af, socket.SOCK_DGRAM)
+
+ def handle_read(self):
+ """
+ Receive a packet, hand it off to query class callback.
+ """
+ wire, from_address = self.recvfrom(self.bufsize)
+ self.cb(self.af, from_address[0], from_address[1], wire)
+
+ def handle_error(self):
+ """
+ Pass errors to query class errback.
+ """
+ self.eb(sys.exc_info()[1])
+
+ def handle_connect(self):
+ """
+ Quietly ignore UDP "connection" events.
+ """
+ pass
+
+ def writable(self):
+ """
+ We don't need to hear about UDP socket becoming writable.
+ """
+ return False
+
+
+class query(object):
+ """
+ Simplified (no search paths) asynchronous adaptation of
+ dns.resolver.Resolver.query() (q.v.).
+ """
+
+ def __init__(self, cb, eb, qname, qtype = dns.rdatatype.A, qclass = dns.rdataclass.IN):
+
+ # Whack arguments into cannoical form
+ if isinstance(qname, (str, unicode)):
+ qname = dns.name.from_text(qname)
+ if isinstance(qtype, str):
+ qtype = dns.rdatatype.from_text(qtype)
+ if isinstance(qclass, str):
+ qclass = dns.rdataclass.from_text(qclass)
+
+ assert qname.is_absolute()
+
+ # Check our cache before doing network query.
+ if resolver.cache:
+ answer = resolver.cache.get((qname, qtype, qclass))
+ else:
+ answer = None
+
+ if answer:
+ # Found usable answer in cache, no network query required.
+ cb(answer)
+
+ else:
+ # Set up for network query.
+ self.cb = cb
+ self.eb = eb
+ self.qname = qname
+ self.qtype = qtype
+ self.qclass = qclass
+ self.start = time.time()
+ self.timer = rpki.async.timer()
+ self.sockets = {}
+ self.request = dns.message.make_query(self.qname, self.qtype, self.qclass)
+ if resolver.keyname is not None:
+ self.request.use_tsig(resolver.keyring, resolver.keyname, resolver.keyalgorithm)
+ self.request.use_edns(resolver.edns, resolver.ednsflags, resolver.payload)
+ self.response = None
+ self.backoff = 0.10
+ self.clone_nameservers()
+ self.loop1()
+
+ def clone_nameservers(self):
+ """
+ Parse resolver.nameservers into (address family, address) pairs,
+ cache result to speed up future queries, and construct nameserver
+ list for this query.
+ """
+ for ns in resolver.nameservers:
+ if ns not in nscache:
+ try:
+ addr = dns.ipv4.inet_aton(ns)
+ except:
+ pass
+ else:
+ nscache[ns] = (dns.inet.AF_INET, addr)
+ continue
+ try:
+ addr = dns.ipv6.inet_aton(ns)
+ except:
+ pass
+ else:
+ nscache[ns] = (dns.inet.AF_INET6, addr)
+ continue
+ raise ValueError
+ self.nameservers = [nscache[ns] for ns in resolver.nameservers]
+
+ def loop1(self):
+ """
+ Outer loop. If we haven't got a response yet and still have
+ nameservers to check, start inner loop. Otherwise, we're done.
+ """
+ self.timer.cancel()
+ if self.response is None and self.nameservers:
+ self.iterator = rpki.async.iterator(self.nameservers[:], self.loop2, self.done2)
+ else:
+ self.done1()
+
+ def loop2(self, iterator, nameserver):
+ """
+ Inner loop. Send query to next nameserver in our list, unless
+ we've hit the overall timeout for this query.
+ """
+ self.timer.cancel()
+ try:
+ timeout = resolver._compute_timeout(self.start)
+ except dns.resolver.Timeout, e:
+ self.lose(e)
+ else:
+ af, addr = nameserver
+ if af not in self.sockets:
+ self.sockets[af] = dispatcher(self.socket_cb, self.socket_eb, af)
+ self.sockets[af].sendto(self.request.to_wire(),
+ (dns.inet.inet_ntop(af, addr), resolver.port))
+ self.timer.set_handler(self.socket_timeout)
+ self.timer.set_errback(self.socket_eb)
+ self.timer.set(rpki.sundial.timedelta(seconds = timeout))
+
+ def socket_timeout(self):
+ """
+ No answer from nameserver, move on to next one (inner loop).
+ """
+ self.response = None
+ self.iterator()
+
+ def socket_eb(self, e):
+ """
+ UDP socket signaled error. If it really is some kind of socket
+ error, handle as if we've timed out on this nameserver; otherwise,
+ pass error back to caller.
+ """
+ self.timer.cancel()
+ if isinstance(e, socket.error):
+ self.response = None
+ self.iterator()
+ else:
+ self.lose(e)
+
+ def socket_cb(self, af, from_host, from_port, wire):
+ """
+ Received a packet that might be a DNS message. If it doesn't look
+ like it came from one of our nameservers, just drop it and leave
+ the timer running. Otherwise, try parsing it: if it's an answer,
+ we're done, otherwise handle error appropriately and move on to
+ next nameserver.
+ """
+ sender = (af, dns.inet.inet_pton(af, from_host))
+ if from_port != resolver.port or sender not in self.nameservers:
+ return
+ self.timer.cancel()
+ try:
+ self.response = dns.message.from_wire(wire, keyring = self.request.keyring, request_mac = self.request.mac, one_rr_per_rrset = False)
+ except dns.exception.FormError:
+ self.nameservers.remove(sender)
+ else:
+ rcode = self.response.rcode()
+ if rcode in (dns.rcode.NOERROR, dns.rcode.NXDOMAIN):
+ self.done1()
+ return
+ if rcode != dns.rcode.SERVFAIL:
+ self.nameservers.remove(sender)
+ self.response = None
+ self.iterator()
+
+ def done2(self):
+ """
+ Done with inner loop. If we still haven't got an answer and
+ haven't (yet?) eliminated all of our nameservers, wait a little
+ while before starting the cycle again, unless we've hit the
+ timeout threshold for the whole query.
+ """
+ if self.response is None and self.nameservers:
+ try:
+ delay = rpki.sundial.timedelta(seconds = min(resolver._compute_timeout(self.start), self.backoff))
+ self.backoff *= 2
+ self.timer.set_handler(self.loop1)
+ self.timer.set_errback(self.lose)
+ self.timer.set(delay)
+ except dns.resolver.Timeout, e:
+ self.lose(e)
+ else:
+ self.loop1()
+
+ def cleanup(self):
+ """
+ Shut down our timer and sockets.
+ """
+ self.timer.cancel()
+ for s in self.sockets.itervalues():
+ s.close()
+
+ def lose(self, e):
+ """
+ Something bad happened. Clean up, then pass error back to caller.
+ """
+ self.cleanup()
+ self.eb(e)
+
+ def done1(self):
+ """
+ Done with outer loop. If we got a useful answer, cache it, then
+ pass it back to caller; if we got an error, pass the appropriate
+ exception back to caller.
+ """
+ self.cleanup()
+ try:
+ if not self.nameservers:
+ raise dns.resolver.NoNameservers
+ if self.response.rcode() == dns.rcode.NXDOMAIN:
+ raise dns.resolver.NXDOMAIN
+ answer = dns.resolver.Answer(self.qname, self.qtype, self.qclass, self.response)
+ if resolver.cache:
+ resolver.cache.put((self.qname, self.qtype, self.qclass), answer)
+ self.cb(answer)
+ except (rpki.async.ExitNow, SystemExit):
+ raise
+ except Exception, e:
+ self.lose(e)
+
+if __name__ == "__main__":
+
+ rpki.log.use_syslog = False
+
+ def done(result):
+ for rdata in result:
+ print rdata
+
+ def lose(e):
+ print "DNS lookup failed: %r" % e
+
+ for qtype in (dns.rdatatype.A, dns.rdatatype.AAAA, dns.rdatatype.HINFO):
+ query(done, lose, "www.hactrn.net", qtype)
+ query(done, lose, "wibble.hactrn.net")
+ query(done, lose, "www.hactrn.net", qclass = dns.rdataclass.CH)
+
+ rpki.async.event_loop()