aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rpkid/rpki/async.py8
-rw-r--r--rpkid/rpki/left_right.py64
-rw-r--r--rpkid/rpki/rpkic.py14
-rw-r--r--rpkid/rpki/rpkid.py8
4 files changed, 57 insertions, 37 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py
index a7de6b25..6e9d1edf 100644
--- a/rpkid/rpki/async.py
+++ b/rpkid/rpki/async.py
@@ -385,6 +385,14 @@ def exit_event_loop():
"""
raise ExitNow
+def event_yield(handler, delay = rpki.sundial.timedelta(seconds = 2)):
+ """
+ Use a near-term timer to schedule an event after letting the timer
+ and I/O systems run.
+ """
+ t = timer(handler)
+ t.set(delay)
+
class gc_summary(object):
"""
Periodic summary of GC state, for tracking down memory bloat.
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py
index 87256a6e..b74b12b5 100644
--- a/rpkid/rpki/left_right.py
+++ b/rpkid/rpki/left_right.py
@@ -48,7 +48,7 @@ enforce_strict_up_down_xml_sender = False
# the goal is to avoid going totally compute bound when somebody
# throws 50,000 new ROA requests at us in a single batch.
-max_new_roas_at_once = 500
+max_new_roas_at_once = 50
class left_right_namespace(object):
"""
@@ -683,40 +683,45 @@ class self_elt(data_elt):
publisher = rpki.rpkid.publication_queue()
ca_details = set()
-
- stopped_early = False
seen = set()
- for roa_request in roa_requests:
+
+ def loop(iterator, roa_request):
self.gctx.checkpoint()
try:
k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
if k in seen:
rpki.log.warn("Skipping duplicate ROA request %r" % roa_request)
- continue
- seen.add(k)
- roa = roas.pop(k, None)
- if roa is None:
- roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
- rpki.log.debug("Couldn't find existing ROA, created %r" % roa)
- if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once:
- rpki.log.warn("Too many new ROAs in a single batch, deferring processing some of them")
- stopped_early = True
- break
else:
- rpki.log.debug("Found existing %r" % roa)
- roa.update(publisher = publisher, fast = True)
- ca_details.add(roa.ca_detail)
+ seen.add(k)
+ roa = roas.pop(k, None)
+ if roa is None:
+ roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
+ rpki.log.debug("Couldn't find existing ROA, created %r" % roa)
+ else:
+ rpki.log.debug("Found existing %r" % roa)
+ roa.update(publisher = publisher, fast = True)
+ ca_details.add(roa.ca_detail)
except (SystemExit, rpki.async.ExitNow):
raise
except Exception, e:
if not isinstance(e, rpki.exceptions.NoCoveringCertForROA):
rpki.log.traceback()
rpki.log.warn("Could not update %r, skipping: %s" % (roa, e))
+ if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once:
+ self.gctx.sql.sweep()
+ self.gctx.checkpoint()
+ publisher.call_pubd(iterator, publication_failed)
+ else:
+ iterator()
+
+ def publication_failed(e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ cb()
- # Don't flush old ROAs when we didn't finish creating new ones.
- # We may regret this, but our options in this case are limited.
+ def done():
- if not stopped_early:
orphans.extend(roas.itervalues())
for roa in orphans:
try:
@@ -728,22 +733,17 @@ class self_elt(data_elt):
rpki.log.traceback()
rpki.log.warn("Could not revoke %r: %s" % (roa, e))
- self.gctx.sql.sweep()
-
- for ca_detail in ca_details:
- ca_detail.generate_crl(publisher = publisher)
- ca_detail.generate_manifest(publisher = publisher)
+ self.gctx.sql.sweep()
- self.gctx.sql.sweep()
+ for ca_detail in ca_details:
+ ca_detail.generate_crl(publisher = publisher)
+ ca_detail.generate_manifest(publisher = publisher)
- def publication_failed(e):
- rpki.log.traceback()
- rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.sql.sweep()
self.gctx.checkpoint()
- cb()
+ publisher.call_pubd(cb, publication_failed)
- self.gctx.checkpoint()
- publisher.call_pubd(cb, publication_failed)
+ rpki.async.iterator(roa_requests, loop, done)
def roa_requests_failed(e):
rpki.log.traceback()
diff --git a/rpkid/rpki/rpkic.py b/rpkid/rpki/rpkic.py
index 3ea44689..0559b13e 100644
--- a/rpkid/rpki/rpkic.py
+++ b/rpkid/rpki/rpkic.py
@@ -83,11 +83,14 @@ class main(rpki.cli.Cmd):
cfg_file = None
handle = None
+ self.autosynch = True
- opts, argv = getopt.getopt(sys.argv[1:], "c:hi:?", ["config=", "help", "identity="])
+ opts, argv = getopt.getopt(sys.argv[1:], "c:hi:?", ["config=", "dont_autosynch", "help", "identity="])
for o, a in opts:
if o in ("-c", "--config"):
cfg_file = a
+ elif o == "--dont_autosynch":
+ self.autosynch = False
elif o in ("-h", "--help", "-?"):
argv = ["help"]
elif o in ("-i", "--identity"):
@@ -460,7 +463,8 @@ class main(rpki.cli.Cmd):
raise BadCommandSyntax("Need to specify prefixes.csv filename")
self.zoo.load_prefixes(argv[0], True)
- self.zoo.synchronize(self.zoo.handle)
+ if self.autosynch:
+ self.zoo.synchronize(self.zoo.handle)
def do_show_child_resources(self, arg):
@@ -494,7 +498,8 @@ class main(rpki.cli.Cmd):
raise BadCommandSyntax("Need to specify asns.csv filename")
self.zoo.load_asns(argv[0], True)
- self.zoo.synchronize(self.zoo.handle)
+ if self.autosynch:
+ self.zoo.synchronize(self.zoo.handle)
def do_load_roa_requests(self, arg):
@@ -508,7 +513,8 @@ class main(rpki.cli.Cmd):
raise BadCommandSyntax("Need to specify roa.csv filename")
self.zoo.load_roa_requests(argv[0])
- self.zoo.synchronize(self.zoo.handle)
+ if self.autosynch:
+ self.zoo.synchronize(self.zoo.handle)
def do_synchronize(self, arg):
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py
index b923b1a3..f3fc38fa 100644
--- a/rpkid/rpki/rpkid.py
+++ b/rpkid/rpki/rpkid.py
@@ -1846,6 +1846,9 @@ class publication_queue(object):
replace = True
def __init__(self):
+ self.clear()
+
+ def clear(self):
self.repositories = {}
self.msgs = {}
self.handlers = {}
@@ -1877,7 +1880,10 @@ class publication_queue(object):
def call_pubd(self, cb, eb):
def loop(iterator, rid):
self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers)
- rpki.async.iterator(self.repositories, loop, cb)
+ def done():
+ self.clear()
+ cb()
+ rpki.async.iterator(self.repositories, loop, done)
@property
def size(self):