aboutsummaryrefslogtreecommitdiff
path: root/rp/rcynic/rcynicng
diff options
context:
space:
mode:
Diffstat (limited to 'rp/rcynic/rcynicng')
-rwxr-xr-xrp/rcynic/rcynicng118
1 files changed, 55 insertions, 63 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index 13062ba9..1b1cd067 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -1026,87 +1026,78 @@ class Fetcher(object):
session_id = notification.get("session_id")
serial = long(notification.get("serial"))
- old_snapshot = RRDPSnapshot.objects.filter(
+ snapshot = RRDPSnapshot.objects.filter(
session_id = session_id).order_by("-retrieved__started").first()
- logger.debug("RRDP notification for %s session_id %s serial %s old_snapshot %r",
- self.uri, session_id, serial, old_snapshot)
+ logger.debug("RRDP notification for %s session_id %s serial %s current snapshot %r",
+ self.uri, session_id, serial, snapshot)
- if old_snapshot is not None and old_snapshot.serial == serial:
+ if snapshot is not None and snapshot.serial == serial:
logger.debug("RRDP data for %s is up-to-date, nothing to do", self.uri)
return
delta_serials = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash")))
for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta"))
- new_snapshot = RRDPSnapshot(retrieved = retrieval, session_id = session_id, serial = serial)
+ if snapshot is None or snapshot.serial + 1 not in delta_serials:
- if old_snapshot is None or old_snapshot.serial + 1 not in delta_serials:
+ if snapshot is not None:
+ logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial)
x = notification.find(rpki.relaxng.rrdp.xmlns + "snapshot")
- logger.debug("RRDP %s loading from snapshot %s", self.uri, x.get("uri"))
+ url, hash = x.get("uri"), x.get("hash")
- retrieval, snapshot = yield self._rrdp_fetch_url(url = x.get("uri"), hash = x.get("hash"),
- tag = "snapshot", session_id = session_id, serial = serial)
+ logger.debug("RRDP %s loading from snapshot %s serial %s", self.uri, url, serial)
+
+ retrieval, x = yield self._rrdp_fetch_url(
+ url = url,
+ hash = hash,
+ tag = "snapshot",
+ session_id = session_id,
+ serial = serial)
with transaction.atomic():
- new_snapshot.save()
- for x in snapshot.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"):
+ if snapshot is not None:
+ snapshot.delete()
+ snapshot = RRDPSnapshot.objects.create(retrieved = retrieval, session_id = session_id, serial = serial)
+ for x in x.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"):
uri = x.get("uri")
cls = uri_to_class(uri)
if cls is None:
raise RRDP_ParseFailure("Unexpected URI %s" % uri)
obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval)
- obj.snapshot.add(new_snapshot)
+ obj.snapshot.add(snapshot)
obj.save()
+ logger.debug("RRDP %s committing snapshot %s serial %s", self.uri, url, serial)
else:
- logger.debug("RRDP %s fetching %s deltas (%s -- %s)", self.uri,
- (new_snapshot.serial - old_snapshot.serial), old_snapshot.serial, new_snapshot.serial)
-
- deltas = [(yield self._rrdp_fetch_url(url = delta_serials[serial][0],
- hash = delta_serials[serial][1],
- tag = "delta",
- session_id = session_id,
- serial = serial))
- for serial in xrange(old_snapshot.serial + 1, new_snapshot.serial + 1)]
-
- logger.debug("RRDP %s loading deltas", self.uri)
-
- with transaction.atomic():
- new_snapshot.save()
-
- logger.debug("RRDP %s copying snapshot %s", self.uri, old_snapshot.serial)
+ logger.debug("RRDP %s %s deltas (%s -- %s)", self.uri,
+ (serial - snapshot.serial), snapshot.serial, serial)
- if False:
- # What we'd like to do here is:
+ for serial in xrange(snapshot.serial + 1, serial + 1):
- new_snapshot.rpkiobject_set = old_snapshot.rpkiobject_set.all()
+ url, hash = delta_serials[serial]
- # but, at least with the SQLite3 driver, this explodes when there are too many objects,
- # because of the silly way the driver tries to implement this.
+ logger.debug("RRDP %s fetching %s serial %s", self.uri, url, serial)
- else:
- # So do this the slow way for now, do better later if it turns out to matter.
+ retrieval, delta = yield self._rrdp_fetch_url(
+ url = url,
+ hash = hash,
+ tag = "delta",
+ session_id = session_id,
+ serial = serial)
- logger.debug("RRDP %s starting potentially really slow copy operation", self.uri)
+ logger.debug("RRDP %s loading delta %s serial %s", self.uri, url, serial)
- for obj in old_snapshot.rpkiobject_set.all():
- new_snapshot.rpkiobject_set.add(obj)
-
- logger.debug("RRDP %s finished potentially really slow copy operation", self.uri)
-
- new_snapshot.save()
-
- for retrieval, delta in deltas:
-
- logger.debug("RRDP %s applying delta %s", self.uri, delta.get("serial"))
+ with transaction.atomic():
+ snapshot.serial = serial
+ snapshot.save()
for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "withdraw"):
- new_snapshot.rpkiobject_set.remove(
- new_snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
- new_snapshot.save()
+ snapshot.rpkiobject_set.remove(
+ snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
+ snapshot.save()
for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"):
uri = x.get("uri")
@@ -1114,16 +1105,16 @@ class Fetcher(object):
if cls is None:
raise RRDP_ParseFailure("Unexpected URI %s" % uri)
if x.get("hash", None) is not None:
- new_snapshot.rpkiobject_set.remove(
- new_snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
- new_snapshot.save()
+ snapshot.rpkiobject_set.remove(
+ snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
+ snapshot.save()
obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval)
- obj.snapshot.add(new_snapshot)
+ obj.snapshot.add(snapshot)
obj.save()
- logger.debug("RRDP %s committing", self.uri)
+ logger.debug("RRDP %s committing delta %s serial %s", self.uri, url, serial)
- logger.debug("RRDP %s done loading deltas", self.uri)
+ logger.debug("RRDP %s done processing deltas", self.uri)
except (tornado.httpclient.HTTPError, socket.error, IOError, ssl.SSLError):
pass # Already logged
@@ -1274,15 +1265,16 @@ def final_cleanup():
# http://stackoverflow.com/questions/2317686/joining-2-sql-select-result-sets-into-one
# https://docs.djangoproject.com/en/1.8/topics/db/sql/#executing-custom-sql-directly
- q = RRDPSnapshot.objects
- q = q.values("session_id")
- q = q.annotate(max_serial = models.Max("serial"))
- q = q.values_list("session_id", "max_serial")
-
- logger.debug("Annotation query for RRDPSnapshots gave us %r", list(q))
+ # Except that this shouldn't be necessary after all, now that we reuse snapshot objects in place.
- for u, s in q:
- RRDPSnapshot.objects.filter(session_id = u, serial__lt = s).delete()
+ if False:
+ q = RRDPSnapshot.objects
+ q = q.values("session_id")
+ q = q.annotate(max_serial = models.Max("serial"))
+ q = q.values_list("session_id", "max_serial")
+ logger.debug("Annotation query for RRDPSnapshots gave us %r", list(q))
+ for u, s in q:
+ RRDPSnapshot.objects.filter(session_id = u, serial__lt = s).delete()
logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot")