aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/rpkid.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2012-08-19 01:09:29 +0000
committerRob Austein <sra@hactrn.net>2012-08-19 01:09:29 +0000
commit26c65b2bdfb408a1bdeb3557f6e460d78813f1d6 (patch)
tree743135e60ca412d205aa0952d835a48edfd04f02 /rpkid/rpki/rpkid.py
parentccc2eb0174eb708888249966e7c10306315fb1a5 (diff)
Refactor rpkid high-level task system to use classes rather than
closures, to make it easier for long-running tasks to yield the CPU periodically. As a side effect, this moves a lot of dense code out of rpki.left_right.self_elt methods and into separate task-specific classes. See #275. svn path=/branches/tk274/; revision=4640
Diffstat (limited to 'rpkid/rpki/rpkid.py')
-rw-r--r--rpkid/rpki/rpkid.py83
1 files changed, 20 insertions, 63 deletions
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py
index 80b0b7cb..962bedaa 100644
--- a/rpkid/rpki/rpkid.py
+++ b/rpkid/rpki/rpkid.py
@@ -57,6 +57,7 @@ import rpki.relaxng
import rpki.log
import rpki.async
import rpki.daemonize
+import rpki.rpkid_tasks
class main(object):
"""
@@ -323,19 +324,18 @@ class main(object):
if force or self.cron_timeout is not None:
self.cron_timeout = rpki.sundial.now() + self.cron_keepalive
- def task_add(self, handler, cb = None, description = None):
+ def task_add(self, task):
"""
Add a task to the scheduler task queue, unless it's already queued.
"""
- t = task(self, handler, cb, description)
- rpki.log.debug("New task %r" % t)
+ rpki.log.debug("New task %r" % task)
rpki.log.debug("Task queue %r" % self.task_queue)
- if t not in self.task_queue:
- rpki.log.debug("Adding %r to task queue" % t)
- self.task_queue.append(t)
+ if task not in self.task_queue:
+ rpki.log.debug("Adding %r to task queue" % task)
+ self.task_queue.append(task)
return True
else:
- rpki.log.debug("Task %r was already in the task queue" % t)
+ rpki.log.debug("Task %r was already in the task queue" % task)
return False
def task_next(self):
@@ -343,20 +343,14 @@ class main(object):
Pull next task from the task queue and put it the deferred event
queue (we don't want to run it directly, as that could eventually
blow out our call stack).
-
- Not yet sure what to do here if task queue is empty. For now we
- just return, on the theory that we've nothing left to do until the
- next cron timer fires. This may be a bad assumption, revisit if
- the rest of the code doesn't fit well with this assumption.
"""
- rpki.log.debug("Looking for next task")
try:
self.task_current = self.task_queue.pop(0)
except IndexError:
self.task_current = None
else:
rpki.log.debug("Pulled %r from task queue" % self.task_current)
- rpki.async.defer(self.task_current.run)
+ rpki.async.defer(self.task_current)
def task_run(self):
"""
@@ -365,15 +359,6 @@ class main(object):
if self.task_current is None:
self.task_next()
- def cron_done(self, cb):
- """
- Completion handler for timer-driven cron.
- """
- self.sql.sweep()
- self.cron_timeout = None
- rpki.log.info("Finished cron run")
- cb()
-
def cron(self, cb = None):
"""
Periodic tasks.
@@ -385,9 +370,17 @@ class main(object):
rpki.log.debug("Starting cron run")
+ def done():
+ self.sql.sweep()
+ self.cron_timeout = None
+ rpki.log.info("Finished cron run started at %s" % now)
+ if cb is not None:
+ cb()
+
+ completion = rpki.rpkid_tasks.CompletionHandler(done)
for s in rpki.left_right.self_elt.sql_fetch_all(self):
- self.task_add(s.cron)
- self.task_add(self.cron_done, cb)
+ s.schedule_cron_tasks(completion)
+ nothing_queued = completion.count == 0
assert self.use_internal_cron or self.cron_timeout is None
@@ -407,6 +400,8 @@ class main(object):
elif self.use_internal_cron:
rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout)
+ if nothing_queued:
+ done()
def cronjob_handler(self, query, path, cb):
"""
@@ -424,44 +419,6 @@ class main(object):
rpki.log.debug("Starting externally triggered cron")
self.cron(done)
-class task(object):
- """
- Scheduler task object.
- """
-
- def __init__(self, gctx, handler, cb = None, description = None):
- self.gctx = gctx
- self.handler = handler
- self.cb = cb
- self.description = description
-
- def __cmp__(self, other):
- return cmp(self.handler, other.handler)
-
- def __hash__(self):
- return self.handler.__hash__()
-
- def __repr__(self):
- if self.description is None:
- return repr(self.handler)
- else:
- return "<%r: %s>" % (self.handler, self.description)
-
- def done(self):
- """
- Completion handler for task.
- """
- if self.cb is not None:
- self.cb()
- self.gctx.task_next()
-
- def run(self):
- """
- Run this task when called via the deferred event system.
- """
- rpki.log.debug("Running task %r" % self)
- self.handler(self.done)
-
class ca_obj(rpki.sql.sql_persistent):
"""
Internal CA object.