diff options
Diffstat (limited to 'rp')
-rwxr-xr-x | rp/rcynic/rcynicng | 201 |
1 files changed, 93 insertions, 108 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index cad0c505..553b32e5 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -16,6 +16,7 @@ import shutil import socket import logging import argparse +import tempfile import urlparse import subprocess @@ -34,10 +35,18 @@ import rpki.relaxng from rpki.oids import id_kp_bgpsec_router -from lxml.etree import ElementTree, Element, SubElement, Comment, XML, XMLPullParser, DocumentInvalid, XMLSyntaxError +from lxml.etree import ElementTree, Element, SubElement, Comment, XML, DocumentInvalid, XMLSyntaxError, iterparse logger = logging.getLogger("rcynicng") +xmlns = rpki.relaxng.rrdp.xmlns + +tag_delta = xmlns + "delta" +tag_notification = xmlns + "notification" +tag_publish = xmlns + "publish" +tag_snapshot = xmlns + "snapshot" +tag_withdraw = xmlns + "withdraw" + codes = rpki.POW.validation_status @@ -741,70 +750,6 @@ class DeadHost(Exception): "Host recently tried and known to be unavailable." -class RRDPPullParser(object): - """ - An XML pull-parser for RRDP <snapshot/> and <delta/> messages. - """ - - def __init__(self, handlers, validate = False): - self.parser = XMLPullParser() - self.events = self.parser.read_events() - self.handlers = handlers - self.validate = validate - self._digest = rpki.POW.Digest(rpki.POW.SHA256_DIGEST) - - @staticmethod - def _get_path(x): - path = [] - while x is not None: - path.insert(0, x) - x = x.getparent() - return path - - # Validation is nasty, but our options in mid-parse are limited - # once we've started modifying the document, which we kind of have - # to do given that the entire point of iterative parsing is to - # conserve memory by discarding elements as soon as we've - # processed them. Fortunately, RRDP snapshots and deltas have - # such simple structure that we can get away with reconstructing - # and validating just the current branch. This is a total kludge, - # and doesn't work even for RRDP notifications, much less - # generalized XML documents, but by happy chance it does work in - # the two cases where we really need it. - - @staticmethod - def _validate(path): - root = None - for this in reversed(path): - child = root - root = copy.deepcopy(this) - del root[:] - if child is not None: - root.append(child) - rpki.relaxng.rrdp.schema.assertValid(root) - - def __call__(self, data, retrieval): - self.parser.feed(data) - self._digest.update(data) - for action, element in self.events: - path = self._get_path(element) - tags = tuple(x.tag for x in path) - if len(path) < 2: - continue - if self.validate: - self._validate(path) - if tags in self.handlers: - path.insert(0, retrieval) - self.handlers[tags](*path) - element.clear() - while element.getprevious() is not None: - del element.getparent()[0] - - @property - def sha256hex(self): - return self._digest.digest().encode("hex") - - class Fetcher(object): """ Network transfer methods and history database. @@ -968,7 +913,7 @@ class Fetcher(object): yield path @tornado.gen.coroutine - def _https_fetch_url(self, url, pull_parser = None): + def _https_fetch_url(self, url, streaming_callback = None): if urlparse.urlparse(url).netloc in self._https_deadhosts: raise DeadHost @@ -997,13 +942,6 @@ class Fetcher(object): try: ok = False t0 = time.time() - retrieval = Retrieval.objects.create( - uri = url, - started = rpki.sundial.datetime.fromtimestamp(t0)) - if pull_parser is None: - streaming_callback = None - else: - streaming_callback = lambda data: pull_parser(data, retrieval) client = tornado.httpclient.AsyncHTTPClient(max_body_size = args.max_https_body_size) response = yield client.fetch(url, streaming_callback = streaming_callback, @@ -1030,11 +968,13 @@ class Fetcher(object): finally: t1 = time.time() logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0) - retrieval.finished = rpki.sundial.datetime.fromtimestamp(t1) - retrieval.successful = ok - retrieval.save() - - raise tornado.gen.Return((retrieval, response)) + retrieval = Retrieval.objects.create( + uri = url, + started = rpki.sundial.datetime.fromtimestamp(t0), + finished = rpki.sundial.datetime.fromtimestamp(t1), + successful = ok) + if ok: + raise tornado.gen.Return((retrieval, response)) @tornado.gen.coroutine def _https_fetch_ta(self): @@ -1070,7 +1010,7 @@ class Fetcher(object): rpki.relaxng.rrdp.schema.assertValid(notification) - if notification.tag != rpki.relaxng.rrdp.xmlns + "notification": + if notification.tag != tag_notification: raise RRDP_ParseFailure("Expected RRDP notification for {}, got {}".format(url, notification.tag)) raise tornado.gen.Return((retrieval, notification)) @@ -1091,7 +1031,7 @@ class Fetcher(object): delta = XML(response.body) rpki.relaxng.rrdp.schema.assertValid(delta) - if delta.tag != rpki.relaxng.rrdp.xmlns + "delta": + if delta.tag != tag_delta: raise RRDP_ParseFailure("Expected RRDP delta for {}, got {}".format(url, delta.tag)) if delta.get("session_id") != session_id: @@ -1113,6 +1053,22 @@ class Fetcher(object): raise tornado.gen.Return(retrieval) @tornado.gen.coroutine + def _rrdp_fetch_file(self, url, expected_hash): + + sha256 = rpki.POW.Digest(rpki.POW.SHA256_DIGEST) + xml_file = tempfile.SpooledTemporaryFile() + + retrieval, response = yield self._https_fetch_url(url, lambda data: (sha256.update(data), xml_file.write(data))) + + received_hash = sha256.digest().encode("hex") + xml_file.seek(0) + + if received_hash != expected_hash.lower(): + raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(expected_hash.lower(), url, received_hash)) + + raise tornado.gen.Return((retrieval, response, xml_file)) + + @tornado.gen.coroutine def _rrdp_fetch(self): from django.db import transaction @@ -1144,50 +1100,83 @@ class Fetcher(object): return deltas = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash"))) - for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta")) + for delta in notification.iterchildren(tag_delta)) if snapshot is None or snapshot.serial + 1 not in deltas: if snapshot is not None: logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial) - x = notification.find(rpki.relaxng.rrdp.xmlns + "snapshot") + x = notification.find(tag_snapshot) url, hash = x.get("uri"), x.get("hash") logger.debug("RRDP %s loading from snapshot %s serial %s", self.uri, url, serial) + retrieval, response, xml_file = yield self._rrdp_fetch_file(url, hash) + snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial) - # Fetching and processing a huge snapshot file can take quite a while. - # If we need to warn the user about this, see header_callback (companion - # to streaming_callback) which would let us see the content-length early - # enough to issue a useful warning. - - def snapshot_publish_handler(retrieval, root, publish): - if root.get("session_id") != session_id: - raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format( - session_id, url, root.get("session_id"))) - if long(root.get("serial")) != long(serial): - raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format( - serial, url, root.get("serial"))) - uri = publish.get("uri") + root = None + count = 0 + + for event, node in iterparse(xml_file): + if node is root: + continue + + if root is None: + root = node.getparent() + if root is None or root.tag != tag_snapshot: + raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url)) + if root.get("session_id") != session_id: + raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format( + session_id, url, root.get("session_id"))) + if long(root.get("serial")) != long(serial): + raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format( + serial, url, root.get("serial"))) + + if node.tag != tag_publish or node.getparent() is not root: + raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url)) + + count += 1 + if count % 1000 == 0: + logger.debug("Parsing %s object %s", url, count) + + uri = node.get("uri") cls = uri_to_class(uri) if cls is None: raise RRDP_ParseFailure("Unexpected URI {}".format(uri)) - obj, created = cls.store_if_new(publish.text.decode("base64"), uri, retrieval) + + # This next bit, sadly, is hideously slow for large snapshots. Django does have a bulk + # creation operation of sorts, but using it here would be trickly because the record may already + # exist. It's possible that we can do smoething clever here using the .exists() queryset + # operator to figure out whether we should add something to the bulk insertion. Don't really + # know that it would be faster, but it might, since read operations are probably faster than + # reads interleaved with writes. + # + # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#django.db.models.query.QuerySet.exists + # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#bulk-create + # + # ...but doc says .bulk_create() doesn't work with many-to-many relationships, oops. Feh. + # Well, maybe we can work around that, since "retrieval" already gives us the exact set of + # objects for which we'd need to patch up the many-to-many relationship pointer. Depends + # on what they mean by "doesn't work": automation not complete vs mysterious failure. + + obj, created = cls.store_if_new(node.text.decode("base64"), uri, retrieval) obj.snapshot.add(snapshot) obj.save() - retrieval = yield self._rrdp_fetch_snapshot( - url = url, - hash = hash, - pull_parser = RRDPPullParser({(rpki.relaxng.rrdp.xmlns + "snapshot", - rpki.relaxng.rrdp.xmlns + "publish") : snapshot_publish_handler})) + node.clear() + while node.getprevious() is not None: + del root[0] + + yield tornado.gen.moment snapshot.retrieved = retrieval snapshot.save() + xml_file.close() + else: logger.debug("RRDP %s %s deltas (%s--%s)", self.uri, (serial - snapshot.serial), snapshot.serial, serial) @@ -1218,12 +1207,12 @@ class Fetcher(object): snapshot.serial = serial snapshot.save() - for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "withdraw"): + for x in delta.iterchildren(tag_withdraw): snapshot.rpkiobject_set.remove( snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower())) snapshot.save() - for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"): + for x in delta.iterchildren(tag_publish): uri = x.get("uri") cls = uri_to_class(uri) if cls is None: @@ -1349,11 +1338,7 @@ def final_cleanup(): with transaction.atomic(): - #logger.debug("Flushing incomplete RRDP snapshots and retrieval objects") - - q = Retrieval.objects - q = q.filter(finished__isnull = True) - q.delete() + #logger.debug("Flushing incomplete RRDP snapshots") q = RRDPSnapshot.objects q = q.filter(retrieved__isnull = True) |