aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/rpkid.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpkid/rpki/rpkid.py')
-rw-r--r--rpkid/rpki/rpkid.py149
1 files changed, 111 insertions, 38 deletions
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py
index f3fc38fa..93726512 100644
--- a/rpkid/rpki/rpkid.py
+++ b/rpkid/rpki/rpkid.py
@@ -73,6 +73,8 @@ class main(object):
self.foreground = False
self.irdbd_cms_timestamp = None
self.irbe_cms_timestamp = None
+ self.task_current = None
+ self.task_queue = []
opts, argv = getopt.getopt(sys.argv[1:], "c:dfhp:?",
["config=", "debug", "foreground", "help", "profile="])
@@ -323,60 +325,91 @@ class main(object):
if force or self.cron_timeout is not None:
self.cron_timeout = rpki.sundial.now() + self.cron_keepalive
+ def task_add(self, handler, cb = None, description = None):
+ """
+ Add a task to the scheduler task queue, unless it's already queued.
+ """
+ t = task(self, handler, cb, description)
+ rpki.log.debug("New task %r" % t)
+ rpki.log.debug("Task queue %r" % self.task_queue)
+ if t not in self.task_queue:
+ rpki.log.debug("Adding %r to task queue" % t)
+ self.task_queue.append(t)
+ return True
+ else:
+ rpki.log.debug("Task %r was already in the task queue" % t)
+ return False
+
+ def task_next(self):
+ """
+ Pull next task from the task queue and put it the deferred event
+ queue (we don't want to run it directly, as that could eventually
+ blow out our call stack).
+
+ Not yet sure what to do here if task queue is empty. For now we
+ just return, on the theory that we've nothing left to do until the
+ next cron timer fires. This may be a bad assumption, revisit if
+ the rest of the code doesn't fit well with this assumption.
+ """
+ rpki.log.debug("Looking for next task")
+ try:
+ self.task_current = self.task_queue.pop(0)
+ except IndexError:
+ self.task_current = None
+ else:
+ rpki.log.debug("Pulled %r from task queue" % self.task_current)
+ rpki.async.defer(self.task_current.run)
+
+ def task_run(self):
+ """
+ Run first task on the task queue, unless one is running already.
+ """
+ if self.task_current is None:
+ self.task_next()
+
+ def cron_done(self, cb):
+ """
+ Completion handler for timer-driven cron.
+ """
+ self.sql.sweep()
+ self.cron_timeout = None
+ rpki.log.info("Finished cron run")
+ cb()
+
def cron(self, cb = None):
"""
Periodic tasks.
"""
rpki.log.trace()
+ self.sql.ping()
- def loop(iterator, s):
- self.checkpoint()
- s.cron(iterator)
-
- def done():
- self.sql.sweep()
- self.cron_timeout = None
- rpki.log.info("Finished cron run started at %s" % now)
- if not self.use_internal_cron:
- cb()
-
- def lose(e):
- self.cron_timeout = None
- if self.use_internal_cron:
- rpki.log.traceback()
- else:
- raise
-
- try:
- now = rpki.sundial.now()
+ now = rpki.sundial.now()
- assert self.use_internal_cron or self.cron_timeout is None
+ rpki.log.debug("Starting cron run")
- if self.use_internal_cron:
+ for s in rpki.left_right.self_elt.sql_fetch_all(self):
+ self.task_add(s.cron)
+ self.task_add(self.cron_done, cb)
- if self.cron_timeout is not None and self.cron_timeout < now:
- rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout)
- self.cron_timeout = None
+ assert self.use_internal_cron or self.cron_timeout is None
- when = now + self.cron_period
- rpki.log.debug("Scheduling next cron run at %s" % when)
- self.cron_timer.set(when)
+ if self.cron_timeout is not None and self.cron_timeout < now:
+ rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout)
+ self.cron_timeout = None
- if self.cron_timeout is not None:
- rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout)
- return
+ if self.use_internal_cron:
+ when = now + self.cron_period
+ rpki.log.debug("Scheduling next cron run at %s" % when)
+ self.cron_timer.set(when)
- self.sql.ping()
+ if self.cron_timeout is None:
self.checkpoint(self.use_internal_cron)
- rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self), loop, done)
+ self.task_run()
- except (rpki.async.ExitNow, SystemExit):
- self.cron_timeout = None
- raise
+ elif self.use_internal_cron:
+ rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout)
- except Exception, e:
- lose(e)
def cronjob_handler(self, query, path, cb):
"""
@@ -391,8 +424,47 @@ class main(object):
if self.use_internal_cron:
cb(500, reason = "Running cron internally")
else:
+ rpki.log.debug("Starting externally triggered cron")
self.cron(done)
+class task(object):
+ """
+ Scheduler task object.
+ """
+
+ def __init__(self, gctx, handler, cb = None, description = None):
+ self.gctx = gctx
+ self.handler = handler
+ self.cb = cb
+ self.description = description
+
+ def __cmp__(self, other):
+ return cmp(self.handler, other.handler)
+
+ def __hash__(self):
+ return self.handler.__hash__()
+
+ def __repr__(self):
+ if self.description is None:
+ return repr(self.handler)
+ else:
+ return "<%r: %s>" % (self.handler, self.description)
+
+ def done(self):
+ """
+ Completion handler for task.
+ """
+ if self.cb is not None:
+ self.cb()
+ self.gctx.task_next()
+
+ def run(self):
+ """
+ Run this task when called via the deferred event system.
+ """
+ rpki.log.debug("Running task %r" % self)
+ self.handler(self.done)
+
class ca_obj(rpki.sql.sql_persistent):
"""
Internal CA object.
@@ -1879,6 +1951,7 @@ class publication_queue(object):
def call_pubd(self, cb, eb):
def loop(iterator, rid):
+ rpki.log.debug("Calling pubd[%r]" % self.repositories[rid])
self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers)
def done():
self.clear()