aboutsummaryrefslogtreecommitdiff
path: root/rp/rcynic/rcynicng
diff options
context:
space:
mode:
author Rob Austein <sra@hactrn.net> 2016-02-27 23:03:35 +0000
committer Rob Austein <sra@hactrn.net> 2016-02-27 23:03:35 +0000
commit 6a21a007ea834c2c7641e045f2bedf5d0203da77 (patch)
tree 2353baa21083133798f722888dfcbca4b35f18c6 /rp/rcynic/rcynicng
parent 0b0d0d6506a107690e62341cc1b630eea96e4f35 (diff)
Use an incremental parser for snapshot files.
svn path=/branches/tk705/; revision=6297
Diffstat (limited to 'rp/rcynic/rcynicng')
-rwxr-xr-x rp/rcynic/rcynicng 257
1 files changed, 180 insertions, 77 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index b3cda8a5..cac40317 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -10,6 +10,7 @@ import os
import sys
import ssl
import time
+import copy
import errno
import shutil
import socket
@@ -33,7 +34,7 @@ import rpki.relaxng
from rpki.oids import id_kp_bgpsec_router
-from lxml.etree import ElementTree, Element, SubElement, Comment, XML
+from lxml.etree import ElementTree, Element, SubElement, Comment, XML, XMLPullParser, DocumentInvalid, XMLSyntaxError
logger = logging.getLogger("rcynicng")
@@ -144,7 +145,7 @@ class X509(rpki.POW.X509):
uri = uri,
aki = "" if aki is None else aki.encode("hex"),
ski = "" if ski is None else ski.encode("hex"),
- sha256 = sha256(der).encode("hex"),
+ sha256 = sha256hex(der),
retrieved = retrieval))
@property
@@ -266,7 +267,7 @@ class CRL(rpki.POW.CRL):
uri = uri,
aki = "" if aki is None else aki.encode("hex"),
ski = "",
- sha256 = sha256(der).encode("hex"),
+ sha256 = sha256hex(der),
retrieved = retrieval))
@property
@@ -336,7 +337,7 @@ class CMS_Mixin(object):
uri = uri,
aki = "" if aki is None else aki.encode("hex"),
ski = "" if ski is None else ski.encode("hex"),
- sha256 = sha256(der).encode("hex"),
+ sha256 = sha256hex(der),
retrieved = retrieval))
@property
@@ -727,10 +728,10 @@ def first_rsync_uri(uris):
def first_https_uri(uris):
return first_uri(uris, "https://")
-def sha256(bytes):
+def sha256hex(bytes):
d = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
d.update(bytes)
- return d.digest()
+ return d.digest().encode("hex")
class RRDP_ParseFailure(Exception):
@@ -740,6 +741,70 @@ class DeadHost(Exception):
"Host recently tried and known to be unavailable."
+class RRDPPullParser(object):
+ """
+ An XML pull-parser for RRDP <snapshot/> and <delta/> messages.
+ """
+
+ def __init__(self, handlers, validate = False):
+ self.parser = XMLPullParser()
+ self.events = self.parser.read_events()
+ self.handlers = handlers
+ self.validate = validate
+ self._digest = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
+
+ @staticmethod
+ def _get_path(x):
+ path = []
+ while x is not None:
+ path.insert(0, x)
+ x = x.getparent()
+ return path
+
+ # Validation is nasty, but our options in mid-parse are limited
+ # once we've started modifying the document, which we kind of have
+ # to do given that the entire point of iterative parsing is to
+ # conserve memory by discarding elements as soon as we've
+ # processed them. Fortunately, RRDP snapshots and deltas have
+ # such simple structure that we can get away with reconstructing
+ # and validating just the current branch. This is a total kludge,
+ # and doesn't work even for RRDP notifications, much less
+ # generalized XML documents, but by happy chance it does work in
+ # the two cases where we really need it.
+
+ @staticmethod
+ def _validate(path):
+ root = None
+ for this in reversed(path):
+ child = root
+ root = copy.deepcopy(this)
+ del root[:]
+ if child is not None:
+ root.append(child)
+ rpki.relaxng.rrdp.schema.assertValid(root)
+
+ def __call__(self, data, retrieval):
+ self.parser.feed(data)
+ self._digest.update(data)
+ for action, element in self.events:
+ path = self._get_path(element)
+ tags = tuple(x.tag for x in path)
+ if len(path) < 2:
+ continue
+ if self.validate:
+ self._validate(path)
+ if tags in self.handlers:
+ path.insert(0, retrieval)
+ self.handlers[tags](*path)
+ element.clear()
+ while element.getprevious() is not None:
+ del element.getparent()[0]
+
+ @property
+ def sha256hex(self):
+ return self._digest.digest().encode("hex")
+
+
class Fetcher(object):
"""
Network transfer methods and history database.
@@ -810,7 +875,7 @@ class Fetcher(object):
if self.uri.startswith("rsync://"):
return self._rsync_fetch()
if self.uri.startswith("https://"):
- return self._https_fetch() if self.ta else self._rrdp_fetch()
+ return self._https_fetch_ta() if self.ta else self._rrdp_fetch()
raise ValueError
@tornado.gen.coroutine
@@ -903,7 +968,7 @@ class Fetcher(object):
yield path
@tornado.gen.coroutine
- def _https_fetch_url(self, url):
+ def _https_fetch_url(self, url, pull_parser = None):
if urlparse.urlparse(url).netloc in self._https_deadhosts:
raise DeadHost
@@ -929,29 +994,20 @@ class Fetcher(object):
# Would also need to pull timestamp from the Last-Modified
# header in the response object.
- # One might reasonably ask why we set validate_cert = False
- # here, given that doing so is generally a horrible idea which
- # leaves TLS open to MitM attacks. The answer is simple: we
- # really don't care, because the underlying data are both
- # public and signed, and because we have no usable trust
- # relationship with the server. In other words, this is all
- # object security, not channel security. For all practical
- # purposes, we might as well be using plain HTTP, but that's
- # politically unpopular in the IETF these days. So we
- # encrypt, apparently just to give the NSA something to do.
- #
- # Perhaps some day we'll enable TLS certificate validation
- # here so that we can whine about failures, but at this point
- # it's not worth the hassle of figuring out which TLS trust
- # anchors to configure, requiring the bucket of trust anchors
- # that the open source web browsers use, or any of that mess.
-
try:
ok = False
t0 = time.time()
- client = tornado.httpclient.AsyncHTTPClient()
+ retrieval = Retrieval.objects.create(
+ uri = url,
+ started = rpki.sundial.datetime.fromtimestamp(t0))
+ if pull_parser is None:
+ streaming_callback = None
+ else:
+ streaming_callback = lambda data: pull_parser(data, retrieval)
+ client = tornado.httpclient.AsyncHTTPClient(max_body_size = args.max_https_body_size)
response = yield client.fetch(url,
- validate_cert = False,
+ streaming_callback = streaming_callback,
+ validate_cert = args.validate_https,
connect_timeout = args.https_timeout,
request_timeout = args.https_timeout)
# Might want to check response Content-Type here
@@ -974,16 +1030,14 @@ class Fetcher(object):
finally:
t1 = time.time()
logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0)
- retrieval = Retrieval.objects.create(
- uri = url,
- started = rpki.sundial.datetime.fromtimestamp(t0),
- finished = rpki.sundial.datetime.fromtimestamp(t1),
- successful = ok)
+ retrieval.finished = rpki.sundial.datetime.fromtimestamp(t1)
+ retrieval.successful = ok
+ retrieval.save()
- raise tornado.gen.Return((retrieval, response.body if ok else None))
+ raise tornado.gen.Return((retrieval, response))
@tornado.gen.coroutine
- def _https_fetch(self):
+ def _https_fetch_ta(self):
if args.no_fetch:
return
@@ -997,8 +1051,8 @@ class Fetcher(object):
self._rsync_history[self.uri] = self
try:
- retrieval, der = yield self._https_fetch_url(self.uri)
- X509.store_if_new(der, self.uri, retrieval)
+ retrieval, response = yield self._https_fetch_url(self.uri)
+ X509.store_if_new(response.body, self.uri, retrieval)
except:
logger.exception("Couldn't load %s", self.uri)
@@ -1008,30 +1062,55 @@ class Fetcher(object):
pending.notify_all()
@tornado.gen.coroutine
- def _rrdp_fetch_url(self, url, tag, hash = None, session_id = None, serial = None):
-
- retrieval, xml = yield self._https_fetch_url(url)
+ def _rrdp_fetch_notification(self, url):
- if hash is not None and sha256(xml).encode("hex") != hash.lower():
- raise RRDP_ParseFailure("Expected RRDP hash %s for %s, got %s" % (
- hash.lower(), url, sha256(xml).encode("hex")))
+ retrieval, response = yield self._https_fetch_url(url)
- xml = XML(xml)
- rpki.relaxng.rrdp.schema.assertValid(xml)
+ notification = ElementTree(file = response.buffer).getroot()
- if tag is not None and xml.tag != rpki.relaxng.rrdp.xmlns + tag:
- raise RRDP_ParseFailure("Expected RRDP %s for %s, got %s" % (
- tag, delta_url, xml.tag))
+ rpki.relaxng.rrdp.schema.assertValid(notification)
- if session_id is not None and xml.get("session_id") != session_id:
- raise RRDP_ParseFailure("Expected RRDP session_id %s for %s, got %s" % (
- session_id, url, xml.get("session_id")))
+ if notification.tag != rpki.relaxng.rrdp.xmlns + "notification":
+ raise RRDP_ParseFailure("Expected RRDP notification for {}, got {}".format(url, notification.tag))
- if serial is not None and long(xml.get("serial")) != long(serial):
- raise RRDP_ParseFailure("Expected RRDP serial %s for %s, got %s" % (
- serial, url, xml.get("serial")))
+ raise tornado.gen.Return((retrieval, notification))
+
+ @tornado.gen.coroutine
+ def _rrdp_fetch_delta(self, url, hash, session_id, serial):
+
+ # We'll probably want to convert this to using a pull-parser,
+ # but doing so is more work and less urgent than converting
+ # snapshots to use one.
+
+ retrieval, response = yield self._https_fetch_url(url)
+
+ h = sha256hex(response.body)
+ if h != hash.lower():
+ raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(hash.lower(), url, h))
+
+ delta = XML(response.body)
+ rpki.relaxng.rrdp.schema.assertValid(delta)
+
+ if delta.tag != rpki.relaxng.rrdp.xmlns + "delta":
+ raise RRDP_ParseFailure("Expected RRDP delta for {}, got {}".format(url, delta.tag))
+
+ if delta.get("session_id") != session_id:
+ raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(session_id, url, delta.get("session_id")))
+
+ if long(delta.get("serial")) != long(serial):
+ raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(serial, url, delta.get("serial")))
- raise tornado.gen.Return((retrieval, xml))
+ raise tornado.gen.Return((retrieval, delta))
+
+ @tornado.gen.coroutine
+ def _rrdp_fetch_snapshot(self, url, hash, pull_parser):
+
+ retrieval, response = yield self._https_fetch_url(url, pull_parser)
+
+ if pull_parser.sha256hex != hash.lower():
+ raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(hash.lower(), url, pull_parser.sha256hex))
+
+ raise tornado.gen.Return(retrieval)
@tornado.gen.coroutine
def _rrdp_fetch(self):
@@ -1049,7 +1128,7 @@ class Fetcher(object):
self._https_history[self.uri] = self
try:
- retrieval, notification = yield self._rrdp_fetch_url(url = self.uri, tag = "notification")
+ retrieval, notification = yield self._rrdp_fetch_notification(url = self.uri)
session_id = notification.get("session_id")
serial = long(notification.get("serial"))
@@ -1078,26 +1157,36 @@ class Fetcher(object):
logger.debug("RRDP %s loading from snapshot %s serial %s", self.uri, url, serial)
- retrieval, x = yield self._rrdp_fetch_url(
- url = url,
- hash = hash,
- tag = "snapshot",
- session_id = session_id,
- serial = serial)
-
- with transaction.atomic():
- if snapshot is not None:
- snapshot.delete()
- snapshot = RRDPSnapshot.objects.create(retrieved = retrieval, session_id = session_id, serial = serial)
- for x in x.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"):
- uri = x.get("uri")
- cls = uri_to_class(uri)
- if cls is None:
- raise RRDP_ParseFailure("Unexpected URI %s" % uri)
- obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval)
- obj.snapshot.add(snapshot)
- obj.save()
- logger.debug("RRDP %s committing snapshot %s serial %s", self.uri, url, serial)
+ snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial)
+
+ # Fetching and processing a huge snapshot file can take quite a while.
+ # If we need to warn the user about this, see header_callback (companion
+ # to streaming_callback) which would let us see the content-length early
+ # enough to issue a useful warning.
+
+ def snapshot_publish_handler(retrieval, root, publish):
+ if root.get("session_id") != session_id:
+ raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(
+ session_id, url, root.get("session_id")))
+ if long(root.get("serial")) != long(serial):
+ raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(
+ serial, url, root.get("serial")))
+ uri = publish.get("uri")
+ cls = uri_to_class(uri)
+ if cls is None:
+ raise RRDP_ParseFailure("Unexpected URI {}".format(uri))
+ obj, created = cls.store_if_new(publish.text.decode("base64"), uri, retrieval)
+ obj.snapshot.add(snapshot)
+ obj.save()
+
+ retrieval = yield self._rrdp_fetch_snapshot(
+ url = url,
+ hash = hash,
+ pull_parser = RRDPPullParser({(rpki.relaxng.rrdp.xmlns + "snapshot",
+ rpki.relaxng.rrdp.xmlns + "publish") : snapshot_publish_handler}))
+
+ snapshot.retrieved = retrieval
+ snapshot.save()
else:
logger.debug("RRDP %s %s deltas (%s--%s)", self.uri,
@@ -1112,10 +1201,9 @@ class Fetcher(object):
while deltas and len(futures) < args.fetch_ahead_goal:
serial, url, hash = deltas.pop(0)
logger.debug("RRDP %s serial %s fetching %s", self.uri, serial, url)
- futures.append(self._rrdp_fetch_url(
+ futures.append(self._rrdp_fetch_delta(
url = url,
hash = hash,
- tag = "delta",
session_id = session_id,
serial = serial))
@@ -1261,6 +1349,16 @@ def final_cleanup():
with transaction.atomic():
+ #logger.debug("Flushing incomplete RRDP snapshots and retrieval objects")
+
+ q = RRDPSnapshot.objects
+ q = q.filter(finished__isnull = True)
+ q.delete()
+
+ q = Retrieval.objects
+ q = q.filter(successful__isnull = True)
+ q.delete()
+
#logger.debug("Flushing old authenticated sets")
q = Authenticated.objects
@@ -1340,6 +1438,11 @@ def main():
parser.add_argument("--https-timeout", default = 300, type = posint)
+ parser.add_argument("--validate-https", action = "store_true")
+
+ parser.add_argument("--max-https-body-size", type = posint, default = 512 * 1024 * 1024)
+
+
# We already have a whole bunch of logging control code in
# rpki.log, just need to figure out / remember how to use it
# properly. See rpki.log.init() & rpki.log.argparse_setup().