diff options
Diffstat (limited to 'rpkid/rpki/rpkid.py')
-rw-r--r-- | rpkid/rpki/rpkid.py | 149 |
1 files changed, 111 insertions, 38 deletions
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py index f3fc38fa..93726512 100644 --- a/rpkid/rpki/rpkid.py +++ b/rpkid/rpki/rpkid.py @@ -73,6 +73,8 @@ class main(object): self.foreground = False self.irdbd_cms_timestamp = None self.irbe_cms_timestamp = None + self.task_current = None + self.task_queue = [] opts, argv = getopt.getopt(sys.argv[1:], "c:dfhp:?", ["config=", "debug", "foreground", "help", "profile="]) @@ -323,60 +325,91 @@ class main(object): if force or self.cron_timeout is not None: self.cron_timeout = rpki.sundial.now() + self.cron_keepalive + def task_add(self, handler, cb = None, description = None): + """ + Add a task to the scheduler task queue, unless it's already queued. + """ + t = task(self, handler, cb, description) + rpki.log.debug("New task %r" % t) + rpki.log.debug("Task queue %r" % self.task_queue) + if t not in self.task_queue: + rpki.log.debug("Adding %r to task queue" % t) + self.task_queue.append(t) + return True + else: + rpki.log.debug("Task %r was already in the task queue" % t) + return False + + def task_next(self): + """ + Pull next task from the task queue and put it the deferred event + queue (we don't want to run it directly, as that could eventually + blow out our call stack). + + Not yet sure what to do here if task queue is empty. For now we + just return, on the theory that we've nothing left to do until the + next cron timer fires. This may be a bad assumption, revisit if + the rest of the code doesn't fit well with this assumption. + """ + rpki.log.debug("Looking for next task") + try: + self.task_current = self.task_queue.pop(0) + except IndexError: + self.task_current = None + else: + rpki.log.debug("Pulled %r from task queue" % self.task_current) + rpki.async.defer(self.task_current.run) + + def task_run(self): + """ + Run first task on the task queue, unless one is running already. + """ + if self.task_current is None: + self.task_next() + + def cron_done(self, cb): + """ + Completion handler for timer-driven cron. + """ + self.sql.sweep() + self.cron_timeout = None + rpki.log.info("Finished cron run") + cb() + def cron(self, cb = None): """ Periodic tasks. """ rpki.log.trace() + self.sql.ping() - def loop(iterator, s): - self.checkpoint() - s.cron(iterator) - - def done(): - self.sql.sweep() - self.cron_timeout = None - rpki.log.info("Finished cron run started at %s" % now) - if not self.use_internal_cron: - cb() - - def lose(e): - self.cron_timeout = None - if self.use_internal_cron: - rpki.log.traceback() - else: - raise - - try: - now = rpki.sundial.now() + now = rpki.sundial.now() - assert self.use_internal_cron or self.cron_timeout is None + rpki.log.debug("Starting cron run") - if self.use_internal_cron: + for s in rpki.left_right.self_elt.sql_fetch_all(self): + self.task_add(s.cron) + self.task_add(self.cron_done, cb) - if self.cron_timeout is not None and self.cron_timeout < now: - rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout) - self.cron_timeout = None + assert self.use_internal_cron or self.cron_timeout is None - when = now + self.cron_period - rpki.log.debug("Scheduling next cron run at %s" % when) - self.cron_timer.set(when) + if self.cron_timeout is not None and self.cron_timeout < now: + rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout) + self.cron_timeout = None - if self.cron_timeout is not None: - rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout) - return + if self.use_internal_cron: + when = now + self.cron_period + rpki.log.debug("Scheduling next cron run at %s" % when) + self.cron_timer.set(when) - self.sql.ping() + if self.cron_timeout is None: self.checkpoint(self.use_internal_cron) - rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self), loop, done) + self.task_run() - except (rpki.async.ExitNow, SystemExit): - self.cron_timeout = None - raise + elif self.use_internal_cron: + rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout) - except Exception, e: - lose(e) def cronjob_handler(self, query, path, cb): """ @@ -391,8 +424,47 @@ class main(object): if self.use_internal_cron: cb(500, reason = "Running cron internally") else: + rpki.log.debug("Starting externally triggered cron") self.cron(done) +class task(object): + """ + Scheduler task object. + """ + + def __init__(self, gctx, handler, cb = None, description = None): + self.gctx = gctx + self.handler = handler + self.cb = cb + self.description = description + + def __cmp__(self, other): + return cmp(self.handler, other.handler) + + def __hash__(self): + return self.handler.__hash__() + + def __repr__(self): + if self.description is None: + return repr(self.handler) + else: + return "<%r: %s>" % (self.handler, self.description) + + def done(self): + """ + Completion handler for task. + """ + if self.cb is not None: + self.cb() + self.gctx.task_next() + + def run(self): + """ + Run this task when called via the deferred event system. + """ + rpki.log.debug("Running task %r" % self) + self.handler(self.done) + class ca_obj(rpki.sql.sql_persistent): """ Internal CA object. @@ -1879,6 +1951,7 @@ class publication_queue(object): def call_pubd(self, cb, eb): def loop(iterator, rid): + rpki.log.debug("Calling pubd[%r]" % self.repositories[rid]) self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers) def done(): self.clear() |