diff options
-rwxr-xr-x | rp/rcynic/rcynicng | 140 | ||||
-rw-r--r-- | rpki/django_settings/rcynic.py | 7 | ||||
-rw-r--r-- | rpki/rcynicdb/models.py | 46 |
3 files changed, 111 insertions, 82 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index 3e32e286..cfcfd133 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -21,6 +21,7 @@ import tornado.locks import tornado.ioloop import tornado.queues import tornado.process +import tornado.httpclient import rpki.POW import rpki.sundial @@ -135,7 +136,7 @@ class X509(rpki.POW.X509): self = cls.derRead(der) aki = self.getAKI() ski = self.getSKI() - return rpki.rcynicdb.models.RPKIObject.objects.get_or_create( + return RPKIObject.objects.get_or_create( der = der, defaults = dict( uri = uri, @@ -257,7 +258,7 @@ class CRL(rpki.POW.CRL): def store_if_new(cls, der, uri, retrieval): self = cls.derRead(der) aki = self.getAKI() - return rpki.rcynicdb.models.RPKIObject.objects.get_or_create( + return RPKIObject.objects.get_or_create( der = der, defaults = dict( uri = uri, @@ -327,7 +328,7 @@ class CMS_Mixin(object): cert = self.certs()[0] aki = cert.getAKI() ski = cert.getSKI() - return rpki.rcynicdb.models.RPKIObject.objects.get_or_create( + return RPKIObject.objects.get_or_create( der = der, defaults = dict( uri = uri, @@ -484,7 +485,7 @@ def uri_to_class(uri): # https://docs.djangoproject.com/en/1.8/ref/models/options/#django.db.models.Options.ordering def fetch_objects(**kwargs): - for obj in rpki.rcynicdb.models.RPKIObject.objects.filter(**kwargs).order_by("-retrieved__started"): + for obj in RPKIObject.objects.filter(**kwargs).order_by("-retrieved__started"): cls = uri_to_class(obj.uri) if cls is not None: yield cls.load(obj) @@ -516,10 +517,15 @@ class WalkFrame(object): @tornado.gen.coroutine def initial(self, wsk): - # Will need some logic to decide whether to prefer RRDP or rsync. - # For the moment, we've only implemented rsync, so this is easy. + rsync_uri = first_rsync_uri(self.cer.caDirectory) + rrdp_uri = first_https_uri(self.cer.rpkiNotify) - self.fetcher = Fetcher(first_rsync_uri(self.cer.caDirectory)) + if args.prefer_rsync: + uri = rsync_uri or rrdp_uri + else: + uri = rrdp_uri or rsync_uri + + self.fetcher = Fetcher(uri) if not self.fetcher.needed(): self.state = self.ready @@ -720,6 +726,9 @@ def first_uri(uris, scheme): def first_rsync_uri(uris): return first_uri(uris, "rsync://") +def first_https_uri(uris): + return first_uri(uris, "https://") + def sha256(bytes): d = rpki.POW.Digest(rpki.POW.SHA256_DIGEST) d.update(bytes) @@ -864,7 +873,7 @@ class Fetcher(object): # Should do something with rsync result and validation status database here. - retrieval = rpki.rcynicdb.models.Retrieval.objects.create( + retrieval = Retrieval.objects.create( uri = self.uri, started = rpki.sundial.datetime.fromtimestamp(t0), finished = rpki.sundial.datetime.fromtimestamp(t1), @@ -901,6 +910,15 @@ class Fetcher(object): if urlparse.urlparse(url).netloc in self._https_deadhosts: raise DeadHost + # Should do something with deadhost processing below. Looks + # like errors such as HTTP timeout show up as + # tornado.httpclient.HTTPError exceptions (which we could + # suppress if we wanted to do so, but we probably don't). + # HTTP timeout shows up in the logs as "HTTP 599". See doc for: + # + # tornado.httpclient.AsyncHTTPClient.fetch() + # tornado.httpclient.HTTPError + try: t0 = time.time() client = tornado.httpclient.AsyncHTTPClient() @@ -916,13 +934,13 @@ class Fetcher(object): finally: t1 = time.time() logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0) - retrieval = rpki.rcynicdb.models.Retrieval.objects.create( + retrieval = Retrieval.objects.create( uri = url, started = rpki.sundial.datetime.fromtimestamp(t0), finished = rpki.sundial.datetime.fromtimestamp(t1), successful = ok) - raise tornado.gen.Return(retrieval, response.body if ok else None) + raise tornado.gen.Return((retrieval, response.body if ok else None)) @tornado.gen.coroutine def _https_fetch(self): @@ -950,7 +968,7 @@ class Fetcher(object): pending.notify_all() @tornado.gen.coroutine - def _rrdp_fetch_url(self, url, tag, hash = None, uuid = None, serial = None): + def _rrdp_fetch_url(self, url, tag, hash = None, session_id = None, serial = None): retrieval, xml = yield self._https_fetch_url(url) @@ -965,15 +983,15 @@ class Fetcher(object): raise RRDP_ParseFailure("Expected RRDP %s for %s, got %s" % ( tag, delta_url, xml.tag)) - if uuid is not None and xml.get("uuid") != uuid: - raise RRDP_ParseFailure("Expected RRDP UUID %s for %s, got %s" % ( - uuid, url, xml.get("uuid"))) + if session_id is not None and xml.get("session_id") != session_id: + raise RRDP_ParseFailure("Expected RRDP session_id %s for %s, got %s" % ( + session_id, url, xml.get("session_id"))) if serial is not None and long(xml.get("serial")) != long(serial): raise RRDP_ParseFailure("Expected RRDP serial %s for %s, got %s" % ( serial, url, xml.get("serial"))) - raise tornado.gen.Return(retrieval, xml) + raise tornado.gen.Return((retrieval, xml)) @tornado.gen.coroutine def _rrdp_fetch(self): @@ -988,32 +1006,37 @@ class Fetcher(object): return self.pending = tornado.locks.Condition() - self._rsync_history[self.uri] = self + self._https_history[self.uri] = self try: retrieval, notification = yield self._rrdp_fetch_url(url = self.uri, tag = "notification") - uuid = notification.get("uuid") + session_id = notification.get("session_id") serial = long(notification.get("serial")) - old_snapshot = rpki.rcynicdb.models.RRDPSnapshot.objects.filter( - uuid = uuid).order_by("-retrieved__started").first() + old_snapshot = RRDPSnapshot.objects.filter( + session_id = session_id).order_by("-retrieved__started").first() - if old_snapshot is not None and old_snapshot.serial == serial: - return # We're up to date, nothing to do + logger.debug("RRDP notification for %s session_id %s serial %s old_snapshot %r", + self.uri, session_id, serial, old_snapshot) + if old_snapshot is not None and old_snapshot.serial == serial: + logger.debug("RRDP data for %s is up-to-date, nothing to do", self.uri) + return + delta_serials = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash"))) for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta")) - new_snapshot = rpki.rcynicdb.models.RRDPSnapshot(retrieved = retrieval, uuid = uuid, serial = serial) + new_snapshot = RRDPSnapshot(retrieved = retrieval, session_id = session_id, serial = serial) if old_snapshot is None or old_snapshot.serial + 1 not in delta_serials: - # No useful combination of old snapshot and available deltas, need to load current snapshot. x = notification.find(rpki.relaxng.rrdp.xmlns + "snapshot") + logger.debug("RRDP %s loading from snapshot %s", self.uri, x.get("uri")) + retrieval, snapshot = yield self._rrdp_fetch_url(url = x.get("uri"), hash = x.get("hash"), - tag = "snapshot", uuid = uuid, serial = serial) + tag = "snapshot", session_id = session_id, serial = serial) with transaction.atomic(): new_snapshot.save() @@ -1027,17 +1050,17 @@ class Fetcher(object): obj.save() else: - # In theory there exists a set of deltas which will get us from old_snapshot to new_snapshot. + logger.debug("RRDP %s loading %s deltas", self.uri, (new_snapshot.serial - old_snapshot.serial)) - deltas = [(yield self._rrdp_fetch_url(url = delta_serials[serial][0], - hash = delta_serials[serial][1], - tag = "delta", - uuid = uuid, - serial = serial)) + deltas = [(yield self._rrdp_fetch_url(url = delta_serials[serial][0], + hash = delta_serials[serial][1], + tag = "delta", + session_id = session_id, + serial = serial)) for serial in xrange(old_snapshot.serial + 1, new_snapshot.serial + 1)] with transaction.atomic(): - new_snapshot.rpkiobject_set = old_snapshot.rpkiobject_set + new_snapshot.rpkiobject_set = old_snapshot.rpkiobject_set.all() new_snapshot.save() for retrieval, delta in deltas: @@ -1145,33 +1168,54 @@ def final_report(): def final_cleanup(): from django.db import transaction + def report(when): + logger.debug("Database state %s cleanup: %s Authenticated %s RRDPSnapshot %s RPKIObject %s Retrieval", + when, + Authenticated.objects.all().count(), + RRDPSnapshot.objects.all().count(), + RPKIObject.objects.all().count(), + Retrieval.objects.all().count()) + + report("before") + with transaction.atomic(): - # Flush old authenticated sets. + #logger.debug("Flushing old authenticated sets") - q = rpki.rcynicdb.models.Authenticated.objects.exclude(id = authenticated.id) + q = Authenticated.objects + q = q.exclude(id = authenticated.id) q.delete() - # Flush RRDP snapshots which don't contribute anything to the current authenticated set. + #logger.debug("Flushing RRDP snapshots which don't contain anything in the (remaining) authenticated set") - q = authenticated.rpkiobject_set.order_by("snapshot__id").values_list("snapshot__id", flat = True) - q = rpki.rcynicdb.models.RRDPSnapshot.objects.exclude(id__in = q.distinct()) + q = RPKIObject.objects + q = q.filter(authenticated = authenticated.id) + q = q.exclude(snapshot = None) + q = q.order_by("snapshot__id") + q = q.values_list("snapshot__id", flat = True) + q = q.distinct() + q = RRDPSnapshot.objects.exclude(id__in = q) q.delete() - # Flush RPKI objects which are neither part of the current authenticated set nor part of - # a current RRDP snapshot. + #logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot") - q = rpki.rcynicdb.models.RPKIObject.objects + q = RPKIObject.objects q = q.exclude(authenticated = authenticated.id) q = q.filter(snapshot = None) q.delete() - # Flush retrieval objects which are no longer related to any RPKI objects. + #logger.debug("Flushing retrieval objects which are no longer related to any RPKI objects or RRDP snapshot") - q = rpki.rcynicdb.models.RPKIObject.objects.order_by("retrieved__id").values_list("retrieved__id", flat = True) - q = rpki.rcynicdb.models.Retrieval.objects.exclude(id__in = q.distinct()) + q = RPKIObject.objects + q = q.order_by("retrieved__id") + q = q.values_list("retrieved__id", flat = True) + q = q.distinct() + q = Retrieval.objects.exclude(id__in = q) + q = q.filter(rrdpsnapshot = None) q.delete() + report("after") + @tornado.gen.coroutine def launcher(): @@ -1211,6 +1255,8 @@ def main(): parser.add_argument("--no-spawn-on-fetch", action = "store_true") parser.add_argument("--no-migrate", action = "store_true") + parser.add_argument("--prefer-rsync", action = "store_true") + global args args = parser.parse_args() @@ -1231,11 +1277,19 @@ def main(): django.core.management.call_command("migrate", verbosity = 0, interactive = False) import rpki.rcynicdb + global Retrieval + global Authenticated + global RRDPSnapshot + global RPKIObject + Retrieval = rpki.rcynicdb.models.Retrieval + Authenticated = rpki.rcynicdb.models.Authenticated + RRDPSnapshot = rpki.rcynicdb.models.RRDPSnapshot + RPKIObject = rpki.rcynicdb.models.RPKIObject logging.basicConfig(level = logging.DEBUG, format = "%(asctime)s %(message)s", datefmt = "%Y-%m-%d %H:%M:%S") global authenticated - authenticated = rpki.rcynicdb.models.Authenticated.objects.create(started = rpki.sundial.datetime.now()) + authenticated = Authenticated.objects.create(started = rpki.sundial.datetime.now()) global task_queue task_queue = tornado.queues.Queue() diff --git a/rpki/django_settings/rcynic.py b/rpki/django_settings/rcynic.py index 90491ddc..0845604c 100644 --- a/rpki/django_settings/rcynic.py +++ b/rpki/django_settings/rcynic.py @@ -51,6 +51,13 @@ del DatabaseConfigurator INSTALLED_APPS = ["rpki.rcynicdb"] +# Debugging +# +# DO NOT ENABLE DJANGO DEBUGGING IN PRODUCTION! +# +#DEBUG = True + + # Allow local site to override any setting above -- but if there's # anything that local sites routinely need to modify, please consider # putting that configuration into rpki.conf and just adding code here diff --git a/rpki/rcynicdb/models.py b/rpki/rcynicdb/models.py index c5eb983a..09d513d5 100644 --- a/rpki/rcynicdb/models.py +++ b/rpki/rcynicdb/models.py @@ -1,35 +1,8 @@ -# First cut at ORM models for rcynicng, assuming for now that we're -# going to go with Django rather than raw SQL. +# First cut at ORM models for rcynicng. from django.db import models # HTTP/HTTPS/RSYNC fetch event. -# -# Open issue: for RRDP, are we just recording the notification fetch, -# or the snapshot/delta fetches as well? If the latter, to which -# retrieval event does the RRDPSnapshot 1:1 relationship refer? For -# that matter, should we somehow be recording the relationship between -# the notification and snapshot/delta fetches? Given that, at least -# in the current protocol, we will only do either one snapshot fetch -# or one delta fetch after the notify fetch, we could just use two -# URIs in the retrieval record, if we allow the second to be empty -# (which we would have to do anyway for rsync). -# -# Or we could add some kind of fun SQL self-reference, which, in -# Django, looks like: -# -# models.ForeignKey('self', on_delete = models.CASCADE) -# -# except that it's more like a 1:1 recursive relationship, which isn't -# mentioned in the Django docs, but which supposedly -# (http://stackoverflow.com/questions/18271001/django-recursive-relationship) -# works the same way: -# -# models.OneToOneField('self', null = True) -# -# Unclear whether we still need "on_delete = models.CASCADE", probably. -# Example on StackOverflow has a complex .save() method, but that may -# be specific to the original poster's desired semantics. class Retrieval(models.Model): uri = models.TextField() @@ -37,25 +10,18 @@ class Retrieval(models.Model): finished = models.DateTimeField() successful = models.BooleanField() -# Collection of validated objects (like current -# rsync-data/authenticated.yyyy-mm-ddTHH:MM:SS/ tree) +# Collection of validated objects. class Authenticated(models.Model): started = models.DateTimeField() finished = models.DateTimeField(null = True) # One instance of an RRDP snapshot. -# -# Deltas are processed by finding the RRDPSnapshot holding the -# starting point, creating a new RRDPSnapshot for the endpoint, and -# applying all necessary deltas (with consistency checks all along the -# way) to get from one to the other; we don't commit the endpoint (or -# anything created during the process) until and unless it all works. class RRDPSnapshot(models.Model): - uuid = models.UUIDField() - serial = models.BigIntegerField() - retrieved = models.OneToOneField(Retrieval) + session_id = models.UUIDField() + serial = models.BigIntegerField() + retrieved = models.OneToOneField(Retrieval) # RPKI objects. # @@ -64,6 +30,8 @@ class RRDPSnapshot(models.Model): # what we want in this case. # # https://docs.djangoproject.com/en/1.9/ref/models/fields/#django.db.models.ForeignKey.on_delete +# +# Might also want to provide names for the reverse relationships, code uses blah_set for now. class RPKIObject(models.Model): der = models.BinaryField(unique = True) |