diff options
-rwxr-xr-x | rp/rcynic/rcynicng | 257 | ||||
-rw-r--r-- | rpki/rcynicdb/migrations/0002_auto_20160227_2003.py | 29 | ||||
-rw-r--r-- | rpki/rcynicdb/models.py | 6 |
3 files changed, 212 insertions, 80 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index b3cda8a5..cac40317 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -10,6 +10,7 @@ import os import sys import ssl import time +import copy import errno import shutil import socket @@ -33,7 +34,7 @@ import rpki.relaxng from rpki.oids import id_kp_bgpsec_router -from lxml.etree import ElementTree, Element, SubElement, Comment, XML +from lxml.etree import ElementTree, Element, SubElement, Comment, XML, XMLPullParser, DocumentInvalid, XMLSyntaxError logger = logging.getLogger("rcynicng") @@ -144,7 +145,7 @@ class X509(rpki.POW.X509): uri = uri, aki = "" if aki is None else aki.encode("hex"), ski = "" if ski is None else ski.encode("hex"), - sha256 = sha256(der).encode("hex"), + sha256 = sha256hex(der), retrieved = retrieval)) @property @@ -266,7 +267,7 @@ class CRL(rpki.POW.CRL): uri = uri, aki = "" if aki is None else aki.encode("hex"), ski = "", - sha256 = sha256(der).encode("hex"), + sha256 = sha256hex(der), retrieved = retrieval)) @property @@ -336,7 +337,7 @@ class CMS_Mixin(object): uri = uri, aki = "" if aki is None else aki.encode("hex"), ski = "" if ski is None else ski.encode("hex"), - sha256 = sha256(der).encode("hex"), + sha256 = sha256hex(der), retrieved = retrieval)) @property @@ -727,10 +728,10 @@ def first_rsync_uri(uris): def first_https_uri(uris): return first_uri(uris, "https://") -def sha256(bytes): +def sha256hex(bytes): d = rpki.POW.Digest(rpki.POW.SHA256_DIGEST) d.update(bytes) - return d.digest() + return d.digest().encode("hex") class RRDP_ParseFailure(Exception): @@ -740,6 +741,70 @@ class DeadHost(Exception): "Host recently tried and known to be unavailable." +class RRDPPullParser(object): + """ + An XML pull-parser for RRDP <snapshot/> and <delta/> messages. + """ + + def __init__(self, handlers, validate = False): + self.parser = XMLPullParser() + self.events = self.parser.read_events() + self.handlers = handlers + self.validate = validate + self._digest = rpki.POW.Digest(rpki.POW.SHA256_DIGEST) + + @staticmethod + def _get_path(x): + path = [] + while x is not None: + path.insert(0, x) + x = x.getparent() + return path + + # Validation is nasty, but our options in mid-parse are limited + # once we've started modifying the document, which we kind of have + # to do given that the entire point of iterative parsing is to + # conserve memory by discarding elements as soon as we've + # processed them. Fortunately, RRDP snapshots and deltas have + # such simple structure that we can get away with reconstructing + # and validating just the current branch. This is a total kludge, + # and doesn't work even for RRDP notifications, much less + # generalized XML documents, but by happy chance it does work in + # the two cases where we really need it. + + @staticmethod + def _validate(path): + root = None + for this in reversed(path): + child = root + root = copy.deepcopy(this) + del root[:] + if child is not None: + root.append(child) + rpki.relaxng.rrdp.schema.assertValid(root) + + def __call__(self, data, retrieval): + self.parser.feed(data) + self._digest.update(data) + for action, element in self.events: + path = self._get_path(element) + tags = tuple(x.tag for x in path) + if len(path) < 2: + continue + if self.validate: + self._validate(path) + if tags in self.handlers: + path.insert(0, retrieval) + self.handlers[tags](*path) + element.clear() + while element.getprevious() is not None: + del element.getparent()[0] + + @property + def sha256hex(self): + return self._digest.digest().encode("hex") + + class Fetcher(object): """ Network transfer methods and history database. @@ -810,7 +875,7 @@ class Fetcher(object): if self.uri.startswith("rsync://"): return self._rsync_fetch() if self.uri.startswith("https://"): - return self._https_fetch() if self.ta else self._rrdp_fetch() + return self._https_fetch_ta() if self.ta else self._rrdp_fetch() raise ValueError @tornado.gen.coroutine @@ -903,7 +968,7 @@ class Fetcher(object): yield path @tornado.gen.coroutine - def _https_fetch_url(self, url): + def _https_fetch_url(self, url, pull_parser = None): if urlparse.urlparse(url).netloc in self._https_deadhosts: raise DeadHost @@ -929,29 +994,20 @@ class Fetcher(object): # Would also need to pull timestamp from the Last-Modified # header in the response object. - # One might reasonably ask why we set validate_cert = False - # here, given that doing so is generally a horrible idea which - # leaves TLS open to MitM attacks. The answer is simple: we - # really don't care, because the underlying data are both - # public and signed, and because we have no usable trust - # relationship with the server. In other words, this is all - # object security, not channel security. For all practical - # purposes, we might as well be using plain HTTP, but that's - # politically unpopular in the IETF these days. So we - # encrypt, apparently just to give the NSA something to do. - # - # Perhaps some day we'll enable TLS certificate validation - # here so that we can whine about failures, but at this point - # it's not worth the hassle of figuring out which TLS trust - # anchors to configure, requiring the bucket of trust anchors - # that the open source web browsers use, or any of that mess. - try: ok = False t0 = time.time() - client = tornado.httpclient.AsyncHTTPClient() + retrieval = Retrieval.objects.create( + uri = url, + started = rpki.sundial.datetime.fromtimestamp(t0)) + if pull_parser is None: + streaming_callback = None + else: + streaming_callback = lambda data: pull_parser(data, retrieval) + client = tornado.httpclient.AsyncHTTPClient(max_body_size = args.max_https_body_size) response = yield client.fetch(url, - validate_cert = False, + streaming_callback = streaming_callback, + validate_cert = args.validate_https, connect_timeout = args.https_timeout, request_timeout = args.https_timeout) # Might want to check response Content-Type here @@ -974,16 +1030,14 @@ class Fetcher(object): finally: t1 = time.time() logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0) - retrieval = Retrieval.objects.create( - uri = url, - started = rpki.sundial.datetime.fromtimestamp(t0), - finished = rpki.sundial.datetime.fromtimestamp(t1), - successful = ok) + retrieval.finished = rpki.sundial.datetime.fromtimestamp(t1) + retrieval.successful = ok + retrieval.save() - raise tornado.gen.Return((retrieval, response.body if ok else None)) + raise tornado.gen.Return((retrieval, response)) @tornado.gen.coroutine - def _https_fetch(self): + def _https_fetch_ta(self): if args.no_fetch: return @@ -997,8 +1051,8 @@ class Fetcher(object): self._rsync_history[self.uri] = self try: - retrieval, der = yield self._https_fetch_url(self.uri) - X509.store_if_new(der, self.uri, retrieval) + retrieval, response = yield self._https_fetch_url(self.uri) + X509.store_if_new(response.body, self.uri, retrieval) except: logger.exception("Couldn't load %s", self.uri) @@ -1008,30 +1062,55 @@ class Fetcher(object): pending.notify_all() @tornado.gen.coroutine - def _rrdp_fetch_url(self, url, tag, hash = None, session_id = None, serial = None): - - retrieval, xml = yield self._https_fetch_url(url) + def _rrdp_fetch_notification(self, url): - if hash is not None and sha256(xml).encode("hex") != hash.lower(): - raise RRDP_ParseFailure("Expected RRDP hash %s for %s, got %s" % ( - hash.lower(), url, sha256(xml).encode("hex"))) + retrieval, response = yield self._https_fetch_url(url) - xml = XML(xml) - rpki.relaxng.rrdp.schema.assertValid(xml) + notification = ElementTree(file = response.buffer).getroot() - if tag is not None and xml.tag != rpki.relaxng.rrdp.xmlns + tag: - raise RRDP_ParseFailure("Expected RRDP %s for %s, got %s" % ( - tag, delta_url, xml.tag)) + rpki.relaxng.rrdp.schema.assertValid(notification) - if session_id is not None and xml.get("session_id") != session_id: - raise RRDP_ParseFailure("Expected RRDP session_id %s for %s, got %s" % ( - session_id, url, xml.get("session_id"))) + if notification.tag != rpki.relaxng.rrdp.xmlns + "notification": + raise RRDP_ParseFailure("Expected RRDP notification for {}, got {}".format(url, notification.tag)) - if serial is not None and long(xml.get("serial")) != long(serial): - raise RRDP_ParseFailure("Expected RRDP serial %s for %s, got %s" % ( - serial, url, xml.get("serial"))) + raise tornado.gen.Return((retrieval, notification)) + + @tornado.gen.coroutine + def _rrdp_fetch_delta(self, url, hash, session_id, serial): + + # We'll probably want to convert this to using a pull-parser, + # but doing so is more work and less urgent than converting + # snapshots to use one. + + retrieval, response = yield self._https_fetch_url(url) + + h = sha256hex(response.body) + if h != hash.lower(): + raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(hash.lower(), url, h)) + + delta = XML(response.body) + rpki.relaxng.rrdp.schema.assertValid(delta) + + if delta.tag != rpki.relaxng.rrdp.xmlns + "delta": + raise RRDP_ParseFailure("Expected RRDP delta for {}, got {}".format(url, delta.tag)) + + if delta.get("session_id") != session_id: + raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(session_id, url, delta.get("session_id"))) + + if long(delta.get("serial")) != long(serial): + raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(serial, url, delta.get("serial"))) - raise tornado.gen.Return((retrieval, xml)) + raise tornado.gen.Return((retrieval, delta)) + + @tornado.gen.coroutine + def _rrdp_fetch_snapshot(self, url, hash, pull_parser): + + retrieval, response = yield self._https_fetch_url(url, pull_parser) + + if pull_parser.sha256hex != hash.lower(): + raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(hash.lower(), url, pull_parser.sha256hex)) + + raise tornado.gen.Return(retrieval) @tornado.gen.coroutine def _rrdp_fetch(self): @@ -1049,7 +1128,7 @@ class Fetcher(object): self._https_history[self.uri] = self try: - retrieval, notification = yield self._rrdp_fetch_url(url = self.uri, tag = "notification") + retrieval, notification = yield self._rrdp_fetch_notification(url = self.uri) session_id = notification.get("session_id") serial = long(notification.get("serial")) @@ -1078,26 +1157,36 @@ class Fetcher(object): logger.debug("RRDP %s loading from snapshot %s serial %s", self.uri, url, serial) - retrieval, x = yield self._rrdp_fetch_url( - url = url, - hash = hash, - tag = "snapshot", - session_id = session_id, - serial = serial) - - with transaction.atomic(): - if snapshot is not None: - snapshot.delete() - snapshot = RRDPSnapshot.objects.create(retrieved = retrieval, session_id = session_id, serial = serial) - for x in x.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"): - uri = x.get("uri") - cls = uri_to_class(uri) - if cls is None: - raise RRDP_ParseFailure("Unexpected URI %s" % uri) - obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval) - obj.snapshot.add(snapshot) - obj.save() - logger.debug("RRDP %s committing snapshot %s serial %s", self.uri, url, serial) + snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial) + + # Fetching and processing a huge snapshot file can take quite a while. + # If we need to warn the user about this, see header_callback (companion + # to streaming_callback) which would let us see the content-length early + # enough to issue a useful warning. + + def snapshot_publish_handler(retrieval, root, publish): + if root.get("session_id") != session_id: + raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format( + session_id, url, root.get("session_id"))) + if long(root.get("serial")) != long(serial): + raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format( + serial, url, root.get("serial"))) + uri = publish.get("uri") + cls = uri_to_class(uri) + if cls is None: + raise RRDP_ParseFailure("Unexpected URI {}".format(uri)) + obj, created = cls.store_if_new(publish.text.decode("base64"), uri, retrieval) + obj.snapshot.add(snapshot) + obj.save() + + retrieval = yield self._rrdp_fetch_snapshot( + url = url, + hash = hash, + pull_parser = RRDPPullParser({(rpki.relaxng.rrdp.xmlns + "snapshot", + rpki.relaxng.rrdp.xmlns + "publish") : snapshot_publish_handler})) + + snapshot.retrieved = retrieval + snapshot.save() else: logger.debug("RRDP %s %s deltas (%s--%s)", self.uri, @@ -1112,10 +1201,9 @@ class Fetcher(object): while deltas and len(futures) < args.fetch_ahead_goal: serial, url, hash = deltas.pop(0) logger.debug("RRDP %s serial %s fetching %s", self.uri, serial, url) - futures.append(self._rrdp_fetch_url( + futures.append(self._rrdp_fetch_delta( url = url, hash = hash, - tag = "delta", session_id = session_id, serial = serial)) @@ -1261,6 +1349,16 @@ def final_cleanup(): with transaction.atomic(): + #logger.debug("Flushing incomplete RRDP snapshots and retrieval objects") + + q = RRDPSnapshot.objects + q = q.filter(finished__isnull = True) + q.delete() + + q = Retrieval.objects + q = q.filter(successful__isnull = True) + q.delete() + #logger.debug("Flushing old authenticated sets") q = Authenticated.objects @@ -1340,6 +1438,11 @@ def main(): parser.add_argument("--https-timeout", default = 300, type = posint) + parser.add_argument("--validate-https", action = "store_true") + + parser.add_argument("--max-https-body-size", type = posint, default = 512 * 1024 * 1024) + + # We already have a whole bunch of logging control code in # rpki.log, just need to figure out / remember how to use it # properly. See rpki.log.init() & rpki.log.argparse_setup(). diff --git a/rpki/rcynicdb/migrations/0002_auto_20160227_2003.py b/rpki/rcynicdb/migrations/0002_auto_20160227_2003.py new file mode 100644 index 00000000..9c3acecb --- /dev/null +++ b/rpki/rcynicdb/migrations/0002_auto_20160227_2003.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('rcynicdb', '0001_initial'), + ] + + operations = [ + migrations.AlterField( + model_name='retrieval', + name='finished', + field=models.DateTimeField(null=True), + ), + migrations.AlterField( + model_name='retrieval', + name='successful', + field=models.BooleanField(default=False), + ), + migrations.AlterField( + model_name='rrdpsnapshot', + name='retrieved', + field=models.OneToOneField(null=True, to='rcynicdb.Retrieval'), + ), + ] diff --git a/rpki/rcynicdb/models.py b/rpki/rcynicdb/models.py index 185482b1..630164a3 100644 --- a/rpki/rcynicdb/models.py +++ b/rpki/rcynicdb/models.py @@ -7,8 +7,8 @@ from django.db import models class Retrieval(models.Model): uri = models.TextField() started = models.DateTimeField() - finished = models.DateTimeField() - successful = models.BooleanField() + finished = models.DateTimeField(null = True) + successful = models.BooleanField(default = False) # Collection of validated objects. @@ -21,7 +21,7 @@ class Authenticated(models.Model): class RRDPSnapshot(models.Model): session_id = models.UUIDField() serial = models.BigIntegerField() - retrieved = models.OneToOneField(Retrieval) + retrieved = models.OneToOneField(Retrieval, null = True) # RPKI objects. # |