From 5d6efe28e2560d4a6b7c752e57f6b667be34bcdc Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Thu, 25 Feb 2016 19:30:01 +0000 Subject: Major simplifcation of rpkid's internal tasking system. svn path=/branches/tk705/; revision=6289 --- rpki/rpkid.py | 97 +++++++++++++++-------------------------------------------- 1 file changed, 24 insertions(+), 73 deletions(-) (limited to 'rpki/rpkid.py') diff --git a/rpki/rpkid.py b/rpki/rpkid.py index 539946a4..641b5c2c 100644 --- a/rpki/rpkid.py +++ b/rpki/rpkid.py @@ -33,6 +33,7 @@ import tornado.gen import tornado.web import tornado.locks import tornado.ioloop +import tornado.queues import tornado.httputil import tornado.httpclient import tornado.httpserver @@ -69,8 +70,8 @@ class main(object): self.irdbd_cms_timestamp = None self.irbe_cms_timestamp = None - self.task_queue = [] - self.task_event = tornado.locks.Event() + self.task_queue = tornado.queues.Queue() + self.task_ready = set() self.http_client_serialize = weakref.WeakValueDictionary() @@ -182,74 +183,35 @@ class main(object): tornado.ioloop.IOLoop.current().start() - def task_add(self, tasks): + def task_add(self, *tasks): """ - Add zero or more tasks to the task queue. + Add tasks to the task queue. """ for task in tasks: - if task in self.task_queue: + if task in self.task_ready: logger.debug("Task %r already queued", task) else: logger.debug("Adding %r to task queue", task) - self.task_queue.append(task) - - def task_run(self): - """ - Kick the task loop to notice recently added tasks. - """ - - self.task_event.set() + self.task_queue.put(task) + self.task_ready.add(task) @tornado.gen.coroutine def task_loop(self): """ Asynchronous infinite loop to run background tasks. - - This code is a bit finicky, because it's managing a collection of - Future objects which are running independently of the control flow - here, and the wave function doesn't collapse until we do a yield. - - So we keep this brutally simple and don't try to hide too much of - it in the AbstractTask class. For similar reasons, AbstractTask - sets aside a .future instance variable for this method's use. """ logger.debug("Starting task loop") - task_event_future = None while True: - while None in self.task_queue: - self.task_queue.remove(None) - - futures = [] - for task in self.task_queue: - if task.future is None: - task.future = task.start() - futures.append(task.future) - if task_event_future is None: - task_event_future = self.task_event.wait() - futures.append(task_event_future) - iterator = tornado.gen.WaitIterator(*futures) - - while not iterator.done(): - yield iterator.next() - if iterator.current_future is task_event_future: - self.task_event.clear() - task_event_future = None - break - else: - task = self.task_queue[iterator.current_index] - task.future = None - waiting = task.waiting() - if not waiting: - self.task_queue[iterator.current_index] = None - for task in self.task_queue: - if task is not None and not task.runnable.is_set(): - logger.debug("Reenabling task %r", task) - task.runnable.set() - if waiting: - break + task = None + try: + task = yield self.task_queue.get() + self.task_ready.discard(task) + yield task.start() + except: + logger.exception("Unhandled exception from %r", task) @tornado.gen.coroutine def cron_loop(self): @@ -263,31 +225,20 @@ class main(object): yield tornado.gen.sleep(self.initial_delay) while True: logger.debug("cron_loop(): Running") - yield self.cron_run(wait = False) + try: + self.cron_run() + except: + logger.exception("Error queuing cron tasks") logger.debug("cron_loop(): Sleeping %d seconds", self.cron_period) yield tornado.gen.sleep(self.cron_period) - @tornado.gen.coroutine - def cron_run(self, wait): + def cron_run(self): """ - Schedule periodic tasks and wait for them to finish. + Schedule periodic tasks. """ - now = rpki.sundial.now() - logger.debug("Starting cron run") - try: - tenants = rpki.rpkidb.models.Tenant.objects.all() - except: - logger.exception("Error pulling tenants from SQL, maybe SQL server is down?") - else: - tasks = tuple(task for tenant in tenants for task in tenant.cron_tasks(self)) - self.task_add(tasks) - if wait: - futures = [task.wait() for task in tasks] - self.task_run() - if wait: - yield futures - logger.info("Finished cron run started at %s", now) + for tenant in rpki.rpkidb.models.Tenant.objects.all(): + self.task_add(*tenant.cron_tasks(self)) @tornado.gen.coroutine def cronjob_handler(self, handler): @@ -300,7 +251,7 @@ class main(object): handler.set_status(500, "Running cron internally") else: logger.debug("Starting externally triggered cron") - yield self.cron_run() + self.cron_run() handler.set_status(200) handler.finish() -- cgit v1.2.3