author    Rob Austein <sra@hactrn.net>    2016-02-23 21:18:07 +0000
committer Rob Austein <sra@hactrn.net>    2016-02-23 21:18:07 +0000
commit    2ed40817177c93aa84e9f130b2c8c37519adcf73 (patch)
tree      d534387da2b15776b2a260092bbd16052590ae28
parent    f413061c35230af16e147a09ce5bdcb3fea134cd (diff)
Rework ROA postponement code to lower memory footprint.
svn path=/branches/tk705/; revision=6280
-rw-r--r--    rpki/rpkid_tasks.py    115
1 file changed, 65 insertions, 50 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py
index 11df3b73..faaa0bd9 100644
--- a/rpki/rpkid_tasks.py
+++ b/rpki/rpkid_tasks.py
@@ -394,67 +394,82 @@ class UpdateROAsTask(AbstractTask):
     # fairly low given that we defer CRL and manifest generation until
     # we're ready to publish, but it's theoretically present.
+    # Rewritten to conserve memory when postponing. We used to treat
+    # postponement as just another odd kind of voluntary yield, but
+    # when working with really big data sets this tends to result in
+    # extreme memory bloat. So, instead, once we hit the postponement
+    # limit we publish what we've got, discard all of our transient
+    # data, and recompute what we need to do when we come back. This
+    # is a bit less efficient, but should tend to converge towards the
+    # correct result, with a much smaller memory footprint.
+
     @tornado.gen.coroutine
     def main(self):
         logger.debug("%r: Updating ROAs", self)
-        try:
-            r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle)
-        except:
-            logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle)
-            return
+        while True:  # Postponement loop
-        logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg)
-
-        roas = {}
-        seen = set()
-        orphans = []
-        updates = []
-        self.publisher = rpki.rpkid.publication_queue(self.rpkid)  # pylint: disable=W0201
-        self.ca_details = set()  # pylint: disable=W0201
-
-        for roa in self.tenant.roas.all():
-            k = (roa.asn, str(roa.ipv4), str(roa.ipv6))
-            if k not in roas:
-                roas[k] = roa
-            elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"):
-                orphans.append(roas[k])
-                roas[k] = roa
-            else:
-                orphans.append(roa)
+            try:
+                r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle)
+            except:
+                logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle)
+                return
-        for r_pdu in r_msg:
-            k = (r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6"))
-            if k in seen:
-                logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu)
-            else:
-                seen.add(k)
-                roa = roas.pop(k, None)
-                if roa is None:
-                    roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6"))
-                    logger.debug("%r: Created new %r", self, roa)
+            logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg)
+
+            roas = {}
+            seen = set()
+            orphans = []
+            updates = []
+            self.publisher = rpki.rpkid.publication_queue(self.rpkid)  # pylint: disable=W0201
+            self.ca_details = set()  # pylint: disable=W0201
+
+            for roa in self.tenant.roas.all():
+                k = (roa.asn, str(roa.ipv4), str(roa.ipv6))
+                if k not in roas:
+                    roas[k] = roa
+                elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"):
+                    orphans.append(roas[k])
+                    roas[k] = roa
                 else:
-                    logger.debug("%r: Found existing %r", self, roa)
-                updates.append(roa)
+                    orphans.append(roa)
-        r_msg = seen = None
+            for r_pdu in r_msg:
+                k = (r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6"))
+                if k in seen:
+                    logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu)
+                else:
+                    seen.add(k)
+                    roa = roas.pop(k, None)
+                    if roa is None:
+                        roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6"))
+                        logger.debug("%r: Created new %r", self, roa)
+                    else:
+                        logger.debug("%r: Found existing %r", self, roa)
+                    updates.append(roa)
-        orphans.extend(roas.itervalues())
+            r_msg = seen = None
-        roas = None
+            orphans.extend(roas.itervalues())
-        while updates:
-            if self.overdue:
-                yield self.publish()
-                yield self.postpone()
-            roa = updates.pop(0)
-            try:
-                roa.update(publisher = self.publisher)
-                self.ca_details.add(roa.ca_detail)
-            except rpki.exceptions.NoCoveringCertForROA:
-                logger.warning("%r: No covering certificate for %r, skipping", self, roa)
-            except:
-                logger.exception("%r: Could not update %r, skipping", self, roa)
+            roas = None
+
+            while updates and not self.overdue:
+                roa = updates.pop(0)
+                try:
+                    roa.update(publisher = self.publisher)
+                    self.ca_details.add(roa.ca_detail)
+                except rpki.exceptions.NoCoveringCertForROA:
+                    logger.warning("%r: No covering certificate for %r, skipping", self, roa)
+                except:
+                    logger.exception("%r: Could not update %r, skipping", self, roa)
+
+            if not self.overdue:
+                break
+
+            roas = seen = orphans = updates = None
+            yield self.publish()
+            yield self.postpone()
         for roa in orphans:
             try:
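
The comment block added above describes the new postponement strategy in prose. Below is a minimal, self-contained sketch of the same pattern; it is not rpkid code: the names fetch_work, publish, QUANTUM, and PostponingTask are invented for illustration, and it uses Python 3 asyncio in place of Tornado's gen.coroutine, but the control flow mirrors the rewritten main(): work until the task is overdue, publish what has been accumulated, discard the transient state, yield, and recompute from scratch on the next pass.

import asyncio
import time


# Hypothetical stand-ins for rpkid's IRDB query and publication machinery;
# the real task fetches ROA requests from the IRDB and queues ROA objects
# on a publication queue instead.
async def fetch_work():
    return list(range(50))          # pretend these are ROA requests


async def publish(results):
    print("publishing %d results" % len(results))


class PostponingTask(object):
    """Sketch of the publish-discard-recompute postponement pattern."""

    QUANTUM = 0.01                  # seconds a pass may run before it is overdue

    def __init__(self):
        self.started = time.time()

    @property
    def overdue(self):
        return time.time() - self.started > self.QUANTUM

    async def main(self):
        while True:                 # Postponement loop
            self.started = time.time()
            work = await fetch_work()        # transient state, rebuilt each pass
            results = []

            while work and not self.overdue:
                results.append(work.pop(0) * 2)   # stand-in for roa.update()

            if not self.overdue:    # got through everything on this pass
                await publish(results)
                break

            # Hit the postponement limit: publish what we have, drop all
            # transient state, yield to other tasks, then start over.
            work = None
            await publish(results)
            results = None
            await asyncio.sleep(0)  # stand-in for self.postpone()


asyncio.run(PostponingTask().main())

The trade-off is the one the comment acknowledges: each pass repeats the fetch and the bookkeeping, but no pass ever carries the full work list across a yield, so peak memory stays bounded and the result still converges.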