aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/left_right.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpkid/rpki/left_right.py')
-rw-r--r--rpkid/rpki/left_right.py64
1 files changed, 32 insertions, 32 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py
index 87256a6e..b74b12b5 100644
--- a/rpkid/rpki/left_right.py
+++ b/rpkid/rpki/left_right.py
@@ -48,7 +48,7 @@ enforce_strict_up_down_xml_sender = False
# the goal is to avoid going totally compute bound when somebody
# throws 50,000 new ROA requests at us in a single batch.
-max_new_roas_at_once = 500
+max_new_roas_at_once = 50
class left_right_namespace(object):
"""
@@ -683,40 +683,45 @@ class self_elt(data_elt):
publisher = rpki.rpkid.publication_queue()
ca_details = set()
-
- stopped_early = False
seen = set()
- for roa_request in roa_requests:
+
+ def loop(iterator, roa_request):
self.gctx.checkpoint()
try:
k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
if k in seen:
rpki.log.warn("Skipping duplicate ROA request %r" % roa_request)
- continue
- seen.add(k)
- roa = roas.pop(k, None)
- if roa is None:
- roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
- rpki.log.debug("Couldn't find existing ROA, created %r" % roa)
- if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once:
- rpki.log.warn("Too many new ROAs in a single batch, deferring processing some of them")
- stopped_early = True
- break
else:
- rpki.log.debug("Found existing %r" % roa)
- roa.update(publisher = publisher, fast = True)
- ca_details.add(roa.ca_detail)
+ seen.add(k)
+ roa = roas.pop(k, None)
+ if roa is None:
+ roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
+ rpki.log.debug("Couldn't find existing ROA, created %r" % roa)
+ else:
+ rpki.log.debug("Found existing %r" % roa)
+ roa.update(publisher = publisher, fast = True)
+ ca_details.add(roa.ca_detail)
except (SystemExit, rpki.async.ExitNow):
raise
except Exception, e:
if not isinstance(e, rpki.exceptions.NoCoveringCertForROA):
rpki.log.traceback()
rpki.log.warn("Could not update %r, skipping: %s" % (roa, e))
+ if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once:
+ self.gctx.sql.sweep()
+ self.gctx.checkpoint()
+ publisher.call_pubd(iterator, publication_failed)
+ else:
+ iterator()
+
+ def publication_failed(e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ cb()
- # Don't flush old ROAs when we didn't finish creating new ones.
- # We may regret this, but our options in this case are limited.
+ def done():
- if not stopped_early:
orphans.extend(roas.itervalues())
for roa in orphans:
try:
@@ -728,22 +733,17 @@ class self_elt(data_elt):
rpki.log.traceback()
rpki.log.warn("Could not revoke %r: %s" % (roa, e))
- self.gctx.sql.sweep()
-
- for ca_detail in ca_details:
- ca_detail.generate_crl(publisher = publisher)
- ca_detail.generate_manifest(publisher = publisher)
+ self.gctx.sql.sweep()
- self.gctx.sql.sweep()
+ for ca_detail in ca_details:
+ ca_detail.generate_crl(publisher = publisher)
+ ca_detail.generate_manifest(publisher = publisher)
- def publication_failed(e):
- rpki.log.traceback()
- rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.sql.sweep()
self.gctx.checkpoint()
- cb()
+ publisher.call_pubd(cb, publication_failed)
- self.gctx.checkpoint()
- publisher.call_pubd(cb, publication_failed)
+ rpki.async.iterator(roa_requests, loop, done)
def roa_requests_failed(e):
rpki.log.traceback()