aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2016-01-15 07:33:51 +0000
committerRob Austein <sra@hactrn.net>2016-01-15 07:33:51 +0000
commit7de85bc5d8cc7fc900f8646aa466af5d98da909e (patch)
tree45d7a1b46eabd9a7c400fd8899811cd2fa279486
parent3903833d68da03e82d75a7c5d0dc0699a1c15bb0 (diff)
RRDP seems to work now, at least with RIPE's implementation.
svn path=/branches/tk705/; revision=6224
-rwxr-xr-xrp/rcynic/rcynicng140
-rw-r--r--rpki/django_settings/rcynic.py7
-rw-r--r--rpki/rcynicdb/models.py46
3 files changed, 111 insertions, 82 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index 3e32e286..cfcfd133 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -21,6 +21,7 @@ import tornado.locks
import tornado.ioloop
import tornado.queues
import tornado.process
+import tornado.httpclient
import rpki.POW
import rpki.sundial
@@ -135,7 +136,7 @@ class X509(rpki.POW.X509):
self = cls.derRead(der)
aki = self.getAKI()
ski = self.getSKI()
- return rpki.rcynicdb.models.RPKIObject.objects.get_or_create(
+ return RPKIObject.objects.get_or_create(
der = der,
defaults = dict(
uri = uri,
@@ -257,7 +258,7 @@ class CRL(rpki.POW.CRL):
def store_if_new(cls, der, uri, retrieval):
self = cls.derRead(der)
aki = self.getAKI()
- return rpki.rcynicdb.models.RPKIObject.objects.get_or_create(
+ return RPKIObject.objects.get_or_create(
der = der,
defaults = dict(
uri = uri,
@@ -327,7 +328,7 @@ class CMS_Mixin(object):
cert = self.certs()[0]
aki = cert.getAKI()
ski = cert.getSKI()
- return rpki.rcynicdb.models.RPKIObject.objects.get_or_create(
+ return RPKIObject.objects.get_or_create(
der = der,
defaults = dict(
uri = uri,
@@ -484,7 +485,7 @@ def uri_to_class(uri):
# https://docs.djangoproject.com/en/1.8/ref/models/options/#django.db.models.Options.ordering
def fetch_objects(**kwargs):
- for obj in rpki.rcynicdb.models.RPKIObject.objects.filter(**kwargs).order_by("-retrieved__started"):
+ for obj in RPKIObject.objects.filter(**kwargs).order_by("-retrieved__started"):
cls = uri_to_class(obj.uri)
if cls is not None:
yield cls.load(obj)
@@ -516,10 +517,15 @@ class WalkFrame(object):
@tornado.gen.coroutine
def initial(self, wsk):
- # Will need some logic to decide whether to prefer RRDP or rsync.
- # For the moment, we've only implemented rsync, so this is easy.
+ rsync_uri = first_rsync_uri(self.cer.caDirectory)
+ rrdp_uri = first_https_uri(self.cer.rpkiNotify)
- self.fetcher = Fetcher(first_rsync_uri(self.cer.caDirectory))
+ if args.prefer_rsync:
+ uri = rsync_uri or rrdp_uri
+ else:
+ uri = rrdp_uri or rsync_uri
+
+ self.fetcher = Fetcher(uri)
if not self.fetcher.needed():
self.state = self.ready
@@ -720,6 +726,9 @@ def first_uri(uris, scheme):
def first_rsync_uri(uris):
return first_uri(uris, "rsync://")
+def first_https_uri(uris):
+ return first_uri(uris, "https://")
+
def sha256(bytes):
d = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
d.update(bytes)
@@ -864,7 +873,7 @@ class Fetcher(object):
# Should do something with rsync result and validation status database here.
- retrieval = rpki.rcynicdb.models.Retrieval.objects.create(
+ retrieval = Retrieval.objects.create(
uri = self.uri,
started = rpki.sundial.datetime.fromtimestamp(t0),
finished = rpki.sundial.datetime.fromtimestamp(t1),
@@ -901,6 +910,15 @@ class Fetcher(object):
if urlparse.urlparse(url).netloc in self._https_deadhosts:
raise DeadHost
+ # Should do something with deadhost processing below. Looks
+ # like errors such as HTTP timeout show up as
+ # tornado.httpclient.HTTPError exceptions (which we could
+ # suppress if we wanted to do so, but we probably don't).
+ # HTTP timeout shows up in the logs as "HTTP 599". See doc for:
+ #
+ # tornado.httpclient.AsyncHTTPClient.fetch()
+ # tornado.httpclient.HTTPError
+
try:
t0 = time.time()
client = tornado.httpclient.AsyncHTTPClient()
@@ -916,13 +934,13 @@ class Fetcher(object):
finally:
t1 = time.time()
logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0)
- retrieval = rpki.rcynicdb.models.Retrieval.objects.create(
+ retrieval = Retrieval.objects.create(
uri = url,
started = rpki.sundial.datetime.fromtimestamp(t0),
finished = rpki.sundial.datetime.fromtimestamp(t1),
successful = ok)
- raise tornado.gen.Return(retrieval, response.body if ok else None)
+ raise tornado.gen.Return((retrieval, response.body if ok else None))
@tornado.gen.coroutine
def _https_fetch(self):
@@ -950,7 +968,7 @@ class Fetcher(object):
pending.notify_all()
@tornado.gen.coroutine
- def _rrdp_fetch_url(self, url, tag, hash = None, uuid = None, serial = None):
+ def _rrdp_fetch_url(self, url, tag, hash = None, session_id = None, serial = None):
retrieval, xml = yield self._https_fetch_url(url)
@@ -965,15 +983,15 @@ class Fetcher(object):
raise RRDP_ParseFailure("Expected RRDP %s for %s, got %s" % (
tag, delta_url, xml.tag))
- if uuid is not None and xml.get("uuid") != uuid:
- raise RRDP_ParseFailure("Expected RRDP UUID %s for %s, got %s" % (
- uuid, url, xml.get("uuid")))
+ if session_id is not None and xml.get("session_id") != session_id:
+ raise RRDP_ParseFailure("Expected RRDP session_id %s for %s, got %s" % (
+ session_id, url, xml.get("session_id")))
if serial is not None and long(xml.get("serial")) != long(serial):
raise RRDP_ParseFailure("Expected RRDP serial %s for %s, got %s" % (
serial, url, xml.get("serial")))
- raise tornado.gen.Return(retrieval, xml)
+ raise tornado.gen.Return((retrieval, xml))
@tornado.gen.coroutine
def _rrdp_fetch(self):
@@ -988,32 +1006,37 @@ class Fetcher(object):
return
self.pending = tornado.locks.Condition()
- self._rsync_history[self.uri] = self
+ self._https_history[self.uri] = self
try:
retrieval, notification = yield self._rrdp_fetch_url(url = self.uri, tag = "notification")
- uuid = notification.get("uuid")
+ session_id = notification.get("session_id")
serial = long(notification.get("serial"))
- old_snapshot = rpki.rcynicdb.models.RRDPSnapshot.objects.filter(
- uuid = uuid).order_by("-retrieved__started").first()
+ old_snapshot = RRDPSnapshot.objects.filter(
+ session_id = session_id).order_by("-retrieved__started").first()
- if old_snapshot is not None and old_snapshot.serial == serial:
- return # We're up to date, nothing to do
+ logger.debug("RRDP notification for %s session_id %s serial %s old_snapshot %r",
+ self.uri, session_id, serial, old_snapshot)
+ if old_snapshot is not None and old_snapshot.serial == serial:
+ logger.debug("RRDP data for %s is up-to-date, nothing to do", self.uri)
+ return
+
delta_serials = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash")))
for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta"))
- new_snapshot = rpki.rcynicdb.models.RRDPSnapshot(retrieved = retrieval, uuid = uuid, serial = serial)
+ new_snapshot = RRDPSnapshot(retrieved = retrieval, session_id = session_id, serial = serial)
if old_snapshot is None or old_snapshot.serial + 1 not in delta_serials:
- # No useful combination of old snapshot and available deltas, need to load current snapshot.
x = notification.find(rpki.relaxng.rrdp.xmlns + "snapshot")
+ logger.debug("RRDP %s loading from snapshot %s", self.uri, x.get("uri"))
+
retrieval, snapshot = yield self._rrdp_fetch_url(url = x.get("uri"), hash = x.get("hash"),
- tag = "snapshot", uuid = uuid, serial = serial)
+ tag = "snapshot", session_id = session_id, serial = serial)
with transaction.atomic():
new_snapshot.save()
@@ -1027,17 +1050,17 @@ class Fetcher(object):
obj.save()
else:
- # In theory there exists a set of deltas which will get us from old_snapshot to new_snapshot.
+ logger.debug("RRDP %s loading %s deltas", self.uri, (new_snapshot.serial - old_snapshot.serial))
- deltas = [(yield self._rrdp_fetch_url(url = delta_serials[serial][0],
- hash = delta_serials[serial][1],
- tag = "delta",
- uuid = uuid,
- serial = serial))
+ deltas = [(yield self._rrdp_fetch_url(url = delta_serials[serial][0],
+ hash = delta_serials[serial][1],
+ tag = "delta",
+ session_id = session_id,
+ serial = serial))
for serial in xrange(old_snapshot.serial + 1, new_snapshot.serial + 1)]
with transaction.atomic():
- new_snapshot.rpkiobject_set = old_snapshot.rpkiobject_set
+ new_snapshot.rpkiobject_set = old_snapshot.rpkiobject_set.all()
new_snapshot.save()
for retrieval, delta in deltas:
@@ -1145,33 +1168,54 @@ def final_report():
def final_cleanup():
from django.db import transaction
+ def report(when):
+ logger.debug("Database state %s cleanup: %s Authenticated %s RRDPSnapshot %s RPKIObject %s Retrieval",
+ when,
+ Authenticated.objects.all().count(),
+ RRDPSnapshot.objects.all().count(),
+ RPKIObject.objects.all().count(),
+ Retrieval.objects.all().count())
+
+ report("before")
+
with transaction.atomic():
- # Flush old authenticated sets.
+ #logger.debug("Flushing old authenticated sets")
- q = rpki.rcynicdb.models.Authenticated.objects.exclude(id = authenticated.id)
+ q = Authenticated.objects
+ q = q.exclude(id = authenticated.id)
q.delete()
- # Flush RRDP snapshots which don't contribute anything to the current authenticated set.
+ #logger.debug("Flushing RRDP snapshots which don't contain anything in the (remaining) authenticated set")
- q = authenticated.rpkiobject_set.order_by("snapshot__id").values_list("snapshot__id", flat = True)
- q = rpki.rcynicdb.models.RRDPSnapshot.objects.exclude(id__in = q.distinct())
+ q = RPKIObject.objects
+ q = q.filter(authenticated = authenticated.id)
+ q = q.exclude(snapshot = None)
+ q = q.order_by("snapshot__id")
+ q = q.values_list("snapshot__id", flat = True)
+ q = q.distinct()
+ q = RRDPSnapshot.objects.exclude(id__in = q)
q.delete()
- # Flush RPKI objects which are neither part of the current authenticated set nor part of
- # a current RRDP snapshot.
+ #logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot")
- q = rpki.rcynicdb.models.RPKIObject.objects
+ q = RPKIObject.objects
q = q.exclude(authenticated = authenticated.id)
q = q.filter(snapshot = None)
q.delete()
- # Flush retrieval objects which are no longer related to any RPKI objects.
+ #logger.debug("Flushing retrieval objects which are no longer related to any RPKI objects or RRDP snapshot")
- q = rpki.rcynicdb.models.RPKIObject.objects.order_by("retrieved__id").values_list("retrieved__id", flat = True)
- q = rpki.rcynicdb.models.Retrieval.objects.exclude(id__in = q.distinct())
+ q = RPKIObject.objects
+ q = q.order_by("retrieved__id")
+ q = q.values_list("retrieved__id", flat = True)
+ q = q.distinct()
+ q = Retrieval.objects.exclude(id__in = q)
+ q = q.filter(rrdpsnapshot = None)
q.delete()
+ report("after")
+
@tornado.gen.coroutine
def launcher():
@@ -1211,6 +1255,8 @@ def main():
parser.add_argument("--no-spawn-on-fetch", action = "store_true")
parser.add_argument("--no-migrate", action = "store_true")
+ parser.add_argument("--prefer-rsync", action = "store_true")
+
global args
args = parser.parse_args()
@@ -1231,11 +1277,19 @@ def main():
django.core.management.call_command("migrate", verbosity = 0, interactive = False)
import rpki.rcynicdb
+ global Retrieval
+ global Authenticated
+ global RRDPSnapshot
+ global RPKIObject
+ Retrieval = rpki.rcynicdb.models.Retrieval
+ Authenticated = rpki.rcynicdb.models.Authenticated
+ RRDPSnapshot = rpki.rcynicdb.models.RRDPSnapshot
+ RPKIObject = rpki.rcynicdb.models.RPKIObject
logging.basicConfig(level = logging.DEBUG, format = "%(asctime)s %(message)s", datefmt = "%Y-%m-%d %H:%M:%S")
global authenticated
- authenticated = rpki.rcynicdb.models.Authenticated.objects.create(started = rpki.sundial.datetime.now())
+ authenticated = Authenticated.objects.create(started = rpki.sundial.datetime.now())
global task_queue
task_queue = tornado.queues.Queue()
diff --git a/rpki/django_settings/rcynic.py b/rpki/django_settings/rcynic.py
index 90491ddc..0845604c 100644
--- a/rpki/django_settings/rcynic.py
+++ b/rpki/django_settings/rcynic.py
@@ -51,6 +51,13 @@ del DatabaseConfigurator
INSTALLED_APPS = ["rpki.rcynicdb"]
+# Debugging
+#
+# DO NOT ENABLE DJANGO DEBUGGING IN PRODUCTION!
+#
+#DEBUG = True
+
+
# Allow local site to override any setting above -- but if there's
# anything that local sites routinely need to modify, please consider
# putting that configuration into rpki.conf and just adding code here
diff --git a/rpki/rcynicdb/models.py b/rpki/rcynicdb/models.py
index c5eb983a..09d513d5 100644
--- a/rpki/rcynicdb/models.py
+++ b/rpki/rcynicdb/models.py
@@ -1,35 +1,8 @@
-# First cut at ORM models for rcynicng, assuming for now that we're
-# going to go with Django rather than raw SQL.
+# First cut at ORM models for rcynicng.
from django.db import models
# HTTP/HTTPS/RSYNC fetch event.
-#
-# Open issue: for RRDP, are we just recording the notification fetch,
-# or the snapshot/delta fetches as well? If the latter, to which
-# retrieval event does the RRDPSnapshot 1:1 relationship refer? For
-# that matter, should we somehow be recording the relationship between
-# the notification and snapshot/delta fetches? Given that, at least
-# in the current protocol, we will only do either one snapshot fetch
-# or one delta fetch after the notify fetch, we could just use two
-# URIs in the retrieval record, if we allow the second to be empty
-# (which we would have to do anyway for rsync).
-#
-# Or we could add some kind of fun SQL self-reference, which, in
-# Django, looks like:
-#
-# models.ForeignKey('self', on_delete = models.CASCADE)
-#
-# except that it's more like a 1:1 recursive relationship, which isn't
-# mentioned in the Django docs, but which supposedly
-# (http://stackoverflow.com/questions/18271001/django-recursive-relationship)
-# works the same way:
-#
-# models.OneToOneField('self', null = True)
-#
-# Unclear whether we still need "on_delete = models.CASCADE", probably.
-# Example on StackOverflow has a complex .save() method, but that may
-# be specific to the original poster's desired semantics.
class Retrieval(models.Model):
uri = models.TextField()
@@ -37,25 +10,18 @@ class Retrieval(models.Model):
finished = models.DateTimeField()
successful = models.BooleanField()
-# Collection of validated objects (like current
-# rsync-data/authenticated.yyyy-mm-ddTHH:MM:SS/ tree)
+# Collection of validated objects.
class Authenticated(models.Model):
started = models.DateTimeField()
finished = models.DateTimeField(null = True)
# One instance of an RRDP snapshot.
-#
-# Deltas are processed by finding the RRDPSnapshot holding the
-# starting point, creating a new RRDPSnapshot for the endpoint, and
-# applying all necessary deltas (with consistency checks all along the
-# way) to get from one to the other; we don't commit the endpoint (or
-# anything created during the process) until and unless it all works.
class RRDPSnapshot(models.Model):
- uuid = models.UUIDField()
- serial = models.BigIntegerField()
- retrieved = models.OneToOneField(Retrieval)
+ session_id = models.UUIDField()
+ serial = models.BigIntegerField()
+ retrieved = models.OneToOneField(Retrieval)
# RPKI objects.
#
@@ -64,6 +30,8 @@ class RRDPSnapshot(models.Model):
# what we want in this case.
#
# https://docs.djangoproject.com/en/1.9/ref/models/fields/#django.db.models.ForeignKey.on_delete
+#
+# Might also want to provide names for the reverse relationships, code uses blah_set for now.
class RPKIObject(models.Model):
der = models.BinaryField(unique = True)