diff options
Diffstat (limited to 'rpkid/rpki/left_right.py')
-rw-r--r-- | rpkid/rpki/left_right.py | 64 |
1 files changed, 32 insertions, 32 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index 87256a6e..b74b12b5 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -48,7 +48,7 @@ enforce_strict_up_down_xml_sender = False # the goal is to avoid going totally compute bound when somebody # throws 50,000 new ROA requests at us in a single batch. -max_new_roas_at_once = 500 +max_new_roas_at_once = 50 class left_right_namespace(object): """ @@ -683,40 +683,45 @@ class self_elt(data_elt): publisher = rpki.rpkid.publication_queue() ca_details = set() - - stopped_early = False seen = set() - for roa_request in roa_requests: + + def loop(iterator, roa_request): self.gctx.checkpoint() try: k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6)) if k in seen: rpki.log.warn("Skipping duplicate ROA request %r" % roa_request) - continue - seen.add(k) - roa = roas.pop(k, None) - if roa is None: - roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6) - rpki.log.debug("Couldn't find existing ROA, created %r" % roa) - if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once: - rpki.log.warn("Too many new ROAs in a single batch, deferring processing some of them") - stopped_early = True - break else: - rpki.log.debug("Found existing %r" % roa) - roa.update(publisher = publisher, fast = True) - ca_details.add(roa.ca_detail) + seen.add(k) + roa = roas.pop(k, None) + if roa is None: + roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6) + rpki.log.debug("Couldn't find existing ROA, created %r" % roa) + else: + rpki.log.debug("Found existing %r" % roa) + roa.update(publisher = publisher, fast = True) + ca_details.add(roa.ca_detail) except (SystemExit, rpki.async.ExitNow): raise except Exception, e: if not isinstance(e, rpki.exceptions.NoCoveringCertForROA): rpki.log.traceback() rpki.log.warn("Could not update %r, skipping: %s" % (roa, e)) + if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once: + self.gctx.sql.sweep() + self.gctx.checkpoint() + publisher.call_pubd(iterator, publication_failed) + else: + iterator() + + def publication_failed(e): + rpki.log.traceback() + rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e)) + self.gctx.checkpoint() + cb() - # Don't flush old ROAs when we didn't finish creating new ones. - # We may regret this, but our options in this case are limited. + def done(): - if not stopped_early: orphans.extend(roas.itervalues()) for roa in orphans: try: @@ -728,22 +733,17 @@ class self_elt(data_elt): rpki.log.traceback() rpki.log.warn("Could not revoke %r: %s" % (roa, e)) - self.gctx.sql.sweep() - - for ca_detail in ca_details: - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) + self.gctx.sql.sweep() - self.gctx.sql.sweep() + for ca_detail in ca_details: + ca_detail.generate_crl(publisher = publisher) + ca_detail.generate_manifest(publisher = publisher) - def publication_failed(e): - rpki.log.traceback() - rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e)) + self.gctx.sql.sweep() self.gctx.checkpoint() - cb() + publisher.call_pubd(cb, publication_failed) - self.gctx.checkpoint() - publisher.call_pubd(cb, publication_failed) + rpki.async.iterator(roa_requests, loop, done) def roa_requests_failed(e): rpki.log.traceback() |