diff options
author | Rob Austein <sra@hactrn.net> | 2012-08-19 01:09:29 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2012-08-19 01:09:29 +0000 |
commit | 26c65b2bdfb408a1bdeb3557f6e460d78813f1d6 (patch) | |
tree | 743135e60ca412d205aa0952d835a48edfd04f02 /rpkid/rpki/rpkid_tasks.py | |
parent | ccc2eb0174eb708888249966e7c10306315fb1a5 (diff) |
Refactor rpkid high-level task system to use classes rather than
closures, to make it easier for long-running tasks to yield the CPU
periodically. As a side effect, this moves a lot of dense code out of
rpki.left_right.self_elt methods and into separate task-specific
classes. See #275.
svn path=/branches/tk274/; revision=4640
Diffstat (limited to 'rpkid/rpki/rpkid_tasks.py')
-rw-r--r-- | rpkid/rpki/rpkid_tasks.py | 84 |
1 files changed, 68 insertions, 16 deletions
diff --git a/rpkid/rpki/rpkid_tasks.py b/rpkid/rpki/rpkid_tasks.py index c1f2cd7b..62d0f474 100644 --- a/rpkid/rpki/rpkid_tasks.py +++ b/rpkid/rpki/rpkid_tasks.py @@ -19,6 +19,23 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ +import rpki.log +import rpki.rpkid +import rpki.async +import rpki.up_down +import rpki.sundial +import rpki.publication +import rpki.exceptions + +## @var max_new_roas_at_once +# Upper limit on the number of ROAs we'll create in a single +# self_elt.update_roas() call. This is a bit of a kludge, and may be +# replaced with something more clever or general later; for the moment +# the goal is to avoid going totally compute bound when somebody +# throws 50,000 new ROA requests at us in a single batch. + +max_new_roas_at_once = 200 + class CompletionHandler(object): """ Track one or more scheduled rpkid tasks and execute a callback when @@ -30,17 +47,26 @@ class CompletionHandler(object): self.tasks = set() def register(self, task): + rpki.log.debug("Completion handler %r registering task %r" % (self, task)) self.tasks.add(task) - task.register_completion(self.complete) + task.register_completion(self.done) - def complete(self, task): + def done(self, task): try: self.tasks.remove(task) except KeyError: rpki.log.warn("Completion handler %r called with unregistered task %r, blundering onwards" % (self, task)) + else: + rpki.log.debug("Completion handler %r called with registered task %r" % (self, task)) if not self.tasks: + rpki.log.debug("Completion handler %r finished, calling %r" % (self, self.cb)) self.cb() + @property + def count(self): + return len(self.tasks) + + class AbstractTask(object): """ Abstract base class for rpkid scheduler task objects. This just @@ -57,6 +83,7 @@ class AbstractTask(object): self.description = description self.completions = [] self.continuation = None + self.clear() def __repr__(self): return rpki.log.log_repr(self, self.description) @@ -65,8 +92,9 @@ class AbstractTask(object): self.completions.append(completion) def exit(self): - for completion in self.completions: - completion() + while self.completions: + self.completions.pop(0)(self) + self.clear() self.self.gctx.task_next() def postpone(self, continuation): @@ -77,6 +105,7 @@ class AbstractTask(object): def __call__(self): if self.continuation is None: rpki.log.debug("Running task %r" % self) + self.clear() self.start() else: rpki.log.debug("Restarting task %r at " % (self, self.continuation)) @@ -90,6 +119,9 @@ class AbstractTask(object): def start(self): raise NotImplementedError + def clear(self): + pass + class PollParentTask(AbstractTask): """ @@ -97,6 +129,12 @@ class PollParentTask(AbstractTask): parents, in turn. """ + def clear(self): + self.parent_iterator = None + self.parent = None + self.ca_map = None + self.class_iterator = None + def start(self): rpki.log.trace() self.gctx.checkpoint() @@ -144,7 +182,7 @@ class PollParentTask(AbstractTask): def ca_loop(self, iterator, ca): self.gctx.checkpoint() - ca.delete(parent, iterator) + ca.delete(self.parent, iterator) def ca_done(self): self.gctx.checkpoint() @@ -159,13 +197,20 @@ class UpdateChildrenTask(AbstractTask): resources and in expiration date. """ + def clear(self): + self.now = None + self.rsn = None + self.publisher = None + self.iterator = None + self.child = None + self.child_certs = None + def start(self): rpki.log.trace() self.gctx.checkpoint() rpki.log.debug("Self %s[%d] updating children" % (self.self_handle, self.self_id)) - self.now = rpki.sundial.now() - self.rsn = now + rpki.sundial.timedelta(seconds = self.regen_margin) + self.rsn = self.now + rpki.sundial.timedelta(seconds = self.regen_margin) self.publisher = rpki.rpkid.publication_queue() rpki.async.iterator(self.children, self.loop, self.done) @@ -201,7 +246,7 @@ class UpdateChildrenTask(AbstractTask): ca_detail.generate_manifest(publisher = self.publisher) elif old_resources != new_resources or (old_resources.valid_until < self.rsn and irdb_resources.valid_until > self.now): - rpki.log.debug("Need to reissue child %s certificate SKI %s" % (child.child_handle, child_cert.cert.gSKI())) + rpki.log.debug("Need to reissue child %s certificate SKI %s" % (self.child.child_handle, child_cert.cert.gSKI())) child_cert.reissue( ca_detail = ca_detail, resources = new_resources, @@ -209,7 +254,7 @@ class UpdateChildrenTask(AbstractTask): elif old_resources.valid_until < self.now: rpki.log.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s" - % (child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until)) + % (self.child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until)) child_cert.sql_delete() self.publisher.withdraw(cls = rpki.publication.certificate_elt, uri = child_cert.uri, obj = child_cert.cert, repository = ca.parent.repository) ca_detail.generate_manifest(publisher = self.publisher) @@ -218,16 +263,16 @@ class UpdateChildrenTask(AbstractTask): raise except Exception, e: self.gctx.checkpoint() - lose(e) + self.lose(e) else: self.gctx.checkpoint() self.gctx.sql.sweep() self.iterator() - def done(): + def done(self): self.gctx.checkpoint() self.gctx.sql.sweep() - publisher.call_pubd(self.exit, self.publication_failed) + self.publisher.call_pubd(self.exit, self.publication_failed) def publication_failed(self, e): rpki.log.traceback() @@ -241,6 +286,13 @@ class UpdateROAsTask(AbstractTask): Generate or update ROAs for this self. """ + def clear(self): + self.orphans = None + self.updates = None + self.publisher = None + self.ca_details = None + self.count = None + def start(self): rpki.log.trace() self.gctx.checkpoint() @@ -320,7 +372,7 @@ class UpdateROAsTask(AbstractTask): iterator() def publish(self, done): - if publisher.size > 0: + if self.publisher.size > 0: for ca_detail in self.ca_details: rpki.log.debug("Generating new CRL for %r" % ca_detail) ca_detail.generate_crl(publisher = self.publisher) @@ -341,10 +393,10 @@ class UpdateROAsTask(AbstractTask): self.exit() def done(self): - for roa in orphans: + for roa in self.orphans: try: - ca_details.add(roa.ca_detail) - roa.revoke(publisher = publisher, fast = True) + self.ca_details.add(roa.ca_detail) + roa.revoke(publisher = self.publisher, fast = True) except (SystemExit, rpki.async.ExitNow): raise except Exception, e: |