author     Rob Austein <sra@hactrn.net>   2016-02-29 12:54:08 +0000
committer  Rob Austein <sra@hactrn.net>   2016-02-29 12:54:08 +0000
commit     5d9a760dd1cef0f647b4ba206a87e4d9d5dee2c2 (patch)
tree       6cd010adfa6903603145055b94b6e19188b987ef
parent     632e40d8f23c68ac0e0b98b83aa538adcec04a33 (diff)
Stash unparsed XML in a temporary file so we can get off the phone
quickly when processing large snapshots which take (much) longer to
load into SQL than to pull down from the net. Given this change,
lxml.etree.iterparse() is a more suitable API than
lxml.etree.XMLPullParser, so switch.
This version can download and process a 120,000-object snapshot,
albeit hideously slowly.
svn path=/branches/tk705/; revision=6299
-rwxr-xr-x   rp/rcynic/rcynicng   201
1 file changed, 93 insertions, 108 deletions
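In outline, the new code streams the snapshot body into a spooled temporary file while hashing it, drops the HTTPS connection, and only then walks the file with iterparse(), discarding each <publish/> element as soon as it has been handed off. The following is a minimal sketch of that pattern, not the committed code: the fetch() callable, the handle_publish() callback, and the use of hashlib rather than rpki.POW are illustrative stand-ins.

import base64
import hashlib
import tempfile

from lxml.etree import iterparse

RRDP_NS = "{http://www.ripe.net/rpki/rrdp}"   # RRDP XML namespace


def fetch_snapshot(fetch, expected_hash):
    # fetch() is assumed to take a streaming callback and invoke it once per
    # chunk of response body, the way tornado's streaming_callback does.
    sha256 = hashlib.sha256()
    xml_file = tempfile.SpooledTemporaryFile()
    fetch(lambda chunk: (sha256.update(chunk), xml_file.write(chunk)))
    if sha256.hexdigest() != expected_hash.lower():
        raise ValueError("RRDP snapshot hash mismatch")
    xml_file.seek(0)
    return xml_file


def process_snapshot(xml_file, handle_publish):
    # iterparse() fires an event as each element's end tag is seen, so only
    # one <publish/> element needs to be fully in memory at a time.
    for event, node in iterparse(xml_file):
        if node.tag != RRDP_NS + "publish":
            continue
        handle_publish(node.get("uri"), base64.b64decode(node.text))
        # Throw away the element and any already-processed siblings.
        node.clear()
        while node.getprevious() is not None:
            del node.getparent()[0]

tempfile.SpooledTemporaryFile buffers in memory and only rolls over to a real temporary file once a nonzero max_size is exceeded, so small snapshots never need to touch the disk.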
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index cad0c505..553b32e5 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -16,6 +16,7 @@ import shutil
 import socket
 import logging
 import argparse
+import tempfile
 import urlparse
 import subprocess
 
@@ -34,10 +35,18 @@ import rpki.relaxng
 
 from rpki.oids import id_kp_bgpsec_router
 
-from lxml.etree import ElementTree, Element, SubElement, Comment, XML, XMLPullParser, DocumentInvalid, XMLSyntaxError
+from lxml.etree import ElementTree, Element, SubElement, Comment, XML, DocumentInvalid, XMLSyntaxError, iterparse
 
 logger = logging.getLogger("rcynicng")
 
+xmlns = rpki.relaxng.rrdp.xmlns
+
+tag_delta        = xmlns + "delta"
+tag_notification = xmlns + "notification"
+tag_publish      = xmlns + "publish"
+tag_snapshot     = xmlns + "snapshot"
+tag_withdraw     = xmlns + "withdraw"
+
 codes = rpki.POW.validation_status
 
@@ -741,70 +750,6 @@ class DeadHost(Exception):
     "Host recently tried and known to be unavailable."
 
 
-class RRDPPullParser(object):
-    """
-    An XML pull-parser for RRDP <snapshot/> and <delta/> messages.
-    """
-
-    def __init__(self, handlers, validate = False):
-        self.parser = XMLPullParser()
-        self.events = self.parser.read_events()
-        self.handlers = handlers
-        self.validate = validate
-        self._digest = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
-
-    @staticmethod
-    def _get_path(x):
-        path = []
-        while x is not None:
-            path.insert(0, x)
-            x = x.getparent()
-        return path
-
-    # Validation is nasty, but our options in mid-parse are limited
-    # once we've started modifying the document, which we kind of have
-    # to do given that the entire point of iterative parsing is to
-    # conserve memory by discarding elements as soon as we've
-    # processed them.  Fortunately, RRDP snapshots and deltas have
-    # such simple structure that we can get away with reconstructing
-    # and validating just the current branch.  This is a total kludge,
-    # and doesn't work even for RRDP notifications, much less
-    # generalized XML documents, but by happy chance it does work in
-    # the two cases where we really need it.
-
-    @staticmethod
-    def _validate(path):
-        root = None
-        for this in reversed(path):
-            child = root
-            root = copy.deepcopy(this)
-            del root[:]
-            if child is not None:
-                root.append(child)
-        rpki.relaxng.rrdp.schema.assertValid(root)
-
-    def __call__(self, data, retrieval):
-        self.parser.feed(data)
-        self._digest.update(data)
-        for action, element in self.events:
-            path = self._get_path(element)
-            tags = tuple(x.tag for x in path)
-            if len(path) < 2:
-                continue
-            if self.validate:
-                self._validate(path)
-            if tags in self.handlers:
-                path.insert(0, retrieval)
-                self.handlers[tags](*path)
-            element.clear()
-            while element.getprevious() is not None:
-                del element.getparent()[0]
-
-    @property
-    def sha256hex(self):
-        return self._digest.digest().encode("hex")
-
-
 class Fetcher(object):
     """
     Network transfer methods and history database.
@@ -968,7 +913,7 @@ class Fetcher(object):
             yield path
 
     @tornado.gen.coroutine
-    def _https_fetch_url(self, url, pull_parser = None):
+    def _https_fetch_url(self, url, streaming_callback = None):
 
         if urlparse.urlparse(url).netloc in self._https_deadhosts:
             raise DeadHost
@@ -997,13 +942,6 @@ class Fetcher(object):
         try:
             ok = False
             t0 = time.time()
-            retrieval = Retrieval.objects.create(
-                uri     = url,
-                started = rpki.sundial.datetime.fromtimestamp(t0))
-            if pull_parser is None:
-                streaming_callback = None
-            else:
-                streaming_callback = lambda data: pull_parser(data, retrieval)
             client = tornado.httpclient.AsyncHTTPClient(max_body_size = args.max_https_body_size)
             response = yield client.fetch(url,
                                           streaming_callback = streaming_callback,
@@ -1030,11 +968,13 @@ class Fetcher(object):
         finally:
             t1 = time.time()
             logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0)
-            retrieval.finished = rpki.sundial.datetime.fromtimestamp(t1)
-            retrieval.successful = ok
-            retrieval.save()
-
-        raise tornado.gen.Return((retrieval, response))
+            retrieval = Retrieval.objects.create(
+                uri        = url,
+                started    = rpki.sundial.datetime.fromtimestamp(t0),
+                finished   = rpki.sundial.datetime.fromtimestamp(t1),
+                successful = ok)
+            if ok:
+                raise tornado.gen.Return((retrieval, response))
 
     @tornado.gen.coroutine
     def _https_fetch_ta(self):
@@ -1070,7 +1010,7 @@ class Fetcher(object):
 
         rpki.relaxng.rrdp.schema.assertValid(notification)
 
-        if notification.tag != rpki.relaxng.rrdp.xmlns + "notification":
+        if notification.tag != tag_notification:
            raise RRDP_ParseFailure("Expected RRDP notification for {}, got {}".format(url, notification.tag))
 
         raise tornado.gen.Return((retrieval, notification))
@@ -1091,7 +1031,7 @@ class Fetcher(object):
         delta = XML(response.body)
         rpki.relaxng.rrdp.schema.assertValid(delta)
 
-        if delta.tag != rpki.relaxng.rrdp.xmlns + "delta":
+        if delta.tag != tag_delta:
             raise RRDP_ParseFailure("Expected RRDP delta for {}, got {}".format(url, delta.tag))
 
         if delta.get("session_id") != session_id:
@@ -1113,6 +1053,22 @@ class Fetcher(object):
         raise tornado.gen.Return(retrieval)
 
     @tornado.gen.coroutine
+    def _rrdp_fetch_file(self, url, expected_hash):
+
+        sha256 = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
+        xml_file = tempfile.SpooledTemporaryFile()
+
+        retrieval, response = yield self._https_fetch_url(url, lambda data: (sha256.update(data), xml_file.write(data)))
+
+        received_hash = sha256.digest().encode("hex")
+        xml_file.seek(0)
+
+        if received_hash != expected_hash.lower():
+            raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(expected_hash.lower(), url, received_hash))
+
+        raise tornado.gen.Return((retrieval, response, xml_file))
+
+    @tornado.gen.coroutine
     def _rrdp_fetch(self):
         from django.db import transaction
 
@@ -1144,50 +1100,83 @@ class Fetcher(object):
                 return
 
         deltas = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash")))
-                      for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta"))
+                      for delta in notification.iterchildren(tag_delta))
 
         if snapshot is None or snapshot.serial + 1 not in deltas:
 
             if snapshot is not None:
                 logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial)
 
-            x = notification.find(rpki.relaxng.rrdp.xmlns + "snapshot")
+            x = notification.find(tag_snapshot)
             url, hash = x.get("uri"), x.get("hash")
 
             logger.debug("RRDP %s loading from snapshot %s serial %s", self.uri, url, serial)
 
+            retrieval, response, xml_file = yield self._rrdp_fetch_file(url, hash)
+
             snapshot = RRDPSnapshot.objects.create(session_id = session_id,
                                                    serial = serial)
 
-            # Fetching and processing a huge snapshot file can take quite a while.
-            # If we need to warn the user about this, see header_callback (companion
-            # to streaming_callback) which would let us see the content-length early
-            # enough to issue a useful warning.
-
-            def snapshot_publish_handler(retrieval, root, publish):
-                if root.get("session_id") != session_id:
-                    raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(
-                        session_id, url, root.get("session_id")))
-                if long(root.get("serial")) != long(serial):
-                    raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(
-                        serial, url, root.get("serial")))
-                uri = publish.get("uri")
+            root = None
+            count = 0
+
+            for event, node in iterparse(xml_file):
+                if node is root:
+                    continue
+
+                if root is None:
+                    root = node.getparent()
+                    if root is None or root.tag != tag_snapshot:
+                        raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url))
+                    if root.get("session_id") != session_id:
+                        raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(
+                            session_id, url, root.get("session_id")))
+                    if long(root.get("serial")) != long(serial):
+                        raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(
+                            serial, url, root.get("serial")))
+
+                if node.tag != tag_publish or node.getparent() is not root:
+                    raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url))
+
+                count += 1
+                if count % 1000 == 0:
+                    logger.debug("Parsing %s object %s", url, count)
+
+                uri = node.get("uri")
                 cls = uri_to_class(uri)
                 if cls is None:
                     raise RRDP_ParseFailure("Unexpected URI {}".format(uri))
-                obj, created = cls.store_if_new(publish.text.decode("base64"), uri, retrieval)
+
+                # This next bit, sadly, is hideously slow for large snapshots.  Django does have a bulk
+                # creation operation of sorts, but using it here would be tricky because the record may already
+                # exist.  It's possible that we can do something clever here using the .exists() queryset
+                # operator to figure out whether we should add something to the bulk insertion.  Don't really
+                # know that it would be faster, but it might, since read operations are probably faster than
+                # reads interleaved with writes.
+                #
+                # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#django.db.models.query.QuerySet.exists
+                # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#bulk-create
+                #
+                # ...but doc says .bulk_create() doesn't work with many-to-many relationships, oops.  Feh.
+                # Well, maybe we can work around that, since "retrieval" already gives us the exact set of
+                # objects for which we'd need to patch up the many-to-many relationship pointer.  Depends
+                # on what they mean by "doesn't work": automation not complete vs mysterious failure.
+
+                obj, created = cls.store_if_new(node.text.decode("base64"), uri, retrieval)
                 obj.snapshot.add(snapshot)
                 obj.save()
 
-            retrieval = yield self._rrdp_fetch_snapshot(
-                url  = url,
-                hash = hash,
-                pull_parser = RRDPPullParser({(rpki.relaxng.rrdp.xmlns + "snapshot",
-                                               rpki.relaxng.rrdp.xmlns + "publish") : snapshot_publish_handler}))
+                node.clear()
+                while node.getprevious() is not None:
+                    del root[0]
+
+                yield tornado.gen.moment
 
             snapshot.retrieved = retrieval
             snapshot.save()
 
+            xml_file.close()
+
         else:
             logger.debug("RRDP %s %s deltas (%s--%s)", self.uri,
                          (serial - snapshot.serial), snapshot.serial, serial)
@@ -1218,12 +1207,12 @@ class Fetcher(object):
                     snapshot.serial = serial
                    snapshot.save()
 
-                    for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "withdraw"):
+                    for x in delta.iterchildren(tag_withdraw):
                         snapshot.rpkiobject_set.remove(
                             snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
                     snapshot.save()
 
-                    for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"):
+                    for x in delta.iterchildren(tag_publish):
                         uri = x.get("uri")
                         cls = uri_to_class(uri)
                         if cls is None:
@@ -1349,11 +1338,7 @@ def final_cleanup():
 
     with transaction.atomic():
 
-        #logger.debug("Flushing incomplete RRDP snapshots and retrieval objects")
-
-        q = Retrieval.objects
-        q = q.filter(finished__isnull = True)
-        q.delete()
+        #logger.debug("Flushing incomplete RRDP snapshots")
 
         q = RRDPSnapshot.objects
         q = q.filter(retrieved__isnull = True)
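The long comment in the snapshot loop above sketches one possible speedup: filter out already-stored objects with a single query, bulk-insert the rest, then repair the many-to-many link that bulk_create() cannot populate. A rough, hypothetical rendering of that idea, not part of this commit; the sha256 field and snapshot relation are taken from the code shown in the diff, everything else is guesswork.

def bulk_store(cls, candidates, snapshot):
    # candidates maps sha256 hex digest -> constructed-but-unsaved cls instance.
    existing = set(cls.objects.filter(sha256__in = list(candidates))
                              .values_list("sha256", flat = True))
    cls.objects.bulk_create([obj for h, obj in candidates.items() if h not in existing])
    # bulk_create() does not populate many-to-many relations, so patch up the
    # snapshot link in a second pass over everything we just touched.
    for obj in cls.objects.filter(sha256__in = list(candidates)):
        obj.snapshot.add(snapshot)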