diff options
-rw-r--r-- | rpkid/rpki/left_right.py | 20 | ||||
-rw-r--r-- | rpkid/rpki/rpki_engine.py | 11 |
2 files changed, 30 insertions, 1 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index cce982b4..ac0e22e1 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -277,18 +277,22 @@ class self_elt(data_elt): """ def one(): + self.gctx.checkpoint() rpki.log.debug("Self %s[%d] polling parents" % (self.self_handle, self.self_id)) self.client_poll(two) def two(): + self.gctx.checkpoint() rpki.log.debug("Self %s[%d] updating children" % (self.self_handle, self.self_id)) self.update_children(three) def three(): + self.gctx.checkpoint() rpki.log.debug("Self %s[%d] updating ROAs" % (self.self_handle, self.self_id)) self.update_roas(four) def four(): + self.gctx.checkpoint() rpki.log.debug("Self %s[%d] regenerating CRLs and manifests" % (self.self_handle, self.self_id)) self.regenerate_crls_and_manifests(cb) @@ -307,6 +311,7 @@ class self_elt(data_elt): def got_list(r_msg): ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas()) + self.gctx.checkpoint() def class_loop(class_iterator, rc): @@ -320,6 +325,7 @@ class self_elt(data_elt): rpki.log.warn("Couldn't create class, skipping: %s" % e) class_iterator() + self.gctx.checkpoint() if rc.class_name in ca_map: ca = ca_map[rc.class_name] del ca_map[rc.class_name] @@ -330,9 +336,11 @@ class self_elt(data_elt): def class_done(): def ca_loop(iterator, ca): + self.gctx.checkpoint() ca.delete(parent, iterator) def ca_done(): + self.gctx.checkpoint() self.gctx.sql.sweep() parent_iterator() @@ -392,10 +400,13 @@ class self_elt(data_elt): except (SystemExit, rpki.async.ExitNow): raise except Exception, e: + self.gctx.checkpoint() lose(e) else: + self.gctx.checkpoint() iterator() + self.gctx.checkpoint() child_certs = child.child_certs() if child_certs: self.gctx.irdb_query_child_resources(child.self().self_handle, child.child_handle, got_resources, lose) @@ -406,7 +417,9 @@ class self_elt(data_elt): def lose(e): rpki.log.traceback() rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e)) + self.gctx.checkpoint() cb() + self.gctx.checkpoint() publisher.call_pubd(cb, lose) rpki.async.iterator(self.children(), loop, done) @@ -447,8 +460,10 @@ class self_elt(data_elt): def lose(e): rpki.log.traceback() rpki.log.warn("Couldn't publish updated CRLs and manifests for self %r, skipping: %s" % (self.self_handle, e)) + self.gctx.checkpoint() cb() + self.gctx.checkpoint() publisher.call_pubd(cb, lose) @@ -459,6 +474,8 @@ class self_elt(data_elt): def got_roa_requests(roa_requests): + self.gctx.checkpoint() + roas = dict(((r.asn, str(r.ipv4), str(r.ipv6)), r) for r in self.roas()) publisher = rpki.rpki_engine.publication_queue() @@ -492,8 +509,10 @@ class self_elt(data_elt): def publication_failed(e): rpki.log.traceback() rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e)) + self.gctx.checkpoint() cb() + self.gctx.checkpoint() publisher.call_pubd(cb, publication_failed) def roa_requests_failed(e): @@ -501,6 +520,7 @@ class self_elt(data_elt): rpki.log.warn("Could not fetch ROA requests for %s, skipping: %s" % (self.self_handle, e)) cb() + self.gctx.checkpoint() self.gctx.irdb_query_roa_requests(self.self_handle, got_roa_requests, roa_requests_failed) class bsc_elt(data_elt): diff --git a/rpkid/rpki/rpki_engine.py b/rpkid/rpki/rpki_engine.py index b81540d9..147ca8cc 100644 --- a/rpkid/rpki/rpki_engine.py +++ b/rpkid/rpki/rpki_engine.py @@ -197,6 +197,14 @@ class rpkid_context(object): rpki.log.traceback() cb(400, "Could not process PDU: %s" % data) + def checkpoint(self): + """ + Record that we were still alive when we got here, by resetting + keepalive timer. + """ + self.cron_timeout = rpki.sundial.now() + self.cron_keepalive + rpki.log.debug("Checkpoint: keepalive timer reset to %s" % self.cron_timeout) + def cron(self, cb = None): """ Periodic tasks. @@ -223,9 +231,10 @@ class rpkid_context(object): rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout) return - self.cron_timeout = now + self.cron_keepalive + self.checkpoint() def loop(iterator, s): + self.checkpoint() s.cron(iterator) def done(): |