diff options
-rw-r--r-- | rpkid/rpki/async.py | 8 | ||||
-rw-r--r-- | rpkid/rpki/left_right.py | 64 | ||||
-rw-r--r-- | rpkid/rpki/rpkic.py | 14 | ||||
-rw-r--r-- | rpkid/rpki/rpkid.py | 8 |
4 files changed, 57 insertions, 37 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py index a7de6b25..6e9d1edf 100644 --- a/rpkid/rpki/async.py +++ b/rpkid/rpki/async.py @@ -385,6 +385,14 @@ def exit_event_loop(): """ raise ExitNow +def event_yield(handler, delay = rpki.sundial.timedelta(seconds = 2)): + """ + Use a near-term timer to schedule an event after letting the timer + and I/O systems run. + """ + t = timer(handler) + t.set(delay) + class gc_summary(object): """ Periodic summary of GC state, for tracking down memory bloat. diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index 87256a6e..b74b12b5 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -48,7 +48,7 @@ enforce_strict_up_down_xml_sender = False # the goal is to avoid going totally compute bound when somebody # throws 50,000 new ROA requests at us in a single batch. -max_new_roas_at_once = 500 +max_new_roas_at_once = 50 class left_right_namespace(object): """ @@ -683,40 +683,45 @@ class self_elt(data_elt): publisher = rpki.rpkid.publication_queue() ca_details = set() - - stopped_early = False seen = set() - for roa_request in roa_requests: + + def loop(iterator, roa_request): self.gctx.checkpoint() try: k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6)) if k in seen: rpki.log.warn("Skipping duplicate ROA request %r" % roa_request) - continue - seen.add(k) - roa = roas.pop(k, None) - if roa is None: - roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6) - rpki.log.debug("Couldn't find existing ROA, created %r" % roa) - if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once: - rpki.log.warn("Too many new ROAs in a single batch, deferring processing some of them") - stopped_early = True - break else: - rpki.log.debug("Found existing %r" % roa) - roa.update(publisher = publisher, fast = True) - ca_details.add(roa.ca_detail) + seen.add(k) + roa = roas.pop(k, None) + if roa is None: + roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6) + rpki.log.debug("Couldn't find existing ROA, created %r" % roa) + else: + rpki.log.debug("Found existing %r" % roa) + roa.update(publisher = publisher, fast = True) + ca_details.add(roa.ca_detail) except (SystemExit, rpki.async.ExitNow): raise except Exception, e: if not isinstance(e, rpki.exceptions.NoCoveringCertForROA): rpki.log.traceback() rpki.log.warn("Could not update %r, skipping: %s" % (roa, e)) + if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once: + self.gctx.sql.sweep() + self.gctx.checkpoint() + publisher.call_pubd(iterator, publication_failed) + else: + iterator() + + def publication_failed(e): + rpki.log.traceback() + rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e)) + self.gctx.checkpoint() + cb() - # Don't flush old ROAs when we didn't finish creating new ones. - # We may regret this, but our options in this case are limited. + def done(): - if not stopped_early: orphans.extend(roas.itervalues()) for roa in orphans: try: @@ -728,22 +733,17 @@ class self_elt(data_elt): rpki.log.traceback() rpki.log.warn("Could not revoke %r: %s" % (roa, e)) - self.gctx.sql.sweep() - - for ca_detail in ca_details: - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) + self.gctx.sql.sweep() - self.gctx.sql.sweep() + for ca_detail in ca_details: + ca_detail.generate_crl(publisher = publisher) + ca_detail.generate_manifest(publisher = publisher) - def publication_failed(e): - rpki.log.traceback() - rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e)) + self.gctx.sql.sweep() self.gctx.checkpoint() - cb() + publisher.call_pubd(cb, publication_failed) - self.gctx.checkpoint() - publisher.call_pubd(cb, publication_failed) + rpki.async.iterator(roa_requests, loop, done) def roa_requests_failed(e): rpki.log.traceback() diff --git a/rpkid/rpki/rpkic.py b/rpkid/rpki/rpkic.py index 3ea44689..0559b13e 100644 --- a/rpkid/rpki/rpkic.py +++ b/rpkid/rpki/rpkic.py @@ -83,11 +83,14 @@ class main(rpki.cli.Cmd): cfg_file = None handle = None + self.autosynch = True - opts, argv = getopt.getopt(sys.argv[1:], "c:hi:?", ["config=", "help", "identity="]) + opts, argv = getopt.getopt(sys.argv[1:], "c:hi:?", ["config=", "dont_autosynch", "help", "identity="]) for o, a in opts: if o in ("-c", "--config"): cfg_file = a + elif o == "--dont_autosynch": + self.autosynch = False elif o in ("-h", "--help", "-?"): argv = ["help"] elif o in ("-i", "--identity"): @@ -460,7 +463,8 @@ class main(rpki.cli.Cmd): raise BadCommandSyntax("Need to specify prefixes.csv filename") self.zoo.load_prefixes(argv[0], True) - self.zoo.synchronize(self.zoo.handle) + if self.autosynch: + self.zoo.synchronize(self.zoo.handle) def do_show_child_resources(self, arg): @@ -494,7 +498,8 @@ class main(rpki.cli.Cmd): raise BadCommandSyntax("Need to specify asns.csv filename") self.zoo.load_asns(argv[0], True) - self.zoo.synchronize(self.zoo.handle) + if self.autosynch: + self.zoo.synchronize(self.zoo.handle) def do_load_roa_requests(self, arg): @@ -508,7 +513,8 @@ class main(rpki.cli.Cmd): raise BadCommandSyntax("Need to specify roa.csv filename") self.zoo.load_roa_requests(argv[0]) - self.zoo.synchronize(self.zoo.handle) + if self.autosynch: + self.zoo.synchronize(self.zoo.handle) def do_synchronize(self, arg): diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py index b923b1a3..f3fc38fa 100644 --- a/rpkid/rpki/rpkid.py +++ b/rpkid/rpki/rpkid.py @@ -1846,6 +1846,9 @@ class publication_queue(object): replace = True def __init__(self): + self.clear() + + def clear(self): self.repositories = {} self.msgs = {} self.handlers = {} @@ -1877,7 +1880,10 @@ class publication_queue(object): def call_pubd(self, cb, eb): def loop(iterator, rid): self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers) - rpki.async.iterator(self.repositories, loop, cb) + def done(): + self.clear() + cb() + rpki.async.iterator(self.repositories, loop, done) @property def size(self): |