From a0a187c11c628694e075d6150ead6ab86abe1448 Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Mon, 29 Feb 2016 13:45:15 +0000 Subject: Use lxml.etree.iterparse() for delta files too. svn path=/branches/tk705/; revision=6300 --- rp/rcynic/rcynicng | 111 +++++++++++++++++++++-------------------------------- 1 file changed, 44 insertions(+), 67 deletions(-) (limited to 'rp') diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index 553b32e5..024e55e4 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -1015,43 +1015,6 @@ class Fetcher(object): raise tornado.gen.Return((retrieval, notification)) - @tornado.gen.coroutine - def _rrdp_fetch_delta(self, url, hash, session_id, serial): - - # We'll probably want to convert this to using a pull-parser, - # but doing so is more work and less urgent than converting - # snapshots to use one. - - retrieval, response = yield self._https_fetch_url(url) - - h = sha256hex(response.body) - if h != hash.lower(): - raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(hash.lower(), url, h)) - - delta = XML(response.body) - rpki.relaxng.rrdp.schema.assertValid(delta) - - if delta.tag != tag_delta: - raise RRDP_ParseFailure("Expected RRDP delta for {}, got {}".format(url, delta.tag)) - - if delta.get("session_id") != session_id: - raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(session_id, url, delta.get("session_id"))) - - if long(delta.get("serial")) != long(serial): - raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(serial, url, delta.get("serial"))) - - raise tornado.gen.Return((retrieval, delta)) - - @tornado.gen.coroutine - def _rrdp_fetch_snapshot(self, url, hash, pull_parser): - - retrieval, response = yield self._https_fetch_url(url, pull_parser) - - if pull_parser.sha256hex != hash.lower(): - raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(hash.lower(), url, pull_parser.sha256hex)) - - raise tornado.gen.Return(retrieval) - @tornado.gen.coroutine def _rrdp_fetch_file(self, url, expected_hash): @@ -1156,6 +1119,7 @@ class Fetcher(object): # # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#django.db.models.query.QuerySet.exists # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#bulk-create + # https://docs.djangoproject.com/en/1.9/ref/models/relations/ # # ...but doc says .bulk_create() doesn't work with many-to-many relationships, oops. Feh. # Well, maybe we can work around that, since "retrieval" already gives us the exact set of @@ -1164,7 +1128,6 @@ class Fetcher(object): obj, created = cls.store_if_new(node.text.decode("base64"), uri, retrieval) obj.snapshot.add(snapshot) - obj.save() node.clear() while node.getprevious() is not None: @@ -1190,40 +1153,54 @@ class Fetcher(object): while deltas and len(futures) < args.fetch_ahead_goal: serial, url, hash = deltas.pop(0) logger.debug("RRDP %s serial %s fetching %s", self.uri, serial, url) - futures.append(self._rrdp_fetch_delta( - url = url, - hash = hash, - session_id = session_id, - serial = serial)) + futures.append(self._rrdp_fetch_file(url, hash)) - retrieval, delta = yield futures.pop(0) + retrieval, response, xml_file = yield futures.pop(0) - serial = long(delta.get("serial")) - assert serial == snapshot.serial + 1 - - logger.debug("RRDP %s serial %s loading", self.uri, serial) + root = None + count = 0 with transaction.atomic(): - snapshot.serial = serial + snapshot.serial += 1 snapshot.save() - - for x in delta.iterchildren(tag_withdraw): - snapshot.rpkiobject_set.remove( - snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower())) - snapshot.save() - - for x in delta.iterchildren(tag_publish): - uri = x.get("uri") - cls = uri_to_class(uri) - if cls is None: - raise RRDP_ParseFailure("Unexpected URI %s" % uri) - if x.get("hash", None) is not None: - snapshot.rpkiobject_set.remove( - snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower())) - snapshot.save() - obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval) - obj.snapshot.add(snapshot) - obj.save() + logger.debug("RRDP %s serial %s loading", self.uri, snapshot.serial) + + for event, node in iterparse(xml_file): + if node is root: + continue + + if root is None: + root = node.getparent() + if root is None or root.tag != tag_delta: + raise RRDP_ParseFailure("{} doesn't look like an RRDP delta file".format(url)) + if root.get("session_id") != session_id: + raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format( + session_id, url, root.get("session_id"))) + if long(root.get("serial")) != snapshot.serial: + raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format( + snapshot.serial, url, root.get("serial"))) + + if node.tag not in (tag_publish, tag_withdraw) or node.getparent() is not root: + raise RRDP_ParseFailure("{} doesn't look like an RRDP delta file".format(url)) + + if node.tag == tag_withdraw or node.get("hash") is not None: + snapshot.rpkiobject_set.remove(snapshot.rpkiobject_set.get(sha256 = node.get("hash").lower())) + + if node.tag == tag_publish: + uri = node.get("uri") + cls = uri_to_class(uri) + if cls is None: + raise RRDP_ParseFailure("Unexpected URI %s" % uri) + obj, created = cls.store_if_new(node.text.decode("base64"), uri, retrieval) + obj.snapshot.add(snapshot) + + node.clear() + while node.getprevious() is not None: + del root[0] + + #yield tornado.gen.moment + + xml_file.close() logger.debug("RRDP %s done processing deltas", self.uri) -- cgit v1.2.3