aboutsummaryrefslogtreecommitdiff
path: root/rpki
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2015-10-23 21:42:06 +0000
committerRob Austein <sra@hactrn.net>2015-10-23 21:42:06 +0000
commit07dfb053fc4602c9be7927a6259ae07074cfbf4c (patch)
treeefe7cd7c0dac6d06890a4c90653af055a50ad49e /rpki
parentd97d997b135005d7c71d07e3befaef71789b8b06 (diff)
Task system now working with Tornado. Two new problems: some kind of
UTF-8 whining on what are supposed to be binary fields that's probably the result of a MySQL upgrade, and CMS Replay exceptions due to the pseudo-random order in which HTTP client connections run in Tornado. The UTF-8 mess is probably a good reason to change over to Django's native binary field type, which we were going to want to do anyway. The CMS Replay problem is not Tornado's fault: we probably would have seen it in the old code were it not for an accidental side effect of a long-since-abandoned attempt to use persistent HTTP connections. The fix is probably to serialize requests to a particular host using use a tornaodo.queue.Queue() object, or something like that. svn path=/branches/tk705/; revision=6143
Diffstat (limited to 'rpki')
-rw-r--r--rpki/rpkid.py227
-rw-r--r--rpki/rpkid_tasks.py111
-rw-r--r--rpki/rpkidb/models.py30
3 files changed, 191 insertions, 177 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py
index 2258f606..37c02ab7 100644
--- a/rpki/rpkid.py
+++ b/rpki/rpkid.py
@@ -29,6 +29,7 @@ import argparse
import tornado.gen
import tornado.web
+import tornado.locks
import tornado.ioloop
import tornado.httputil
import tornado.httpclient
@@ -65,8 +66,9 @@ class main(object):
self.irdbd_cms_timestamp = None
self.irbe_cms_timestamp = None
- self.task_current = None
+
self.task_queue = []
+ self.task_event = tornado.locks.Event()
parser = argparse.ArgumentParser(description = __doc__)
parser.add_argument("-c", "--config",
@@ -143,6 +145,9 @@ class main(object):
logger.debug("Scheduling initial cron pass in %s seconds", self.initial_delay)
tornado.ioloop.IOLoop.current().spawn_callback(self.cron_loop)
+ logger.debug("Scheduling task loop")
+ tornado.ioloop.IOLoop.current().spawn_callback(self.task_loop)
+
rpkid = self
class LeftRightHandler(tornado.web.RequestHandler): # pylint: disable=W0223
@@ -171,6 +176,125 @@ class main(object):
tornado.ioloop.IOLoop.current().start()
+ def task_add(self, tasks):
+ """
+ Add zero or more tasks to the task queue.
+ """
+
+ for task in tasks:
+ if task in self.task_queue:
+ logger.debug("Task %r already queued", task)
+ else:
+ logger.debug("Adding %r to task queue", task)
+ self.task_queue.append(task)
+
+ def task_run(self):
+ """
+ Kick the task loop to notice recently added tasks.
+ """
+
+ self.task_event.set()
+
+ @tornado.gen.coroutine
+ def task_loop(self):
+ """
+ Asynchronous infinite loop to run background tasks.
+
+ This code is a bit finicky, because it's managing a collection of
+ Future objects which are running independently of the control flow
+ here, and the wave function doesn't collapse until we do a yield.
+
+ So we keep this brutally simple and don't try to hide too much of
+ it in the AbstractTask class. For similar reasons, AbstractTask
+ sets aside a .future instance variable for this method's use.
+ """
+
+ logger.debug("Starting task loop")
+ task_event_future = None
+
+ while True:
+ while None in self.task_queue:
+ self.task_queue.remove(None)
+
+ futures = []
+ for task in self.task_queue:
+ if task.future is None:
+ task.future = task.start()
+ futures.append(task.future)
+ if task_event_future is None:
+ task_event_future = self.task_event.wait()
+ futures.append(task_event_future)
+ iterator = tornado.gen.WaitIterator(*futures)
+
+ while not iterator.done():
+ yield iterator.next()
+ if iterator.current_future is task_event_future:
+ self.task_event.clear()
+ task_event_future = None
+ break
+ else:
+ task = self.task_queue[iterator.current_index]
+ task.future = None
+ waiting = task.waiting()
+ if not waiting:
+ self.task_queue[iterator.current_index] = None
+ for task in self.task_queue:
+ if task is not None and not task.runnable.is_set():
+ logger.debug("Reenabling task %r", task)
+ task.runnable.set()
+ if waiting:
+ break
+
+ @tornado.gen.coroutine
+ def cron_loop(self):
+ """
+ Asynchronous infinite loop to drive cron cycle.
+ """
+
+ logger.debug("cron_loop(): Starting")
+ assert self.use_internal_cron
+ logger.debug("cron_loop(): Startup delay %d seconds", self.initial_delay)
+ yield tornado.gen.sleep(self.initial_delay)
+ while True:
+ logger.debug("cron_loop(): Running")
+ yield self.cron_run()
+ logger.debug("cron_loop(): Sleeping %d seconds", self.cron_period)
+ yield tornado.gen.sleep(self.cron_period)
+
+ @tornado.gen.coroutine
+ def cron_run(self):
+ """
+ Schedule periodic tasks.
+ """
+
+ now = rpki.sundial.now()
+ logger.debug("Starting cron run")
+ try:
+ tenants = rpki.rpkidb.models.Tenant.objects.all()
+ except:
+ logger.exception("Error pulling tenants from SQL, maybe SQL server is down?")
+ else:
+ tasks = tuple(task for tenant in tenants for task in tenant.cron_tasks(self))
+ self.task_add(tasks)
+ futures = [task.wait() for task in tasks]
+ self.task_run()
+ yield futures
+ logger.info("Finished cron run started at %s", now)
+
+ @tornado.gen.coroutine
+ def cronjob_handler(self, handler):
+ """
+ External trigger to schedule periodic tasks. Obsolete for
+ produciton use, but portions of the test framework still use this.
+ """
+
+ if self.use_internal_cron:
+ handler.set_status(500, "Running cron internally")
+ else:
+ logger.debug("Starting externally triggered cron")
+ yield self.cron()
+ handler.set_status(200)
+ handler.finish()
@staticmethod
def _compose_left_right_query():
@@ -181,7 +305,6 @@ class main(object):
return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap,
type = "query", version = rpki.left_right.version)
-
@tornado.gen.coroutine
def irdb_query(self, q_msg):
"""
@@ -223,7 +346,6 @@ class main(object):
raise tornado.gen.Return(r_msg)
-
@tornado.gen.coroutine
def irdb_query_child_resources(self, tenant_handle, child_handle):
"""
@@ -246,7 +368,6 @@ class main(object):
raise tornado.gen.Return(bag)
-
@tornado.gen.coroutine
def irdb_query_roa_requests(self, tenant_handle):
"""
@@ -258,7 +379,6 @@ class main(object):
r_msg = yield self.irdb_query(q_msg)
raise tornado.gen.Return(r_msg)
-
@tornado.gen.coroutine
def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles):
"""
@@ -283,7 +403,6 @@ class main(object):
r_msg = yield self.irdb_query(q_msg)
raise tornado.gen.Return(r_msg)
-
@property
def left_right_models(self):
"""
@@ -302,7 +421,6 @@ class main(object):
rpki.left_right.tag_repository : rpki.rpkidb.models.Repository }
return self._left_right_models
-
@property
def left_right_trivial_handlers(self):
"""
@@ -317,7 +435,6 @@ class main(object):
rpki.left_right.tag_list_received_resources : self.handle_list_received_resources }
return self._left_right_trivial_handlers
-
def handle_list_published_objects(self, q_pdu, r_msg):
"""
<list_published_objects/> server.
@@ -348,7 +465,6 @@ class main(object):
SubElement(r_msg, rpki.left_right.tag_list_published_objects,
uri = c.uri, **kw).text = c.cert.get_Base64()
-
def handle_list_received_resources(self, q_pdu, r_msg):
"""
<list_received_resources/> server.
@@ -375,7 +491,6 @@ class main(object):
if msg_tag is not None:
r_pdu.set("tag", msg_tag)
-
@tornado.gen.coroutine
def left_right_handler(self, handler):
"""
@@ -388,7 +503,7 @@ class main(object):
if content_type not in rpki.left_right.allowed_content_types:
handler.set_status(415, "No handler for Content-Type %s" % content_type)
handler.finish()
- raise tornado.gen.Return
+ return
handler.set_header("Content-Type", rpki.left_right.content_type)
@@ -460,7 +575,6 @@ class main(object):
handler.set_status(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e))
handler.finish()
-
@tornado.gen.coroutine
def up_down_handler(self, handler, tenant_handle, child_handle):
"""
@@ -473,7 +587,7 @@ class main(object):
if content_type not in rpki.up_down.allowed_content_types:
handler.set_status(415, "No handler for Content-Type %s" % content_type)
handler.finish()
- raise tornado.gen.Return
+ return
try:
child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle)
@@ -494,93 +608,6 @@ class main(object):
handler.finish()
- def task_add(self, task):
- """
- Add a task to the scheduler task queue, unless it's already queued.
- """
-
- if task not in self.task_queue:
- logger.debug("Adding %r to task queue", task)
- self.task_queue.append(task)
- return True
- else:
- logger.debug("Task %r was already in the task queue", task)
- return False
-
-
- def task_next(self):
- """
- Schedule next task in the queue to be run.
- """
-
- try:
- self.task_current = self.task_queue.pop(0)
- except IndexError:
- self.task_current = None
- else:
- tornado.ioloop.IOLoop.current().add_callback(self.task_current)
-
-
- def task_run(self):
- """
- Schedule first queued task unless a task is running already.
- """
-
- if self.task_current is None:
- self.task_next()
-
-
- @tornado.gen.coroutine
- def cron_loop(self):
- """
- Asynchronous infinite loop to drive cron cycle.
- """
-
- assert self.use_internal_cron
- yield tornado.gen.sleep(self.initial_delay)
- while True:
- yield self.cron_run()
- yield tornado.gen.sleep(self.cron_period)
-
-
- @tornado.gen.coroutine
- def cron_run(self):
- """
- Periodic tasks.
- """
-
- now = rpki.sundial.now()
- logger.debug("Starting cron run")
- futures = []
- try:
- tenants = rpki.rpkidb.models.Tenant.objects.all()
- except:
- logger.exception("Error pulling tenants from SQL, maybe SQL server is down?")
- else:
- for tenant in tenants:
- futures.extend(condition.wait() for condition in tenant.schedule_cron_tasks(self))
- if futures:
- yield futures
- logger.info("Finished cron run started at %s", now)
-
-
- @tornado.gen.coroutine
- def cronjob_handler(self, handler):
- """
- External trigger for periodic tasks. This is somewhat obsolete
- now that we have internal timers, but the test framework still
- uses it.
- """
-
- if self.use_internal_cron:
- handler.set_status(500, "Running cron internally")
- else:
- logger.debug("Starting externally triggered cron")
- yield self.cron()
- handler.set_status(200)
- handler.finish()
-
-
class publication_queue(object):
"""
Utility to simplify publication from within rpkid.
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py
index edca9ace..b6713447 100644
--- a/rpki/rpkid_tasks.py
+++ b/rpki/rpkid_tasks.py
@@ -54,8 +54,7 @@ def queue_task(cls):
class AbstractTask(object):
"""
- Abstract base class for rpkid scheduler task objects. This just
- handles the scheduler hooks, real work starts in self.start.
+ Abstract base class for rpkid scheduler task objects.
"""
## @var timeslice
@@ -68,58 +67,68 @@ class AbstractTask(object):
self.rpkid = rpkid
self.tenant = tenant
self.description = description
- self.resumed = tornado.locks.Condition()
- self.completed = tornado.locks.Condition()
+ self.runnable = tornado.locks.Event()
+ self.done_this = None
+ self.done_next = None
self.due_date = None
self.started = False
+ self.runnable.set()
self.clear()
+ # This field belongs to the rpkid task_loop(), don't touch.
+ self.future = None
+
def __repr__(self):
return rpki.log.log_repr(self, self.description)
- def exit(self):
- logger.debug("%r: Exiting", self)
- self.due_date = None
- self.started = False
- self.clear()
- self.completed.notify_all()
- self.rpkid.task_next()
-
- @tornado.gen.coroutine
- def postpone(self):
- logger.debug("%r: Postponed", self)
- self.due_date = None
- self.rpkid.task_add(self)
- self.rpkid.task_next()
- yield self.resumed.wait()
-
@tornado.gen.coroutine
- def __call__(self):
+ def start(self):
try:
+ logger.debug("%r: Starting", self)
self.due_date = rpki.sundial.now() + self.timeslice
- if self.started:
- logger.debug("%r: Resuming", self)
- self.resumed.notify()
- else:
- logger.debug("%r: Starting", self)
- self.clear()
- self.started = True
- yield self.start()
+ self.clear()
+ self.started = True
+ yield self.main()
except:
logger.exception("%r: Unhandled exception", self)
- self.exit()
- #
- # Unclear whether we should re-raise the exception here or not,
- # but re-raising it is probably safer until we know for sure.
- #
- raise
+ #raise
+ finally:
+ logger.debug("%r: Exiting", self)
+ self.due_date = None
+ self.started = False
+ self.clear()
+ if self.done_this is not None:
+ self.done_this.notify_all()
+ self.done_this = self.done_next
+ self.done_next = None
+
+ def wait(self):
+ done = "done_next" if self.started else "done_this"
+ condition = getattr(self, done)
+ if condition is None:
+ condition = tornado.locks.Condition()
+ setattr(self, done, condition)
+ future = condition.wait()
+ return future
+
+ def waiting(self):
+ return self.done_this is not None
+
+ @tornado.gen.coroutine
+ def postpone(self):
+ logger.debug("%r: Postponing", self)
+ self.due_date = None
+ self.runnable.clear()
+ yield self.runnable.wait()
+ logger.debug("%r: Resuming", self)
+ self.due_date = rpki.sundial.now() + self.timeslice
@property
def overdue(self):
return rpki.sundial.now() > self.due_date
@tornado.gen.coroutine
- def start(self):
+ def main(self):
raise NotImplementedError
def clear(self):
@@ -134,7 +143,7 @@ class PollParentTask(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Polling parents", self)
for parent in self.tenant.parents.all():
@@ -166,8 +175,6 @@ class PollParentTask(AbstractTask):
logger.debug("%r: Destroying orphaned CA %r for resource class %r", self, ca, class_name)
yield ca.destroy(parent)
- self.exit()
-
@queue_task
class UpdateChildrenTask(AbstractTask):
@@ -178,7 +185,7 @@ class UpdateChildrenTask(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Updating children", self)
now = rpki.sundial.now()
rsn = now + rpki.sundial.timedelta(seconds = self.tenant.regen_margin)
@@ -232,8 +239,6 @@ class UpdateChildrenTask(AbstractTask):
except:
logger.exception("%r: Couldn't publish, skipping", self)
- self.exit()
-
@queue_task
class UpdateROAsTask(AbstractTask):
@@ -246,14 +251,14 @@ class UpdateROAsTask(AbstractTask):
self.ca_details = None
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Updating ROAs", self)
try:
r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle)
except:
logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle)
- raise tornado.gen.Return
+ return
logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg)
@@ -312,8 +317,6 @@ class UpdateROAsTask(AbstractTask):
yield self.publish()
- self.exit()
-
@tornado.gen.coroutine
def publish(self):
if not self.publisher.empty():
@@ -339,7 +342,7 @@ class UpdateGhostbustersTask(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Updating Ghostbuster records", self)
parent_handles = set(p.parent_handle for p in self.tenant.parents.all())
@@ -394,8 +397,6 @@ class UpdateGhostbustersTask(AbstractTask):
except:
logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant.tenant_handle)
- self.exit()
-
@queue_task
class UpdateEECertificatesTask(AbstractTask):
@@ -407,7 +408,7 @@ class UpdateEECertificatesTask(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Updating EE certificates", self)
try:
@@ -473,8 +474,6 @@ class UpdateEECertificatesTask(AbstractTask):
except:
logger.exception("Could not update EE certificates for %s, skipping", self.tenant.tenant_handle)
- self.exit()
-
@queue_task
class RegenerateCRLsAndManifestsTask(AbstractTask):
@@ -490,7 +489,7 @@ class RegenerateCRLsAndManifestsTask(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Regenerating CRLs and manifests", self)
try:
@@ -516,8 +515,6 @@ class RegenerateCRLsAndManifestsTask(AbstractTask):
except:
logger.exception("%r: Couldn't publish updated CRLs and manifests, skipping", self)
- self.exit()
-
@queue_task
class CheckFailedPublication(AbstractTask):
@@ -527,7 +524,7 @@ class CheckFailedPublication(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Checking for failed publication actions", self)
try:
@@ -538,5 +535,3 @@ class CheckFailedPublication(AbstractTask):
except:
logger.exception("%r: Couldn't run failed publications, skipping", self)
-
- self.exit()
diff --git a/rpki/rpkidb/models.py b/rpki/rpkidb/models.py
index 2693064a..852c8957 100644
--- a/rpki/rpkidb/models.py
+++ b/rpki/rpkidb/models.py
@@ -334,27 +334,19 @@ class Tenant(models.Model):
@tornado.gen.coroutine
def serve_run_now(self, rpkid):
logger.debug("Forced immediate run of periodic actions for tenant %s[%r]", self.tenant_handle, self)
- futures = [condition.wait() for condition in self.schedule_cron_tasks(rpkid)]
+ tasks = self.cron_tasks(rpkid)
+ rpkid.task_add(tasks)
+ futures = [task.wait() for task in tasks]
rpkid.task_run()
- logger.debug("serve_run_now() futures: %r", futures)
- assert futures
- try:
- yield futures
- except:
- logger.exception("serve_run_now() failed")
- raise
- else:
- logger.debug("serve_run_now() done")
+ yield futures
- def schedule_cron_tasks(self, rpkid):
+ def cron_tasks(self, rpkid):
try:
- tasks = self.cron_tasks
+ return self._cron_tasks
except AttributeError:
- tasks = self.cron_tasks = tuple(task(rpkid, self) for task in rpki.rpkid_tasks.task_classes)
- for task in tasks:
- rpkid.task_add(task)
- yield task.completed # Plain old Python generator yield, this is not a coroutine
+ self._cron_tasks = tuple(task(rpkid, self) for task in rpki.rpkid_tasks.task_classes)
+ return self._cron_tasks
def find_covering_ca_details(self, resources):
@@ -451,7 +443,7 @@ class Repository(models.Model):
"""
if len(q_msg) == 0:
- raise tornado.gen.Return
+ return
for q_pdu in q_msg:
logger.info("Sending %r to pubd", q_pdu)
@@ -781,7 +773,7 @@ class CA(models.Model):
logger.warning("Existing resource class %s to %s from %s with no certificates, rekeying",
class_name, parent.tenant.tenant_handle, parent.parent_handle)
yield self.rekey(rpkid)
- raise tornado.gen.Return
+ return
for ca_detail in ca_details:
@@ -1199,7 +1191,7 @@ class CADetail(models.Model):
if self.state == "pending":
yield self.activate(rpkid = rpkid, ca = ca, cert = cert, uri = cert_url)
- raise tornado.gen.Return
+ return
validity_changed = self.latest_ca_cert is None or self.latest_ca_cert.getNotAfter() != cert.getNotAfter()