diff options
Diffstat (limited to 'rp/rcynic/rcynicng')
-rwxr-xr-x | rp/rcynic/rcynicng | 35 |
1 files changed, 22 insertions, 13 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index c8add7fe..2f3b4a75 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -1008,6 +1008,8 @@ class Fetcher(object): yield tornado.gen.moment try: existing_objs.append(RPKIObject.objects.values_list("pk", flat = True).get(der = new_objs[i].der)) + logger.debug("Object existed in SQL but, apparently, not in prior copy of snapshot: uri %s sha256 %s", + new_objs[i].uri, new_objs[i].sha256) except RPKIObject.DoesNotExist: i += 1 else: @@ -1052,8 +1054,11 @@ class Fetcher(object): if snapshot is None or snapshot.serial + 1 not in deltas: + existing_rpkiobject_map = dict() + if snapshot is not None: logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial) + existing_rpkiobject_map.update(snapshot.rpkiobject_set.values_list("sha256", "pk")) x = notification.find(tag_snapshot) @@ -1065,7 +1070,7 @@ class Fetcher(object): snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial) - # Value of "chunk" here probably needs to be configurable. Larger numbers batch more objects in + # Value of "chunk" here may need to be configurable. Larger numbers batch more objects in # a single bulk addition, which is faster ... unless one or more of them isn't really new, in # which case we have to check everything in that batch when we get the IntegrityError, so # the smaller the batch, the faster that check. No single good answer. @@ -1073,8 +1078,8 @@ class Fetcher(object): root = None existing_rpkiobjects = [] new_rpkiobjects = [] - chunk = 2000 # 10000 - + chunk = 2000 + for event, node in iterparse(xml_file): if node is root: continue @@ -1102,9 +1107,13 @@ class Fetcher(object): raise RRDP_ParseFailure("Unexpected URI {}".format(uri)) der = node.text.decode("base64") - ski, aki = cls.derRead(der).get_hex_SKI_AKI() - new_rpkiobjects.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki, - retrieved = retrieval, sha256 = sha256hex(der))) + sha256 = sha256hex(der) + try: + existing_rpkiobjects.append(existing_rpkiobject_map[sha256]) + except KeyError: + ski, aki = cls.derRead(der).get_hex_SKI_AKI() + new_rpkiobjects.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki, + retrieved = retrieval, sha256 = sha256)) node.clear() while node.getprevious() is not None: @@ -1118,13 +1127,13 @@ class Fetcher(object): if len(new_rpkiobjects) > 0: yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects) - n = len(existing_rpkiobjects) - for i in xrange(0, n, chunk): - snapshot.rpkiobject_set.add(*existing_rpkiobjects[i : i + chunk]) + RPKIObject.snapshot.through.objects.bulk_create([ + RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i) + for i in retrieval.rpkiobject_set.values_list("pk", flat = True)]) - n = retrieval.rpkiobject_set.count() - for i in xrange(0, n, chunk): - snapshot.rpkiobject_set.add(*list(retrieval.rpkiobject_set.values_list("pk", flat = True)[i : i + chunk])) + RPKIObject.snapshot.through.objects.bulk_create([ + RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i) + for i in existing_rpkiobjects]) snapshot.retrieved = retrieval snapshot.save() @@ -1337,7 +1346,7 @@ def final_cleanup(): #logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot") q = RPKIObject.objects - q = q.exclude(authenticated = authenticated.id) + q = q.filter(authenticated = None) # was: q = q.exclude(authenticated = authenticated.id) q = q.filter(snapshot = None) q.delete() |