aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2016-01-20 23:15:38 +0000
committerRob Austein <sra@hactrn.net>2016-01-20 23:15:38 +0000
commitaa77e34c8cc1f675dd8f86f713c3ce8a06fece8a (patch)
treeaa64fa1f813554366083c0271027fbead7de31a8
parent3cd14c0dfd61be32ce56fb065cd6c89c60e874f9 (diff)
Process deltas incrementally (one SQL commit per delta) and mutate
existing RRDPSnapshot objects while applying deltas rather than creating new ones. This simplifies cleanup, avoids locking out the I/O loop for the duration of a long commit, and allows us to salvage whatever progress we were able to make if a network problem stops us partway through fetching a long series of deltas. svn path=/branches/tk705/; revision=6229
-rwxr-xr-xrp/rcynic/rcynicng118
1 files changed, 55 insertions, 63 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index 13062ba9..1b1cd067 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -1026,87 +1026,78 @@ class Fetcher(object):
session_id = notification.get("session_id")
serial = long(notification.get("serial"))
- old_snapshot = RRDPSnapshot.objects.filter(
+ snapshot = RRDPSnapshot.objects.filter(
session_id = session_id).order_by("-retrieved__started").first()
- logger.debug("RRDP notification for %s session_id %s serial %s old_snapshot %r",
- self.uri, session_id, serial, old_snapshot)
+ logger.debug("RRDP notification for %s session_id %s serial %s current snapshot %r",
+ self.uri, session_id, serial, snapshot)
- if old_snapshot is not None and old_snapshot.serial == serial:
+ if snapshot is not None and snapshot.serial == serial:
logger.debug("RRDP data for %s is up-to-date, nothing to do", self.uri)
return
delta_serials = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash")))
for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta"))
- new_snapshot = RRDPSnapshot(retrieved = retrieval, session_id = session_id, serial = serial)
+ if snapshot is None or snapshot.serial + 1 not in delta_serials:
- if old_snapshot is None or old_snapshot.serial + 1 not in delta_serials:
+ if snapshot is not None:
+ logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial)
x = notification.find(rpki.relaxng.rrdp.xmlns + "snapshot")
- logger.debug("RRDP %s loading from snapshot %s", self.uri, x.get("uri"))
+ url, hash = x.get("uri"), x.get("hash")
- retrieval, snapshot = yield self._rrdp_fetch_url(url = x.get("uri"), hash = x.get("hash"),
- tag = "snapshot", session_id = session_id, serial = serial)
+ logger.debug("RRDP %s loading from snapshot %s serial %s", self.uri, url, serial)
+
+ retrieval, x = yield self._rrdp_fetch_url(
+ url = url,
+ hash = hash,
+ tag = "snapshot",
+ session_id = session_id,
+ serial = serial)
with transaction.atomic():
- new_snapshot.save()
- for x in snapshot.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"):
+ if snapshot is not None:
+ snapshot.delete()
+ snapshot = RRDPSnapshot.objects.create(retrieved = retrieval, session_id = session_id, serial = serial)
+ for x in x.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"):
uri = x.get("uri")
cls = uri_to_class(uri)
if cls is None:
raise RRDP_ParseFailure("Unexpected URI %s" % uri)
obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval)
- obj.snapshot.add(new_snapshot)
+ obj.snapshot.add(snapshot)
obj.save()
+ logger.debug("RRDP %s committing snapshot %s serial %s", self.uri, url, serial)
else:
- logger.debug("RRDP %s fetching %s deltas (%s -- %s)", self.uri,
- (new_snapshot.serial - old_snapshot.serial), old_snapshot.serial, new_snapshot.serial)
-
- deltas = [(yield self._rrdp_fetch_url(url = delta_serials[serial][0],
- hash = delta_serials[serial][1],
- tag = "delta",
- session_id = session_id,
- serial = serial))
- for serial in xrange(old_snapshot.serial + 1, new_snapshot.serial + 1)]
-
- logger.debug("RRDP %s loading deltas", self.uri)
-
- with transaction.atomic():
- new_snapshot.save()
-
- logger.debug("RRDP %s copying snapshot %s", self.uri, old_snapshot.serial)
+ logger.debug("RRDP %s %s deltas (%s -- %s)", self.uri,
+ (serial - snapshot.serial), snapshot.serial, serial)
- if False:
- # What we'd like to do here is:
+ for serial in xrange(snapshot.serial + 1, serial + 1):
- new_snapshot.rpkiobject_set = old_snapshot.rpkiobject_set.all()
+ url, hash = delta_serials[serial]
- # but, at least with the SQLite3 driver, this explodes when there are too many objects,
- # because of the silly way the driver tries to implement this.
+ logger.debug("RRDP %s fetching %s serial %s", self.uri, url, serial)
- else:
- # So do this the slow way for now, do better later if it turns out to matter.
+ retrieval, delta = yield self._rrdp_fetch_url(
+ url = url,
+ hash = hash,
+ tag = "delta",
+ session_id = session_id,
+ serial = serial)
- logger.debug("RRDP %s starting potentially really slow copy operation", self.uri)
+ logger.debug("RRDP %s loading delta %s serial %s", self.uri, url, serial)
- for obj in old_snapshot.rpkiobject_set.all():
- new_snapshot.rpkiobject_set.add(obj)
-
- logger.debug("RRDP %s finished potentially really slow copy operation", self.uri)
-
- new_snapshot.save()
-
- for retrieval, delta in deltas:
-
- logger.debug("RRDP %s applying delta %s", self.uri, delta.get("serial"))
+ with transaction.atomic():
+ snapshot.serial = serial
+ snapshot.save()
for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "withdraw"):
- new_snapshot.rpkiobject_set.remove(
- new_snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
- new_snapshot.save()
+ snapshot.rpkiobject_set.remove(
+ snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
+ snapshot.save()
for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"):
uri = x.get("uri")
@@ -1114,16 +1105,16 @@ class Fetcher(object):
if cls is None:
raise RRDP_ParseFailure("Unexpected URI %s" % uri)
if x.get("hash", None) is not None:
- new_snapshot.rpkiobject_set.remove(
- new_snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
- new_snapshot.save()
+ snapshot.rpkiobject_set.remove(
+ snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower()))
+ snapshot.save()
obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval)
- obj.snapshot.add(new_snapshot)
+ obj.snapshot.add(snapshot)
obj.save()
- logger.debug("RRDP %s committing", self.uri)
+ logger.debug("RRDP %s committing delta %s serial %s", self.uri, url, serial)
- logger.debug("RRDP %s done loading deltas", self.uri)
+ logger.debug("RRDP %s done processing deltas", self.uri)
except (tornado.httpclient.HTTPError, socket.error, IOError, ssl.SSLError):
pass # Already logged
@@ -1274,15 +1265,16 @@ def final_cleanup():
# http://stackoverflow.com/questions/2317686/joining-2-sql-select-result-sets-into-one
# https://docs.djangoproject.com/en/1.8/topics/db/sql/#executing-custom-sql-directly
- q = RRDPSnapshot.objects
- q = q.values("session_id")
- q = q.annotate(max_serial = models.Max("serial"))
- q = q.values_list("session_id", "max_serial")
-
- logger.debug("Annotation query for RRDPSnapshots gave us %r", list(q))
+ # Except that this shouldn't be necessary after all, now that we reuse snapshot objects in place.
- for u, s in q:
- RRDPSnapshot.objects.filter(session_id = u, serial__lt = s).delete()
+ if False:
+ q = RRDPSnapshot.objects
+ q = q.values("session_id")
+ q = q.annotate(max_serial = models.Max("serial"))
+ q = q.values_list("session_id", "max_serial")
+ logger.debug("Annotation query for RRDPSnapshots gave us %r", list(q))
+ for u, s in q:
+ RRDPSnapshot.objects.filter(session_id = u, serial__lt = s).delete()
logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot")