diff options
Diffstat (limited to 'rpki/rpkid_tasks.py')
-rw-r--r-- | rpki/rpkid_tasks.py | 115 |
1 files changed, 65 insertions, 50 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py index 11df3b73..faaa0bd9 100644 --- a/rpki/rpkid_tasks.py +++ b/rpki/rpkid_tasks.py @@ -394,67 +394,82 @@ class UpdateROAsTask(AbstractTask): # fairly low given that we defer CRL and manifest generation until # we're ready to publish, but it's theoretically present. + # Rewritten to conserve memory when postponing. We used to treat + # postponement as just another odd kind of voluntary yield, but + # when working with really big data sets this tends to result in + # extreme memory bloat. So, instead, once we hit the postponement + # limit we publish what we've got, discard all of our transient + # data, and recompute what we need to do when we come back. This + # is a bit less efficient, but should tend to converge towards the + # correct result, with a much smaller memory footprint. + @tornado.gen.coroutine def main(self): logger.debug("%r: Updating ROAs", self) - try: - r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle) - except: - logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle) - return + while True: # Postponement loop - logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg) - - roas = {} - seen = set() - orphans = [] - updates = [] - self.publisher = rpki.rpkid.publication_queue(self.rpkid) # pylint: disable=W0201 - self.ca_details = set() # pylint: disable=W0201 - - for roa in self.tenant.roas.all(): - k = (roa.asn, str(roa.ipv4), str(roa.ipv6)) - if k not in roas: - roas[k] = roa - elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"): - orphans.append(roas[k]) - roas[k] = roa - else: - orphans.append(roa) + try: + r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle) + except: + logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle) + return - for r_pdu in r_msg: - k = (r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6")) - if k in seen: - logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu) - else: - seen.add(k) - roa = roas.pop(k, None) - if roa is None: - roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6")) - logger.debug("%r: Created new %r", self, roa) + logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg) + + roas = {} + seen = set() + orphans = [] + updates = [] + self.publisher = rpki.rpkid.publication_queue(self.rpkid) # pylint: disable=W0201 + self.ca_details = set() # pylint: disable=W0201 + + for roa in self.tenant.roas.all(): + k = (roa.asn, str(roa.ipv4), str(roa.ipv6)) + if k not in roas: + roas[k] = roa + elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"): + orphans.append(roas[k]) + roas[k] = roa else: - logger.debug("%r: Found existing %r", self, roa) - updates.append(roa) + orphans.append(roa) - r_msg = seen = None + for r_pdu in r_msg: + k = (r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6")) + if k in seen: + logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu) + else: + seen.add(k) + roa = roas.pop(k, None) + if roa is None: + roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6")) + logger.debug("%r: Created new %r", self, roa) + else: + logger.debug("%r: Found existing %r", self, roa) + updates.append(roa) - orphans.extend(roas.itervalues()) + r_msg = seen = None - roas = None + orphans.extend(roas.itervalues()) - while updates: - if self.overdue: - yield self.publish() - yield self.postpone() - roa = updates.pop(0) - try: - roa.update(publisher = self.publisher) - self.ca_details.add(roa.ca_detail) - except rpki.exceptions.NoCoveringCertForROA: - logger.warning("%r: No covering certificate for %r, skipping", self, roa) - except: - logger.exception("%r: Could not update %r, skipping", self, roa) + roas = None + + while updates and not self.overdue: + roa = updates.pop(0) + try: + roa.update(publisher = self.publisher) + self.ca_details.add(roa.ca_detail) + except rpki.exceptions.NoCoveringCertForROA: + logger.warning("%r: No covering certificate for %r, skipping", self, roa) + except: + logger.exception("%r: Could not update %r, skipping", self, roa) + + if not self.overdue: + break + + roas = seen = orphans = updates = None + yield self.publish() + yield self.postpone() for roa in orphans: try: |