diff options
author | Rob Austein <sra@hactrn.net> | 2012-08-19 01:09:29 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2012-08-19 01:09:29 +0000 |
commit | 26c65b2bdfb408a1bdeb3557f6e460d78813f1d6 (patch) | |
tree | 743135e60ca412d205aa0952d835a48edfd04f02 /rpkid/rpki/rpkid.py | |
parent | ccc2eb0174eb708888249966e7c10306315fb1a5 (diff) |
Refactor rpkid high-level task system to use classes rather than
closures, to make it easier for long-running tasks to yield the CPU
periodically. As a side effect, this moves a lot of dense code out of
rpki.left_right.self_elt methods and into separate task-specific
classes. See #275.
svn path=/branches/tk274/; revision=4640
Diffstat (limited to 'rpkid/rpki/rpkid.py')
-rw-r--r-- | rpkid/rpki/rpkid.py | 83 |
1 files changed, 20 insertions, 63 deletions
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py index 80b0b7cb..962bedaa 100644 --- a/rpkid/rpki/rpkid.py +++ b/rpkid/rpki/rpkid.py @@ -57,6 +57,7 @@ import rpki.relaxng import rpki.log import rpki.async import rpki.daemonize +import rpki.rpkid_tasks class main(object): """ @@ -323,19 +324,18 @@ class main(object): if force or self.cron_timeout is not None: self.cron_timeout = rpki.sundial.now() + self.cron_keepalive - def task_add(self, handler, cb = None, description = None): + def task_add(self, task): """ Add a task to the scheduler task queue, unless it's already queued. """ - t = task(self, handler, cb, description) - rpki.log.debug("New task %r" % t) + rpki.log.debug("New task %r" % task) rpki.log.debug("Task queue %r" % self.task_queue) - if t not in self.task_queue: - rpki.log.debug("Adding %r to task queue" % t) - self.task_queue.append(t) + if task not in self.task_queue: + rpki.log.debug("Adding %r to task queue" % task) + self.task_queue.append(task) return True else: - rpki.log.debug("Task %r was already in the task queue" % t) + rpki.log.debug("Task %r was already in the task queue" % task) return False def task_next(self): @@ -343,20 +343,14 @@ class main(object): Pull next task from the task queue and put it the deferred event queue (we don't want to run it directly, as that could eventually blow out our call stack). - - Not yet sure what to do here if task queue is empty. For now we - just return, on the theory that we've nothing left to do until the - next cron timer fires. This may be a bad assumption, revisit if - the rest of the code doesn't fit well with this assumption. """ - rpki.log.debug("Looking for next task") try: self.task_current = self.task_queue.pop(0) except IndexError: self.task_current = None else: rpki.log.debug("Pulled %r from task queue" % self.task_current) - rpki.async.defer(self.task_current.run) + rpki.async.defer(self.task_current) def task_run(self): """ @@ -365,15 +359,6 @@ class main(object): if self.task_current is None: self.task_next() - def cron_done(self, cb): - """ - Completion handler for timer-driven cron. - """ - self.sql.sweep() - self.cron_timeout = None - rpki.log.info("Finished cron run") - cb() - def cron(self, cb = None): """ Periodic tasks. @@ -385,9 +370,17 @@ class main(object): rpki.log.debug("Starting cron run") + def done(): + self.sql.sweep() + self.cron_timeout = None + rpki.log.info("Finished cron run started at %s" % now) + if cb is not None: + cb() + + completion = rpki.rpkid_tasks.CompletionHandler(done) for s in rpki.left_right.self_elt.sql_fetch_all(self): - self.task_add(s.cron) - self.task_add(self.cron_done, cb) + s.schedule_cron_tasks(completion) + nothing_queued = completion.count == 0 assert self.use_internal_cron or self.cron_timeout is None @@ -407,6 +400,8 @@ class main(object): elif self.use_internal_cron: rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout) + if nothing_queued: + done() def cronjob_handler(self, query, path, cb): """ @@ -424,44 +419,6 @@ class main(object): rpki.log.debug("Starting externally triggered cron") self.cron(done) -class task(object): - """ - Scheduler task object. - """ - - def __init__(self, gctx, handler, cb = None, description = None): - self.gctx = gctx - self.handler = handler - self.cb = cb - self.description = description - - def __cmp__(self, other): - return cmp(self.handler, other.handler) - - def __hash__(self): - return self.handler.__hash__() - - def __repr__(self): - if self.description is None: - return repr(self.handler) - else: - return "<%r: %s>" % (self.handler, self.description) - - def done(self): - """ - Completion handler for task. - """ - if self.cb is not None: - self.cb() - self.gctx.task_next() - - def run(self): - """ - Run this task when called via the deferred event system. - """ - rpki.log.debug("Running task %r" % self) - self.handler(self.done) - class ca_obj(rpki.sql.sql_persistent): """ Internal CA object. |