aboutsummaryrefslogtreecommitdiff
path: root/rpki/rpkid.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rpkid.py')
-rw-r--r--rpki/rpkid.py97
1 files changed, 24 insertions, 73 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py
index 539946a4..641b5c2c 100644
--- a/rpki/rpkid.py
+++ b/rpki/rpkid.py
@@ -33,6 +33,7 @@ import tornado.gen
import tornado.web
import tornado.locks
import tornado.ioloop
+import tornado.queues
import tornado.httputil
import tornado.httpclient
import tornado.httpserver
@@ -69,8 +70,8 @@ class main(object):
self.irdbd_cms_timestamp = None
self.irbe_cms_timestamp = None
- self.task_queue = []
- self.task_event = tornado.locks.Event()
+ self.task_queue = tornado.queues.Queue()
+ self.task_ready = set()
self.http_client_serialize = weakref.WeakValueDictionary()
@@ -182,74 +183,35 @@ class main(object):
tornado.ioloop.IOLoop.current().start()
- def task_add(self, tasks):
+ def task_add(self, *tasks):
"""
- Add zero or more tasks to the task queue.
+ Add tasks to the task queue.
"""
for task in tasks:
- if task in self.task_queue:
+ if task in self.task_ready:
logger.debug("Task %r already queued", task)
else:
logger.debug("Adding %r to task queue", task)
- self.task_queue.append(task)
-
- def task_run(self):
- """
- Kick the task loop to notice recently added tasks.
- """
-
- self.task_event.set()
+ self.task_queue.put(task)
+ self.task_ready.add(task)
@tornado.gen.coroutine
def task_loop(self):
"""
Asynchronous infinite loop to run background tasks.
-
- This code is a bit finicky, because it's managing a collection of
- Future objects which are running independently of the control flow
- here, and the wave function doesn't collapse until we do a yield.
-
- So we keep this brutally simple and don't try to hide too much of
- it in the AbstractTask class. For similar reasons, AbstractTask
- sets aside a .future instance variable for this method's use.
"""
logger.debug("Starting task loop")
- task_event_future = None
while True:
- while None in self.task_queue:
- self.task_queue.remove(None)
-
- futures = []
- for task in self.task_queue:
- if task.future is None:
- task.future = task.start()
- futures.append(task.future)
- if task_event_future is None:
- task_event_future = self.task_event.wait()
- futures.append(task_event_future)
- iterator = tornado.gen.WaitIterator(*futures)
-
- while not iterator.done():
- yield iterator.next()
- if iterator.current_future is task_event_future:
- self.task_event.clear()
- task_event_future = None
- break
- else:
- task = self.task_queue[iterator.current_index]
- task.future = None
- waiting = task.waiting()
- if not waiting:
- self.task_queue[iterator.current_index] = None
- for task in self.task_queue:
- if task is not None and not task.runnable.is_set():
- logger.debug("Reenabling task %r", task)
- task.runnable.set()
- if waiting:
- break
+ task = None
+ try:
+ task = yield self.task_queue.get()
+ self.task_ready.discard(task)
+ yield task.start()
+ except:
+ logger.exception("Unhandled exception from %r", task)
@tornado.gen.coroutine
def cron_loop(self):
@@ -263,31 +225,20 @@ class main(object):
yield tornado.gen.sleep(self.initial_delay)
while True:
logger.debug("cron_loop(): Running")
- yield self.cron_run(wait = False)
+ try:
+ self.cron_run()
+ except:
+ logger.exception("Error queuing cron tasks")
logger.debug("cron_loop(): Sleeping %d seconds", self.cron_period)
yield tornado.gen.sleep(self.cron_period)
- @tornado.gen.coroutine
- def cron_run(self, wait):
+ def cron_run(self):
"""
- Schedule periodic tasks and wait for them to finish.
+ Schedule periodic tasks.
"""
- now = rpki.sundial.now()
- logger.debug("Starting cron run")
- try:
- tenants = rpki.rpkidb.models.Tenant.objects.all()
- except:
- logger.exception("Error pulling tenants from SQL, maybe SQL server is down?")
- else:
- tasks = tuple(task for tenant in tenants for task in tenant.cron_tasks(self))
- self.task_add(tasks)
- if wait:
- futures = [task.wait() for task in tasks]
- self.task_run()
- if wait:
- yield futures
- logger.info("Finished cron run started at %s", now)
+ for tenant in rpki.rpkidb.models.Tenant.objects.all():
+ self.task_add(*tenant.cron_tasks(self))
@tornado.gen.coroutine
def cronjob_handler(self, handler):
@@ -300,7 +251,7 @@ class main(object):
handler.set_status(500, "Running cron internally")
else:
logger.debug("Starting externally triggered cron")
- yield self.cron_run()
+ self.cron_run()
handler.set_status(200)
handler.finish()