aboutsummaryrefslogtreecommitdiff
path: root/rp
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2016-01-13 05:17:50 +0000
committerRob Austein <sra@hactrn.net>2016-01-13 05:17:50 +0000
commit5dac4badb7c67a3698628ca80320623f423e2f4d (patch)
tree17b59c897a3f55e2bb3e55e201ff94c05882a79a /rp
parente7f0dc6d3596548e8d530afd747aaca4775a15fe (diff)
Checkpoint of first cut at RRDP client code. RRDP client code not yet
tested, and final_cleanup() needs work to avoid trashing RRDP state. svn path=/branches/tk705/; revision=6222
Diffstat (limited to 'rp')
-rwxr-xr-xrp/rcynic/rcynicng222
1 files changed, 212 insertions, 10 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index 12395c62..fc36c0d9 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -9,10 +9,11 @@ Reimplementation of rcynic in Python. Work in progress.
import os
import sys
import time
-import shutil
import errno
+import shutil
import logging
import argparse
+import urlparse
import subprocess
import tornado.gen
@@ -25,10 +26,11 @@ import rpki.POW
import rpki.sundial
import rpki.config
import rpki.autoconf
+import rpki.relaxng
from rpki.oids import id_kp_bgpsec_router
-from lxml.etree import ElementTree, Element, SubElement, Comment
+from lxml.etree import ElementTree, Element, SubElement, Comment, XML
logger = logging.getLogger("rcynicng")
@@ -724,6 +726,13 @@ def sha256(bytes):
return d.digest()
+class RRDP_ParseFailure(Exception):
+ "Failure parsing RRDP message."
+
+class DeadHost(Exception):
+ "Host recently tried and known to be unavailable."
+
+
class Fetcher(object):
"""
Network transfer methods and history database.
@@ -746,11 +755,14 @@ class Fetcher(object):
_rsync_deadhosts = set()
_rsync_history = dict()
- def __init__(self, uri):
+ _https_deadhosts = set()
+ _https_history = dict()
+
+ def __init__(self, uri, ta = False):
self.uri = uri
+ self.ta = ta
self.pending = None
self.status = None
- self.runtime = None
def _rsync_split_uri(self):
return tuple(self.uri.rstrip("/").split("/")[2:])
@@ -769,6 +781,8 @@ class Fetcher(object):
return False
if self.uri.startswith("rsync://"):
return self._rsync_needed()
+ if self.uri.startswith("https://"):
+ return self._https_needed()
raise ValueError
def _rsync_needed(self):
@@ -778,14 +792,23 @@ class Fetcher(object):
entry = self._rsync_find(path)
return entry is None or entry.pending is not None
+ def _https_needed(self):
+ netloc = urlparse.urlparse(self.uri).netloc
+ if netloc in self._https_deadhosts:
+ return False
+ entry = self._https_history.get(self.uri)
+ return entry is None or entry.pending is not None
+
def fetch(self):
if self.uri.startswith("rsync://"):
return self._rsync_fetch()
+ if self.uri.startswith("https://"):
+ return self._https_fetch() if self.ta else self._rrdp_fetch()
raise ValueError
@tornado.gen.coroutine
def _rsync_fetch(self):
- assert self.uri.startswith("rsync://") and (self.uri.endswith(".cer") or self.uri.endswith("/"))
+ assert self.uri.startswith("rsync://") and (self.uri.endswith(".cer") if self.ta else self.uri.endswith("/"))
if args.no_fetch:
return
@@ -833,12 +856,11 @@ class Fetcher(object):
output = yield rsync.stdout.read_until_close()
pid, self.status = os.waitpid(rsync.pid, os.WNOHANG)
t1 = time.time()
- self.runtime = t1 - t0
if (pid, self.status) == (0, 0):
logger.warn("rsync[%s] Couldn't get real exit status without blocking, sorry", rsync.pid)
for line in output.splitlines():
logger.debug("rsync[%s] %s", rsync.pid, line)
- logger.debug("rsync[%s] finished after %s seconds with status 0x%x", rsync.pid, self.runtime, self.status)
+ logger.debug("rsync[%s] finished after %s seconds with status 0x%x", rsync.pid, t1 - t0, self.status)
# Should do something with rsync result and validation status database here.
@@ -880,6 +902,179 @@ class Fetcher(object):
elif os.path.exists(path):
yield path
+ @tornado.gen.coroutine
+ def _https_fetch_url(self, url):
+
+ if urlparse.urlparse(url).netloc in self._https_deadhosts:
+ raise DeadHost
+
+ try:
+ t0 = time.time()
+ client = tornado.httpclient.AsyncHTTPClient()
+ response = yield client.fetch(url, validate_cert = False)
+ # Might want to check response Content-Type here
+ except:
+ # Might want to check response code here to figure out
+ # whether to list this host in _https_deadhosts.
+ ok = False
+ raise
+ else:
+ ok = True
+ finally:
+ t1 = time.time()
+ logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0)
+ retrieval = rpki.rcynicdb.models.Retrieval.objects.create(
+ uri = url,
+ started = rpki.sundial.datetime.fromtimestamp(t0),
+ finished = rpki.sundial.datetime.fromtimestamp(t1),
+ successful = ok)
+
+ raise tornado.gen.Return(retrieval, response.body if ok else None)
+
+ @tornado.gen.coroutine
+ def _https_fetch(self):
+
+ if args.no_fetch:
+ return
+
+ other = self._https_history.get(self.uri)
+ if other is not None and other.pending is not None:
+ yield other.pending.wait()
+ return
+
+ self.pending = tornado.locks.Condition()
+ self._rsync_history[self.uri] = self
+
+ try:
+ retrieval, der = yield self._https_fetch_url(self.uri)
+ X509.store_if_new(der, self.uri, retrieval)
+ except:
+ logger.exception("Couldn't load %s", self.uri)
+
+ finally:
+ pending = self.pending
+ self.pending = None
+ pending.notify_all()
+
+ @tornado.gen.coroutine
+ def _rrdp_fetch_url(self, url, tag, hash = None, uuid = None, serial = None):
+
+ retrieval, xml = yield self._https_fetch_url(url)
+
+ if hash is not None and sha256(xml).encode("hex") != hash.lower():
+ raise RRDP_ParseFailure("Expected RRDP hash %s for %s, got %s" % (
+ hash.lower(), url, sha256(xml).encode("hex")))
+
+ xml = XML(xml)
+ rpki.relaxng.rrdp.schema.assertValid(xml)
+
+ if tag is not None and xml.tag != rpki.relaxng.rrdp.xmlns + tag:
+ raise RRDP_ParseFailure("Expected RRDP %s for %s, got %s" % (
+ tag, delta_url, xml.tag))
+
+ if uuid is not None and xml.get("uuid") != uuid:
+ raise RRDP_ParseFailure("Expected RRDP UUID %s for %s, got %s" % (
+ uuid, url, xml.get("uuid")))
+
+ if serial is not None and long(xml.get("serial")) != long(serial):
+ raise RRDP_ParseFailure("Expected RRDP serial %s for %s, got %s" % (
+ serial, url, xml.get("serial")))
+
+ raise tornado.gen.Return(retrieval, xml)
+
+ @tornado.gen.coroutine
+ def _rrdp_fetch(self):
+ from django.db import transaction
+
+ if args.no_fetch:
+ return
+
+ other = self._https_history.get(self.uri)
+ if other is not None and other.pending is not None:
+ yield other.pending.wait()
+ return
+
+ self.pending = tornado.locks.Condition()
+ self._rsync_history[self.uri] = self
+
+ try:
+ retrieval, notification = yield self._rrdp_fetch_url(url = self.uri, tag = "notification")
+
+ uuid = notification.get("uuid")
+ serial = long(notification.get("serial"))
+
+ old_snapshot = rpki.rcynicdb.models.RRDPSnapshot.objects.filter(
+ uuid = uuid).order_by("-retrieved__started").first()
+
+ if old_snapshot is not None and old_snapshot.serial == serial:
+ return # We're up to date, nothing to do
+
+ delta_serials = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash")))
+ for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta"))
+
+ new_snapshot = rpki.rcynicdb.models.RRDPSnapshot(retrieved = retrieval, uuid = uuid, serial = serial)
+
+ if old_snapshot is None or old_snapshot.serial + 1 not in delta_serials:
+ # No useful combination of old snapshot and available deltas, need to load current snapshot.
+
+ x = notification.find(rpki.relaxng.rrdp.xmlns + "snapshot")
+
+ retrieval, snapshot = yield self._rrdp_fetch_url(url = x.get("uri"), hash = x.get("hash"),
+ tag = "snapshot", uuid = uuid, serial = serial)
+
+ with transaction.atomic():
+ new_snapshot.save()
+ for x in snapshot.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"):
+ uri = x.get("uri")
+ cls = uri_to_class(uri)
+ if cls is None:
+ raise RRDP_ParseFailure("Unexpected URI %s" % uri)
+ obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval)
+ obj.snapshot.add(new_snapshot)
+ obj.save()
+
+ else:
+ # In theory there exists a set of deltas which will get us from old_snapshot to new_snapshot.
+
+ deltas = [(yield self._rrdp_fetch_url(url = delta_serials[serial][0],
+ hash = delta_serials[serial][1],
+ tag = "delta",
+ uuid = uuid,
+ serial = serial))
+ for serial in xrange(old_snapshot.serial + 1, new_snapshot.serial + 1)]
+
+ with transaction.atomic():
+ new_snapshot.rpkiobject_set = old_snapshot.rpkiobject_set
+ new_snapshot.save()
+
+ for retrieval, delta in deltas:
+
+ for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "withdraw"):
+ new_snapshot.rpkiobject_set.remove(
+ new_snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
+ new_snapshot.save()
+
+ for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"):
+ uri = x.get("uri")
+ cls = uri_to_class(uri)
+ if cls is None:
+ raise RRDP_ParseFailure("Unexpected URI %s" % uri)
+ if x.get("hash", None) is not None:
+ new_snapshot.rpkiobject_set.remove(
+ new_snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
+ new_snapshot.save()
+ obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval)
+ obj.snapshot.add(new_snapshot)
+ obj.save()
+
+ except:
+ logger.exception("Couldn't load %s", self.uri)
+
+ finally:
+ pending = self.pending
+ self.pending = None
+ pending.notify_all()
+
class CheckTALTask(object):
@@ -892,7 +1087,7 @@ class CheckTALTask(object):
@tornado.gen.coroutine
def __call__(self):
- yield Fetcher(self.uri).fetch()
+ yield Fetcher(self.uri, ta = True).fetch()
for cer in fetch_objects(uri = self.uri):
if self.check(cer):
yield task_queue.put(WalkTask(cer = cer))
@@ -966,8 +1161,15 @@ def final_cleanup():
# https://docs.djangoproject.com/en/1.8/faq/models/#how-can-i-see-the-raw-sql-queries-django-is-running
# All RPKI objects other than the most recent authenticated set.
- # This will definitely need work once we support RRDP.
- rpki.rcynicdb.models.RPKIObject.objects.exclude(authenticated = authenticated.id).delete()
+ q = rpki.rcynicdb.models.RPKIObject.objects
+ q = q.exclude(authenticated = authenticated.id)
+
+ # OK, I think what we want to express here is: "Keep any
+ # RPKIObject which is part of any most recent RRDP snapshot
+ # which has one RPKIObjects in the current authenticated set"
+ # Whee!
+
+ q.delete()
# Flush all but the most recent authenticated set.
rpki.rcynicdb.models.Authenticated.objects.exclude(id = authenticated.id).delete()