aboutsummaryrefslogtreecommitdiff
path: root/rpki/rpkid_tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rpkid_tasks.py')
-rw-r--r--rpki/rpkid_tasks.py115
1 files changed, 65 insertions, 50 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py
index 11df3b73..faaa0bd9 100644
--- a/rpki/rpkid_tasks.py
+++ b/rpki/rpkid_tasks.py
@@ -394,67 +394,82 @@ class UpdateROAsTask(AbstractTask):
# fairly low given that we defer CRL and manifest generation until
# we're ready to publish, but it's theoretically present.
+ # Rewritten to conserve memory when postponing. We used to treat
+ # postponement as just another odd kind of voluntary yield, but
+ # when working with really big data sets this tends to result in
+ # extreme memory bloat. So, instead, once we hit the postponement
+ # limit we publish what we've got, discard all of our transient
+ # data, and recompute what we need to do when we come back. This
+ # is a bit less efficient, but should tend to converge towards the
+ # correct result, with a much smaller memory footprint.
+
@tornado.gen.coroutine
def main(self):
logger.debug("%r: Updating ROAs", self)
- try:
- r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle)
- except:
- logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle)
- return
+ while True: # Postponement loop
- logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg)
-
- roas = {}
- seen = set()
- orphans = []
- updates = []
- self.publisher = rpki.rpkid.publication_queue(self.rpkid) # pylint: disable=W0201
- self.ca_details = set() # pylint: disable=W0201
-
- for roa in self.tenant.roas.all():
- k = (roa.asn, str(roa.ipv4), str(roa.ipv6))
- if k not in roas:
- roas[k] = roa
- elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"):
- orphans.append(roas[k])
- roas[k] = roa
- else:
- orphans.append(roa)
+ try:
+ r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle)
+ except:
+ logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle)
+ return
- for r_pdu in r_msg:
- k = (r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6"))
- if k in seen:
- logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu)
- else:
- seen.add(k)
- roa = roas.pop(k, None)
- if roa is None:
- roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6"))
- logger.debug("%r: Created new %r", self, roa)
+ logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg)
+
+ roas = {}
+ seen = set()
+ orphans = []
+ updates = []
+ self.publisher = rpki.rpkid.publication_queue(self.rpkid) # pylint: disable=W0201
+ self.ca_details = set() # pylint: disable=W0201
+
+ for roa in self.tenant.roas.all():
+ k = (roa.asn, str(roa.ipv4), str(roa.ipv6))
+ if k not in roas:
+ roas[k] = roa
+ elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"):
+ orphans.append(roas[k])
+ roas[k] = roa
else:
- logger.debug("%r: Found existing %r", self, roa)
- updates.append(roa)
+ orphans.append(roa)
- r_msg = seen = None
+ for r_pdu in r_msg:
+ k = (r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6"))
+ if k in seen:
+ logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu)
+ else:
+ seen.add(k)
+ roa = roas.pop(k, None)
+ if roa is None:
+ roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6"))
+ logger.debug("%r: Created new %r", self, roa)
+ else:
+ logger.debug("%r: Found existing %r", self, roa)
+ updates.append(roa)
- orphans.extend(roas.itervalues())
+ r_msg = seen = None
- roas = None
+ orphans.extend(roas.itervalues())
- while updates:
- if self.overdue:
- yield self.publish()
- yield self.postpone()
- roa = updates.pop(0)
- try:
- roa.update(publisher = self.publisher)
- self.ca_details.add(roa.ca_detail)
- except rpki.exceptions.NoCoveringCertForROA:
- logger.warning("%r: No covering certificate for %r, skipping", self, roa)
- except:
- logger.exception("%r: Could not update %r, skipping", self, roa)
+ roas = None
+
+ while updates and not self.overdue:
+ roa = updates.pop(0)
+ try:
+ roa.update(publisher = self.publisher)
+ self.ca_details.add(roa.ca_detail)
+ except rpki.exceptions.NoCoveringCertForROA:
+ logger.warning("%r: No covering certificate for %r, skipping", self, roa)
+ except:
+ logger.exception("%r: Could not update %r, skipping", self, roa)
+
+ if not self.overdue:
+ break
+
+ roas = seen = orphans = updates = None
+ yield self.publish()
+ yield self.postpone()
for roa in orphans:
try: