about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Rob Austein <sra@hactrn.net> 2016-03-03 06:19:55 +0000
committer: Rob Austein <sra@hactrn.net> 2016-03-03 06:19:55 +0000
commit: 3faf15f327bc6745a7fd084a0cfaa5d4892fe862 (patch)
tree: e26cd805f1088d97f41e6f0034b81b268690306c
parent: 8f3cc172776b3a0e6382caa2b3d44591d341e76b (diff)
Speed up snapshot loading and cleanup with huge data sets.
svn path=/branches/tk705/; revision=6307
-rwxr-xr-x  rp/rcynic/rcynicng  35
-rw-r--r--  rpki/rcynicdb/models.py  25
2 files changed, 47 insertions, 13 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index c8add7fe..2f3b4a75 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -1008,6 +1008,8 @@ class Fetcher(object):
yield tornado.gen.moment
try:
existing_objs.append(RPKIObject.objects.values_list("pk", flat = True).get(der = new_objs[i].der))
+ logger.debug("Object existed in SQL but, apparently, not in prior copy of snapshot: uri %s sha256 %s",
+ new_objs[i].uri, new_objs[i].sha256)
except RPKIObject.DoesNotExist:
i += 1
else:
@@ -1052,8 +1054,11 @@ class Fetcher(object):
if snapshot is None or snapshot.serial + 1 not in deltas:
+ existing_rpkiobject_map = dict()
+
if snapshot is not None:
logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial)
+ existing_rpkiobject_map.update(snapshot.rpkiobject_set.values_list("sha256", "pk"))
x = notification.find(tag_snapshot)
@@ -1065,7 +1070,7 @@ class Fetcher(object):
snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial)
- # Value of "chunk" here probably needs to be configurable. Larger numbers batch more objects in
+ # Value of "chunk" here may need to be configurable. Larger numbers batch more objects in
# a single bulk addition, which is faster ... unless one or more of them isn't really new, in
# which case we have to check everything in that batch when we get the IntegrityError, so
# the smaller the batch, the faster that check. No single good answer.
@@ -1073,8 +1078,8 @@ class Fetcher(object):
root = None
existing_rpkiobjects = []
new_rpkiobjects = []
- chunk = 2000 # 10000
-
+ chunk = 2000
+
for event, node in iterparse(xml_file):
if node is root:
continue
@@ -1102,9 +1107,13 @@ class Fetcher(object):
raise RRDP_ParseFailure("Unexpected URI {}".format(uri))
der = node.text.decode("base64")
- ski, aki = cls.derRead(der).get_hex_SKI_AKI()
- new_rpkiobjects.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki,
- retrieved = retrieval, sha256 = sha256hex(der)))
+ sha256 = sha256hex(der)
+ try:
+ existing_rpkiobjects.append(existing_rpkiobject_map[sha256])
+ except KeyError:
+ ski, aki = cls.derRead(der).get_hex_SKI_AKI()
+ new_rpkiobjects.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki,
+ retrieved = retrieval, sha256 = sha256))
node.clear()
while node.getprevious() is not None:
@@ -1118,13 +1127,13 @@ class Fetcher(object):
if len(new_rpkiobjects) > 0:
yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects)
- n = len(existing_rpkiobjects)
- for i in xrange(0, n, chunk):
- snapshot.rpkiobject_set.add(*existing_rpkiobjects[i : i + chunk])
+ RPKIObject.snapshot.through.objects.bulk_create([
+ RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i)
+ for i in retrieval.rpkiobject_set.values_list("pk", flat = True)])
- n = retrieval.rpkiobject_set.count()
- for i in xrange(0, n, chunk):
- snapshot.rpkiobject_set.add(*list(retrieval.rpkiobject_set.values_list("pk", flat = True)[i : i + chunk]))
+ RPKIObject.snapshot.through.objects.bulk_create([
+ RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i)
+ for i in existing_rpkiobjects])
snapshot.retrieved = retrieval
snapshot.save()
@@ -1337,7 +1346,7 @@ def final_cleanup():
#logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot")
q = RPKIObject.objects
- q = q.exclude(authenticated = authenticated.id)
+ q = q.filter(authenticated = None) # was: q = q.exclude(authenticated = authenticated.id)
q = q.filter(snapshot = None)
q.delete()
diff --git a/rpki/rcynicdb/models.py b/rpki/rcynicdb/models.py
index cd6d3698..9a790230 100644
--- a/rpki/rcynicdb/models.py
+++ b/rpki/rcynicdb/models.py
@@ -10,12 +10,24 @@ class Retrieval(models.Model):
finished = models.DateTimeField()
successful = models.BooleanField()
+ def __repr__(self):
+ try:
+ return "<Retrieval: {0.uri} started {0.started} finished {0.finished} successful {0.successful}>".format(self)
+ except:
+ return "<Retrieval: {}>".format(id(self))
+
# Collection of validated objects.
class Authenticated(models.Model):
started = models.DateTimeField()
finished = models.DateTimeField(null = True)
+ def __repr__(self):
+ try:
+ return "<Authenticated: started {0.started} finished {0.finished}>".format(self)
+ except:
+ return "<Authenticated: {}>".format(id(self))
+
# One instance of an RRDP snapshot.
class RRDPSnapshot(models.Model):
@@ -23,6 +35,13 @@ class RRDPSnapshot(models.Model):
serial = models.BigIntegerField()
retrieved = models.OneToOneField(Retrieval, null = True)
+ def __repr__(self):
+ try:
+ return "<RRDPSnapshot: serial {0.serial} session_id {0.session_id} retrieved {0.retrieved!r}>".format(self)
+ except:
+ return "<RRDPSnapshot: {}>".format(id(self))
+
+
# RPKI objects.
#
# Might need to add an on_delete argument to the ForeignKey for the
@@ -54,3 +73,9 @@ class RPKIObject(models.Model):
retrieved = models.ForeignKey(Retrieval)
authenticated = models.ManyToManyField(Authenticated)
snapshot = models.ManyToManyField(RRDPSnapshot)
+
+ def __repr__(self):
+ try:
+ return "<RPKIObject: uri {0.uri} sha256 {0.sha256} ski {0.ski} aki {0.aki} retrieved {0.retrieved!r}>".format(self)
+ except:
+ return "<RPKIObject: {}>".format(id(self))