aboutsummaryrefslogtreecommitdiff
path: root/rp
diff options
context:
space:
mode:
Diffstat (limited to 'rp')
-rwxr-xr-x rp/rcynic/rcynicng 201
1 files changed, 93 insertions, 108 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index cad0c505..553b32e5 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -16,6 +16,7 @@ import shutil
import socket
import logging
import argparse
+import tempfile
import urlparse
import subprocess
@@ -34,10 +35,18 @@ import rpki.relaxng
from rpki.oids import id_kp_bgpsec_router
-from lxml.etree import ElementTree, Element, SubElement, Comment, XML, XMLPullParser, DocumentInvalid, XMLSyntaxError
+from lxml.etree import ElementTree, Element, SubElement, Comment, XML, DocumentInvalid, XMLSyntaxError, iterparse
logger = logging.getLogger("rcynicng")
+xmlns = rpki.relaxng.rrdp.xmlns
+
+tag_delta = xmlns + "delta"
+tag_notification = xmlns + "notification"
+tag_publish = xmlns + "publish"
+tag_snapshot = xmlns + "snapshot"
+tag_withdraw = xmlns + "withdraw"
+
codes = rpki.POW.validation_status
@@ -741,70 +750,6 @@ class DeadHost(Exception):
"Host recently tried and known to be unavailable."
-class RRDPPullParser(object):
- """
- An XML pull-parser for RRDP <snapshot/> and <delta/> messages.
- """
-
- def __init__(self, handlers, validate = False):
- self.parser = XMLPullParser()
- self.events = self.parser.read_events()
- self.handlers = handlers
- self.validate = validate
- self._digest = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
-
- @staticmethod
- def _get_path(x):
- path = []
- while x is not None:
- path.insert(0, x)
- x = x.getparent()
- return path
-
- # Validation is nasty, but our options in mid-parse are limited
- # once we've started modifying the document, which we kind of have
- # to do given that the entire point of iterative parsing is to
- # conserve memory by discarding elements as soon as we've
- # processed them. Fortunately, RRDP snapshots and deltas have
- # such simple structure that we can get away with reconstructing
- # and validating just the current branch. This is a total kludge,
- # and doesn't work even for RRDP notifications, much less
- # generalized XML documents, but by happy chance it does work in
- # the two cases where we really need it.
-
- @staticmethod
- def _validate(path):
- root = None
- for this in reversed(path):
- child = root
- root = copy.deepcopy(this)
- del root[:]
- if child is not None:
- root.append(child)
- rpki.relaxng.rrdp.schema.assertValid(root)
-
- def __call__(self, data, retrieval):
- self.parser.feed(data)
- self._digest.update(data)
- for action, element in self.events:
- path = self._get_path(element)
- tags = tuple(x.tag for x in path)
- if len(path) < 2:
- continue
- if self.validate:
- self._validate(path)
- if tags in self.handlers:
- path.insert(0, retrieval)
- self.handlers[tags](*path)
- element.clear()
- while element.getprevious() is not None:
- del element.getparent()[0]
-
- @property
- def sha256hex(self):
- return self._digest.digest().encode("hex")
-
-
class Fetcher(object):
"""
Network transfer methods and history database.
@@ -968,7 +913,7 @@ class Fetcher(object):
yield path
@tornado.gen.coroutine
- def _https_fetch_url(self, url, pull_parser = None):
+ def _https_fetch_url(self, url, streaming_callback = None):
if urlparse.urlparse(url).netloc in self._https_deadhosts:
raise DeadHost
@@ -997,13 +942,6 @@ class Fetcher(object):
try:
ok = False
t0 = time.time()
- retrieval = Retrieval.objects.create(
- uri = url,
- started = rpki.sundial.datetime.fromtimestamp(t0))
- if pull_parser is None:
- streaming_callback = None
- else:
- streaming_callback = lambda data: pull_parser(data, retrieval)
client = tornado.httpclient.AsyncHTTPClient(max_body_size = args.max_https_body_size)
response = yield client.fetch(url,
streaming_callback = streaming_callback,
@@ -1030,11 +968,13 @@ class Fetcher(object):
finally:
t1 = time.time()
logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0)
- retrieval.finished = rpki.sundial.datetime.fromtimestamp(t1)
- retrieval.successful = ok
- retrieval.save()
-
- raise tornado.gen.Return((retrieval, response))
+ retrieval = Retrieval.objects.create(
+ uri = url,
+ started = rpki.sundial.datetime.fromtimestamp(t0),
+ finished = rpki.sundial.datetime.fromtimestamp(t1),
+ successful = ok)
+ if ok:
+ raise tornado.gen.Return((retrieval, response))
@tornado.gen.coroutine
def _https_fetch_ta(self):
@@ -1070,7 +1010,7 @@ class Fetcher(object):
rpki.relaxng.rrdp.schema.assertValid(notification)
- if notification.tag != rpki.relaxng.rrdp.xmlns + "notification":
+ if notification.tag != tag_notification:
raise RRDP_ParseFailure("Expected RRDP notification for {}, got {}".format(url, notification.tag))
raise tornado.gen.Return((retrieval, notification))
@@ -1091,7 +1031,7 @@ class Fetcher(object):
delta = XML(response.body)
rpki.relaxng.rrdp.schema.assertValid(delta)
- if delta.tag != rpki.relaxng.rrdp.xmlns + "delta":
+ if delta.tag != tag_delta:
raise RRDP_ParseFailure("Expected RRDP delta for {}, got {}".format(url, delta.tag))
if delta.get("session_id") != session_id:
@@ -1113,6 +1053,22 @@ class Fetcher(object):
raise tornado.gen.Return(retrieval)
@tornado.gen.coroutine
+ def _rrdp_fetch_file(self, url, expected_hash):
+
+ sha256 = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
+ xml_file = tempfile.SpooledTemporaryFile()
+
+ retrieval, response = yield self._https_fetch_url(url, lambda data: (sha256.update(data), xml_file.write(data)))
+
+ received_hash = sha256.digest().encode("hex")
+ xml_file.seek(0)
+
+ if received_hash != expected_hash.lower():
+ raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(expected_hash.lower(), url, received_hash))
+
+ raise tornado.gen.Return((retrieval, response, xml_file))
+
+ @tornado.gen.coroutine
def _rrdp_fetch(self):
from django.db import transaction
@@ -1144,50 +1100,83 @@ class Fetcher(object):
return
deltas = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash")))
- for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta"))
+ for delta in notification.iterchildren(tag_delta))
if snapshot is None or snapshot.serial + 1 not in deltas:
if snapshot is not None:
logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial)
- x = notification.find(rpki.relaxng.rrdp.xmlns + "snapshot")
+ x = notification.find(tag_snapshot)
url, hash = x.get("uri"), x.get("hash")
logger.debug("RRDP %s loading from snapshot %s serial %s", self.uri, url, serial)
+ retrieval, response, xml_file = yield self._rrdp_fetch_file(url, hash)
+
snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial)
- # Fetching and processing a huge snapshot file can take quite a while.
- # If we need to warn the user about this, see header_callback (companion
- # to streaming_callback) which would let us see the content-length early
- # enough to issue a useful warning.
-
- def snapshot_publish_handler(retrieval, root, publish):
- if root.get("session_id") != session_id:
- raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(
- session_id, url, root.get("session_id")))
- if long(root.get("serial")) != long(serial):
- raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(
- serial, url, root.get("serial")))
- uri = publish.get("uri")
+ root = None
+ count = 0
+
+ for event, node in iterparse(xml_file):
+ if node is root:
+ continue
+
+ if root is None:
+ root = node.getparent()
+ if root is None or root.tag != tag_snapshot:
+ raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url))
+ if root.get("session_id") != session_id:
+ raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(
+ session_id, url, root.get("session_id")))
+ if long(root.get("serial")) != long(serial):
+ raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(
+ serial, url, root.get("serial")))
+
+ if node.tag != tag_publish or node.getparent() is not root:
+ raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url))
+
+ count += 1
+ if count % 1000 == 0:
+ logger.debug("Parsing %s object %s", url, count)
+
+ uri = node.get("uri")
cls = uri_to_class(uri)
if cls is None:
raise RRDP_ParseFailure("Unexpected URI {}".format(uri))
- obj, created = cls.store_if_new(publish.text.decode("base64"), uri, retrieval)
+
+ # This next bit, sadly, is hideously slow for large snapshots. Django does have a bulk
+ # creation operation of sorts, but using it here would be tricky because the record may already
+ # exist. It's possible that we can do something clever here using the .exists() queryset
+ # operator to figure out whether we should add something to the bulk insertion. Don't really
+ # know that it would be faster, but it might, since read operations are probably faster than
+ # reads interleaved with writes.
+ #
+ # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#django.db.models.query.QuerySet.exists
+ # https://docs.djangoproject.com/en/1.9/ref/models/querysets/#bulk-create
+ #
+ # ...but doc says .bulk_create() doesn't work with many-to-many relationships, oops. Feh.
+ # Well, maybe we can work around that, since "retrieval" already gives us the exact set of
+ # objects for which we'd need to patch up the many-to-many relationship pointer. Depends
+ # on what they mean by "doesn't work": automation not complete vs mysterious failure.
+
+ obj, created = cls.store_if_new(node.text.decode("base64"), uri, retrieval)
obj.snapshot.add(snapshot)
obj.save()
- retrieval = yield self._rrdp_fetch_snapshot(
- url = url,
- hash = hash,
- pull_parser = RRDPPullParser({(rpki.relaxng.rrdp.xmlns + "snapshot",
- rpki.relaxng.rrdp.xmlns + "publish") : snapshot_publish_handler}))
+ node.clear()
+ while node.getprevious() is not None:
+ del root[0]
+
+ yield tornado.gen.moment
snapshot.retrieved = retrieval
snapshot.save()
+ xml_file.close()
+
else:
logger.debug("RRDP %s %s deltas (%s--%s)", self.uri,
(serial - snapshot.serial), snapshot.serial, serial)
@@ -1218,12 +1207,12 @@ class Fetcher(object):
snapshot.serial = serial
snapshot.save()
- for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "withdraw"):
+ for x in delta.iterchildren(tag_withdraw):
snapshot.rpkiobject_set.remove(
snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
snapshot.save()
- for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"):
+ for x in delta.iterchildren(tag_publish):
uri = x.get("uri")
cls = uri_to_class(uri)
if cls is None:
@@ -1349,11 +1338,7 @@ def final_cleanup():
with transaction.atomic():
- #logger.debug("Flushing incomplete RRDP snapshots and retrieval objects")
-
- q = Retrieval.objects
- q = q.filter(finished__isnull = True)
- q.delete()
+ #logger.debug("Flushing incomplete RRDP snapshots")
q = RRDPSnapshot.objects
q = q.filter(retrieved__isnull = True)