diff options
author | Rob Austein <sra@hactrn.net> | 2016-01-13 05:17:50 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2016-01-13 05:17:50 +0000 |
commit | 5dac4badb7c67a3698628ca80320623f423e2f4d (patch) | |
tree | 17b59c897a3f55e2bb3e55e201ff94c05882a79a /rp/rcynic | |
parent | e7f0dc6d3596548e8d530afd747aaca4775a15fe (diff) |
Checkpoint of first cut at RRDP client code. RRDP client code not yet
tested, and final_cleanup() needs work to avoid trashing RRDP state.
svn path=/branches/tk705/; revision=6222
Diffstat (limited to 'rp/rcynic')
-rwxr-xr-x | rp/rcynic/rcynicng | 222 |
1 files changed, 212 insertions, 10 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index 12395c62..fc36c0d9 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -9,10 +9,11 @@ Reimplementation of rcynic in Python. Work in progress. import os import sys import time -import shutil import errno +import shutil import logging import argparse +import urlparse import subprocess import tornado.gen @@ -25,10 +26,11 @@ import rpki.POW import rpki.sundial import rpki.config import rpki.autoconf +import rpki.relaxng from rpki.oids import id_kp_bgpsec_router -from lxml.etree import ElementTree, Element, SubElement, Comment +from lxml.etree import ElementTree, Element, SubElement, Comment, XML logger = logging.getLogger("rcynicng") @@ -724,6 +726,13 @@ def sha256(bytes): return d.digest() +class RRDP_ParseFailure(Exception): + "Failure parsing RRDP message." + +class DeadHost(Exception): + "Host recently tried and known to be unavailable." + + class Fetcher(object): """ Network transfer methods and history database. @@ -746,11 +755,14 @@ class Fetcher(object): _rsync_deadhosts = set() _rsync_history = dict() - def __init__(self, uri): + _https_deadhosts = set() + _https_history = dict() + + def __init__(self, uri, ta = False): self.uri = uri + self.ta = ta self.pending = None self.status = None - self.runtime = None def _rsync_split_uri(self): return tuple(self.uri.rstrip("/").split("/")[2:]) @@ -769,6 +781,8 @@ class Fetcher(object): return False if self.uri.startswith("rsync://"): return self._rsync_needed() + if self.uri.startswith("https://"): + return self._https_needed() raise ValueError def _rsync_needed(self): @@ -778,14 +792,23 @@ class Fetcher(object): entry = self._rsync_find(path) return entry is None or entry.pending is not None + def _https_needed(self): + netloc = urlparse.urlparse(self.uri).netloc + if netloc in self._https_deadhosts: + return False + entry = self._https_history.get(self.uri) + return entry is None or entry.pending is not None + def fetch(self): if self.uri.startswith("rsync://"): return self._rsync_fetch() + if self.uri.startswith("https://"): + return self._https_fetch() if self.ta else self._rrdp_fetch() raise ValueError @tornado.gen.coroutine def _rsync_fetch(self): - assert self.uri.startswith("rsync://") and (self.uri.endswith(".cer") or self.uri.endswith("/")) + assert self.uri.startswith("rsync://") and (self.uri.endswith(".cer") if self.ta else self.uri.endswith("/")) if args.no_fetch: return @@ -833,12 +856,11 @@ class Fetcher(object): output = yield rsync.stdout.read_until_close() pid, self.status = os.waitpid(rsync.pid, os.WNOHANG) t1 = time.time() - self.runtime = t1 - t0 if (pid, self.status) == (0, 0): logger.warn("rsync[%s] Couldn't get real exit status without blocking, sorry", rsync.pid) for line in output.splitlines(): logger.debug("rsync[%s] %s", rsync.pid, line) - logger.debug("rsync[%s] finished after %s seconds with status 0x%x", rsync.pid, self.runtime, self.status) + logger.debug("rsync[%s] finished after %s seconds with status 0x%x", rsync.pid, t1 - t0, self.status) # Should do something with rsync result and validation status database here. @@ -880,6 +902,179 @@ class Fetcher(object): elif os.path.exists(path): yield path + @tornado.gen.coroutine + def _https_fetch_url(self, url): + + if urlparse.urlparse(url).netloc in self._https_deadhosts: + raise DeadHost + + try: + t0 = time.time() + client = tornado.httpclient.AsyncHTTPClient() + response = yield client.fetch(url, validate_cert = False) + # Might want to check response Content-Type here + except: + # Might want to check response code here to figure out + # whether to list this host in _https_deadhosts. + ok = False + raise + else: + ok = True + finally: + t1 = time.time() + logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0) + retrieval = rpki.rcynicdb.models.Retrieval.objects.create( + uri = url, + started = rpki.sundial.datetime.fromtimestamp(t0), + finished = rpki.sundial.datetime.fromtimestamp(t1), + successful = ok) + + raise tornado.gen.Return(retrieval, response.body if ok else None) + + @tornado.gen.coroutine + def _https_fetch(self): + + if args.no_fetch: + return + + other = self._https_history.get(self.uri) + if other is not None and other.pending is not None: + yield other.pending.wait() + return + + self.pending = tornado.locks.Condition() + self._rsync_history[self.uri] = self + + try: + retrieval, der = yield self._https_fetch_url(self.uri) + X509.store_if_new(der, self.uri, retrieval) + except: + logger.exception("Couldn't load %s", self.uri) + + finally: + pending = self.pending + self.pending = None + pending.notify_all() + + @tornado.gen.coroutine + def _rrdp_fetch_url(self, url, tag, hash = None, uuid = None, serial = None): + + retrieval, xml = yield self._https_fetch_url(url) + + if hash is not None and sha256(xml).encode("hex") != hash.lower(): + raise RRDP_ParseFailure("Expected RRDP hash %s for %s, got %s" % ( + hash.lower(), url, sha256(xml).encode("hex"))) + + xml = XML(xml) + rpki.relaxng.rrdp.schema.assertValid(xml) + + if tag is not None and xml.tag != rpki.relaxng.rrdp.xmlns + tag: + raise RRDP_ParseFailure("Expected RRDP %s for %s, got %s" % ( + tag, delta_url, xml.tag)) + + if uuid is not None and xml.get("uuid") != uuid: + raise RRDP_ParseFailure("Expected RRDP UUID %s for %s, got %s" % ( + uuid, url, xml.get("uuid"))) + + if serial is not None and long(xml.get("serial")) != long(serial): + raise RRDP_ParseFailure("Expected RRDP serial %s for %s, got %s" % ( + serial, url, xml.get("serial"))) + + raise tornado.gen.Return(retrieval, xml) + + @tornado.gen.coroutine + def _rrdp_fetch(self): + from django.db import transaction + + if args.no_fetch: + return + + other = self._https_history.get(self.uri) + if other is not None and other.pending is not None: + yield other.pending.wait() + return + + self.pending = tornado.locks.Condition() + self._rsync_history[self.uri] = self + + try: + retrieval, notification = yield self._rrdp_fetch_url(url = self.uri, tag = "notification") + + uuid = notification.get("uuid") + serial = long(notification.get("serial")) + + old_snapshot = rpki.rcynicdb.models.RRDPSnapshot.objects.filter( + uuid = uuid).order_by("-retrieved__started").first() + + if old_snapshot is not None and old_snapshot.serial == serial: + return # We're up to date, nothing to do + + delta_serials = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash"))) + for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta")) + + new_snapshot = rpki.rcynicdb.models.RRDPSnapshot(retrieved = retrieval, uuid = uuid, serial = serial) + + if old_snapshot is None or old_snapshot.serial + 1 not in delta_serials: + # No useful combination of old snapshot and available deltas, need to load current snapshot. + + x = notification.find(rpki.relaxng.rrdp.xmlns + "snapshot") + + retrieval, snapshot = yield self._rrdp_fetch_url(url = x.get("uri"), hash = x.get("hash"), + tag = "snapshot", uuid = uuid, serial = serial) + + with transaction.atomic(): + new_snapshot.save() + for x in snapshot.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"): + uri = x.get("uri") + cls = uri_to_class(uri) + if cls is None: + raise RRDP_ParseFailure("Unexpected URI %s" % uri) + obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval) + obj.snapshot.add(new_snapshot) + obj.save() + + else: + # In theory there exists a set of deltas which will get us from old_snapshot to new_snapshot. + + deltas = [(yield self._rrdp_fetch_url(url = delta_serials[serial][0], + hash = delta_serials[serial][1], + tag = "delta", + uuid = uuid, + serial = serial)) + for serial in xrange(old_snapshot.serial + 1, new_snapshot.serial + 1)] + + with transaction.atomic(): + new_snapshot.rpkiobject_set = old_snapshot.rpkiobject_set + new_snapshot.save() + + for retrieval, delta in deltas: + + for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "withdraw"): + new_snapshot.rpkiobject_set.remove( + new_snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower())) + new_snapshot.save() + + for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"): + uri = x.get("uri") + cls = uri_to_class(uri) + if cls is None: + raise RRDP_ParseFailure("Unexpected URI %s" % uri) + if x.get("hash", None) is not None: + new_snapshot.rpkiobject_set.remove( + new_snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower())) + new_snapshot.save() + obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval) + obj.snapshot.add(new_snapshot) + obj.save() + + except: + logger.exception("Couldn't load %s", self.uri) + + finally: + pending = self.pending + self.pending = None + pending.notify_all() + class CheckTALTask(object): @@ -892,7 +1087,7 @@ class CheckTALTask(object): @tornado.gen.coroutine def __call__(self): - yield Fetcher(self.uri).fetch() + yield Fetcher(self.uri, ta = True).fetch() for cer in fetch_objects(uri = self.uri): if self.check(cer): yield task_queue.put(WalkTask(cer = cer)) @@ -966,8 +1161,15 @@ def final_cleanup(): # https://docs.djangoproject.com/en/1.8/faq/models/#how-can-i-see-the-raw-sql-queries-django-is-running # All RPKI objects other than the most recent authenticated set. - # This will definitely need work once we support RRDP. - rpki.rcynicdb.models.RPKIObject.objects.exclude(authenticated = authenticated.id).delete() + q = rpki.rcynicdb.models.RPKIObject.objects + q = q.exclude(authenticated = authenticated.id) + + # OK, I think what we want to express here is: "Keep any + # RPKIObject which is part of any most recent RRDP snapshot + # which has one RPKIObjects in the current authenticated set" + # Whee! + + q.delete() # Flush all but the most recent authenticated set. rpki.rcynicdb.models.Authenticated.objects.exclude(id = authenticated.id).delete() |