diff options
-rwxr-xr-x | rp/rcynic/rcynicng | 118 |
1 files changed, 55 insertions, 63 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index 13062ba9..1b1cd067 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -1026,87 +1026,78 @@ class Fetcher(object): session_id = notification.get("session_id") serial = long(notification.get("serial")) - old_snapshot = RRDPSnapshot.objects.filter( + snapshot = RRDPSnapshot.objects.filter( session_id = session_id).order_by("-retrieved__started").first() - logger.debug("RRDP notification for %s session_id %s serial %s old_snapshot %r", - self.uri, session_id, serial, old_snapshot) + logger.debug("RRDP notification for %s session_id %s serial %s current snapshot %r", + self.uri, session_id, serial, snapshot) - if old_snapshot is not None and old_snapshot.serial == serial: + if snapshot is not None and snapshot.serial == serial: logger.debug("RRDP data for %s is up-to-date, nothing to do", self.uri) return delta_serials = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash"))) for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta")) - new_snapshot = RRDPSnapshot(retrieved = retrieval, session_id = session_id, serial = serial) + if snapshot is None or snapshot.serial + 1 not in delta_serials: - if old_snapshot is None or old_snapshot.serial + 1 not in delta_serials: + if snapshot is not None: + logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial) x = notification.find(rpki.relaxng.rrdp.xmlns + "snapshot") - logger.debug("RRDP %s loading from snapshot %s", self.uri, x.get("uri")) + url, hash = x.get("uri"), x.get("hash") - retrieval, snapshot = yield self._rrdp_fetch_url(url = x.get("uri"), hash = x.get("hash"), - tag = "snapshot", session_id = session_id, serial = serial) + logger.debug("RRDP %s loading from snapshot %s serial %s", self.uri, url, serial) + + retrieval, x = yield self._rrdp_fetch_url( + url = url, + hash = hash, + tag = "snapshot", + session_id = session_id, + serial = serial) with transaction.atomic(): - new_snapshot.save() - for x in snapshot.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"): + if snapshot is not None: + snapshot.delete() + snapshot = RRDPSnapshot.objects.create(retrieved = retrieval, session_id = session_id, serial = serial) + for x in x.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"): uri = x.get("uri") cls = uri_to_class(uri) if cls is None: raise RRDP_ParseFailure("Unexpected URI %s" % uri) obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval) - obj.snapshot.add(new_snapshot) + obj.snapshot.add(snapshot) obj.save() + logger.debug("RRDP %s committing snapshot %s serial %s", self.uri, url, serial) else: - logger.debug("RRDP %s fetching %s deltas (%s -- %s)", self.uri, - (new_snapshot.serial - old_snapshot.serial), old_snapshot.serial, new_snapshot.serial) - - deltas = [(yield self._rrdp_fetch_url(url = delta_serials[serial][0], - hash = delta_serials[serial][1], - tag = "delta", - session_id = session_id, - serial = serial)) - for serial in xrange(old_snapshot.serial + 1, new_snapshot.serial + 1)] - - logger.debug("RRDP %s loading deltas", self.uri) - - with transaction.atomic(): - new_snapshot.save() - - logger.debug("RRDP %s copying snapshot %s", self.uri, old_snapshot.serial) + logger.debug("RRDP %s %s deltas (%s -- %s)", self.uri, + (serial - snapshot.serial), snapshot.serial, serial) - if False: - # What we'd like to do here is: + for serial in xrange(snapshot.serial + 1, serial + 1): - new_snapshot.rpkiobject_set = old_snapshot.rpkiobject_set.all() + url, hash = delta_serials[serial] - # but, at least with the SQLite3 driver, this explodes when there are too many objects, - # because of the silly way the driver tries to implement this. + logger.debug("RRDP %s fetching %s serial %s", self.uri, url, serial) - else: - # So do this the slow way for now, do better later if it turns out to matter. + retrieval, delta = yield self._rrdp_fetch_url( + url = url, + hash = hash, + tag = "delta", + session_id = session_id, + serial = serial) - logger.debug("RRDP %s starting potentially really slow copy operation", self.uri) + logger.debug("RRDP %s loading delta %s serial %s", self.uri, url, serial) - for obj in old_snapshot.rpkiobject_set.all(): - new_snapshot.rpkiobject_set.add(obj) - - logger.debug("RRDP %s finished potentially really slow copy operation", self.uri) - - new_snapshot.save() - - for retrieval, delta in deltas: - - logger.debug("RRDP %s applying delta %s", self.uri, delta.get("serial")) + with transaction.atomic(): + snapshot.serial = serial + snapshot.save() for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "withdraw"): - new_snapshot.rpkiobject_set.remove( - new_snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower())) - new_snapshot.save() + snapshot.rpkiobject_set.remove( + snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower())) + snapshot.save() for x in delta.iterchildren(rpki.relaxng.rrdp.xmlns + "publish"): uri = x.get("uri") @@ -1114,16 +1105,16 @@ class Fetcher(object): if cls is None: raise RRDP_ParseFailure("Unexpected URI %s" % uri) if x.get("hash", None) is not None: - new_snapshot.rpkiobject_set.remove( - new_snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower())) - new_snapshot.save() + snapshot.rpkiobject_set.remove( + snapshot.rpkiobject_set.get(sha256 = x.get("hash").lower())) + snapshot.save() obj, created = cls.store_if_new(x.text.decode("base64"), uri, retrieval) - obj.snapshot.add(new_snapshot) + obj.snapshot.add(snapshot) obj.save() - logger.debug("RRDP %s committing", self.uri) + logger.debug("RRDP %s committing delta %s serial %s", self.uri, url, serial) - logger.debug("RRDP %s done loading deltas", self.uri) + logger.debug("RRDP %s done processing deltas", self.uri) except (tornado.httpclient.HTTPError, socket.error, IOError, ssl.SSLError): pass # Already logged @@ -1274,15 +1265,16 @@ def final_cleanup(): # http://stackoverflow.com/questions/2317686/joining-2-sql-select-result-sets-into-one # https://docs.djangoproject.com/en/1.8/topics/db/sql/#executing-custom-sql-directly - q = RRDPSnapshot.objects - q = q.values("session_id") - q = q.annotate(max_serial = models.Max("serial")) - q = q.values_list("session_id", "max_serial") - - logger.debug("Annotation query for RRDPSnapshots gave us %r", list(q)) + # Except that this shouldn't be necessary after all, now that we reuse snapshot objects in place. - for u, s in q: - RRDPSnapshot.objects.filter(session_id = u, serial__lt = s).delete() + if False: + q = RRDPSnapshot.objects + q = q.values("session_id") + q = q.annotate(max_serial = models.Max("serial")) + q = q.values_list("session_id", "max_serial") + logger.debug("Annotation query for RRDPSnapshots gave us %r", list(q)) + for u, s in q: + RRDPSnapshot.objects.filter(session_id = u, serial__lt = s).delete() logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot") |