diff options
author | Rob Austein <sra@hactrn.net> | 2016-03-03 06:19:55 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2016-03-03 06:19:55 +0000 |
commit | 3faf15f327bc6745a7fd084a0cfaa5d4892fe862 (patch) | |
tree | e26cd805f1088d97f41e6f0034b81b268690306c | |
parent | 8f3cc172776b3a0e6382caa2b3d44591d341e76b (diff) |
Speed up snapshot loading and cleanup with huge data sets.
svn path=/branches/tk705/; revision=6307
-rwxr-xr-x | rp/rcynic/rcynicng | 35 | ||||
-rw-r--r-- | rpki/rcynicdb/models.py | 25 |
2 files changed, 47 insertions, 13 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index c8add7fe..2f3b4a75 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -1008,6 +1008,8 @@ class Fetcher(object): yield tornado.gen.moment try: existing_objs.append(RPKIObject.objects.values_list("pk", flat = True).get(der = new_objs[i].der)) + logger.debug("Object existed in SQL but, apparently, not in prior copy of snapshot: uri %s sha256 %s", + new_objs[i].uri, new_objs[i].sha256) except RPKIObject.DoesNotExist: i += 1 else: @@ -1052,8 +1054,11 @@ class Fetcher(object): if snapshot is None or snapshot.serial + 1 not in deltas: + existing_rpkiobject_map = dict() + if snapshot is not None: logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial) + existing_rpkiobject_map.update(snapshot.rpkiobject_set.values_list("sha256", "pk")) x = notification.find(tag_snapshot) @@ -1065,7 +1070,7 @@ class Fetcher(object): snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial) - # Value of "chunk" here probably needs to be configurable. Larger numbers batch more objects in + # Value of "chunk" here may need to be configurable. Larger numbers batch more objects in # a single bulk addition, which is faster ... unless one or more of them isn't really new, in # which case we have to check everything in that batch when we get the IntegrityError, so # the smaller the batch, the faster that check. No single good answer. @@ -1073,8 +1078,8 @@ class Fetcher(object): root = None existing_rpkiobjects = [] new_rpkiobjects = [] - chunk = 2000 # 10000 - + chunk = 2000 + for event, node in iterparse(xml_file): if node is root: continue @@ -1102,9 +1107,13 @@ class Fetcher(object): raise RRDP_ParseFailure("Unexpected URI {}".format(uri)) der = node.text.decode("base64") - ski, aki = cls.derRead(der).get_hex_SKI_AKI() - new_rpkiobjects.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki, - retrieved = retrieval, sha256 = sha256hex(der))) + sha256 = sha256hex(der) + try: + existing_rpkiobjects.append(existing_rpkiobject_map[sha256]) + except KeyError: + ski, aki = cls.derRead(der).get_hex_SKI_AKI() + new_rpkiobjects.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki, + retrieved = retrieval, sha256 = sha256)) node.clear() while node.getprevious() is not None: @@ -1118,13 +1127,13 @@ class Fetcher(object): if len(new_rpkiobjects) > 0: yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects) - n = len(existing_rpkiobjects) - for i in xrange(0, n, chunk): - snapshot.rpkiobject_set.add(*existing_rpkiobjects[i : i + chunk]) + RPKIObject.snapshot.through.objects.bulk_create([ + RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i) + for i in retrieval.rpkiobject_set.values_list("pk", flat = True)]) - n = retrieval.rpkiobject_set.count() - for i in xrange(0, n, chunk): - snapshot.rpkiobject_set.add(*list(retrieval.rpkiobject_set.values_list("pk", flat = True)[i : i + chunk])) + RPKIObject.snapshot.through.objects.bulk_create([ + RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i) + for i in existing_rpkiobjects]) snapshot.retrieved = retrieval snapshot.save() @@ -1337,7 +1346,7 @@ def final_cleanup(): #logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot") q = RPKIObject.objects - q = q.exclude(authenticated = authenticated.id) + q = q.filter(authenticated = None) # was: q = q.exclude(authenticated = authenticated.id) q = q.filter(snapshot = None) q.delete() diff --git a/rpki/rcynicdb/models.py b/rpki/rcynicdb/models.py index cd6d3698..9a790230 100644 --- a/rpki/rcynicdb/models.py +++ b/rpki/rcynicdb/models.py @@ -10,12 +10,24 @@ class Retrieval(models.Model): finished = models.DateTimeField() successful = models.BooleanField() + def __repr__(self): + try: + return "<Retrieval: {0.uri} started {0.started} finished {0.finished} successful {0.successful}>".format(self) + except: + return "<Retrieval: {}>".format(id(self)) + # Collection of validated objects. class Authenticated(models.Model): started = models.DateTimeField() finished = models.DateTimeField(null = True) + def __repr__(self): + try: + return "<Authenticated: started {0.started} finished {0.finished}>".format(self) + except: + return "<Authenticated: {}>".format(id(self)) + # One instance of an RRDP snapshot. class RRDPSnapshot(models.Model): @@ -23,6 +35,13 @@ class RRDPSnapshot(models.Model): serial = models.BigIntegerField() retrieved = models.OneToOneField(Retrieval, null = True) + def __repr__(self): + try: + return "<RRDPSnapshot: serial {0.serial} session_id {0.session_id} retrieved {0.retrieved!r}>".format(self) + except: + return "<RRDPSnapshot: {}>".format(id(self)) + + # RPKI objects. # # Might need to add an on_delete argument to the ForeignKey for the @@ -54,3 +73,9 @@ class RPKIObject(models.Model): retrieved = models.ForeignKey(Retrieval) authenticated = models.ManyToManyField(Authenticated) snapshot = models.ManyToManyField(RRDPSnapshot) + + def __repr__(self): + try: + return "<RPKIObject: uri {0.uri} sha256 {0.sha256} ski {0.ski} aki {0.aki} retrieved {0.retrieved!r}>".format(self) + except: + return "<RPKIObject: {}>".format(id(self)) |