diff options
author | Rob Austein <sra@hactrn.net> | 2015-10-23 21:42:06 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-10-23 21:42:06 +0000 |
commit | 07dfb053fc4602c9be7927a6259ae07074cfbf4c (patch) | |
tree | efe7cd7c0dac6d06890a4c90653af055a50ad49e /rpki | |
parent | d97d997b135005d7c71d07e3befaef71789b8b06 (diff) |
Task system now working with Tornado. Two new problems: some kind of
UTF-8 whining on what are supposed to be binary fields that's probably
the result of a MySQL upgrade, and CMS Replay exceptions due to the
pseudo-random order in which HTTP client connections run in Tornado.
The UTF-8 mess is probably a good reason to change over to Django's
native binary field type, which we were going to want to do anyway.
The CMS Replay problem is not Tornado's fault: we probably would have
seen it in the old code were it not for an accidental side effect of a
long-since-abandoned attempt to use persistent HTTP connections. The
fix is probably to serialize requests to a particular host using a
tornado.queues.Queue() object, or something like that.
svn path=/branches/tk705/; revision=6143
Diffstat (limited to 'rpki')
-rw-r--r-- | rpki/rpkid.py | 227 | ||||
-rw-r--r-- | rpki/rpkid_tasks.py | 111 | ||||
-rw-r--r-- | rpki/rpkidb/models.py | 30 |
3 files changed, 191 insertions, 177 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py index 2258f606..37c02ab7 100644 --- a/rpki/rpkid.py +++ b/rpki/rpkid.py @@ -29,6 +29,7 @@ import argparse import tornado.gen import tornado.web +import tornado.locks import tornado.ioloop import tornado.httputil import tornado.httpclient @@ -65,8 +66,9 @@ class main(object): self.irdbd_cms_timestamp = None self.irbe_cms_timestamp = None - self.task_current = None + self.task_queue = [] + self.task_event = tornado.locks.Event() parser = argparse.ArgumentParser(description = __doc__) parser.add_argument("-c", "--config", @@ -143,6 +145,9 @@ class main(object): logger.debug("Scheduling initial cron pass in %s seconds", self.initial_delay) tornado.ioloop.IOLoop.current().spawn_callback(self.cron_loop) + logger.debug("Scheduling task loop") + tornado.ioloop.IOLoop.current().spawn_callback(self.task_loop) + rpkid = self class LeftRightHandler(tornado.web.RequestHandler): # pylint: disable=W0223 @@ -171,6 +176,125 @@ class main(object): tornado.ioloop.IOLoop.current().start() + def task_add(self, tasks): + """ + Add zero or more tasks to the task queue. + """ + + for task in tasks: + if task in self.task_queue: + logger.debug("Task %r already queued", task) + else: + logger.debug("Adding %r to task queue", task) + self.task_queue.append(task) + + def task_run(self): + """ + Kick the task loop to notice recently added tasks. + """ + + self.task_event.set() + + @tornado.gen.coroutine + def task_loop(self): + """ + Asynchronous infinite loop to run background tasks. + + This code is a bit finicky, because it's managing a collection of + Future objects which are running independently of the control flow + here, and the wave function doesn't collapse until we do a yield. + + So we keep this brutally simple and don't try to hide too much of + it in the AbstractTask class. For similar reasons, AbstractTask + sets aside a .future instance variable for this method's use. 
+ """ + + logger.debug("Starting task loop") + task_event_future = None + + while True: + while None in self.task_queue: + self.task_queue.remove(None) + + futures = [] + for task in self.task_queue: + if task.future is None: + task.future = task.start() + futures.append(task.future) + if task_event_future is None: + task_event_future = self.task_event.wait() + futures.append(task_event_future) + iterator = tornado.gen.WaitIterator(*futures) + + while not iterator.done(): + yield iterator.next() + if iterator.current_future is task_event_future: + self.task_event.clear() + task_event_future = None + break + else: + task = self.task_queue[iterator.current_index] + task.future = None + waiting = task.waiting() + if not waiting: + self.task_queue[iterator.current_index] = None + for task in self.task_queue: + if task is not None and not task.runnable.is_set(): + logger.debug("Reenabling task %r", task) + task.runnable.set() + if waiting: + break + + @tornado.gen.coroutine + def cron_loop(self): + """ + Asynchronous infinite loop to drive cron cycle. + """ + + logger.debug("cron_loop(): Starting") + assert self.use_internal_cron + logger.debug("cron_loop(): Startup delay %d seconds", self.initial_delay) + yield tornado.gen.sleep(self.initial_delay) + while True: + logger.debug("cron_loop(): Running") + yield self.cron_run() + logger.debug("cron_loop(): Sleeping %d seconds", self.cron_period) + yield tornado.gen.sleep(self.cron_period) + + @tornado.gen.coroutine + def cron_run(self): + """ + Schedule periodic tasks. 
+ """ + + now = rpki.sundial.now() + logger.debug("Starting cron run") + try: + tenants = rpki.rpkidb.models.Tenant.objects.all() + except: + logger.exception("Error pulling tenants from SQL, maybe SQL server is down?") + else: + tasks = tuple(task for tenant in tenants for task in tenant.cron_tasks(self)) + self.task_add(tasks) + futures = [task.wait() for task in tasks] + self.task_run() + yield futures + logger.info("Finished cron run started at %s", now) + + @tornado.gen.coroutine + def cronjob_handler(self, handler): + """ + External trigger to schedule periodic tasks. Obsolete for + produciton use, but portions of the test framework still use this. + """ + + if self.use_internal_cron: + handler.set_status(500, "Running cron internally") + else: + logger.debug("Starting externally triggered cron") + yield self.cron() + handler.set_status(200) + handler.finish() @staticmethod def _compose_left_right_query(): @@ -181,7 +305,6 @@ class main(object): return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, type = "query", version = rpki.left_right.version) - @tornado.gen.coroutine def irdb_query(self, q_msg): """ @@ -223,7 +346,6 @@ class main(object): raise tornado.gen.Return(r_msg) - @tornado.gen.coroutine def irdb_query_child_resources(self, tenant_handle, child_handle): """ @@ -246,7 +368,6 @@ class main(object): raise tornado.gen.Return(bag) - @tornado.gen.coroutine def irdb_query_roa_requests(self, tenant_handle): """ @@ -258,7 +379,6 @@ class main(object): r_msg = yield self.irdb_query(q_msg) raise tornado.gen.Return(r_msg) - @tornado.gen.coroutine def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles): """ @@ -283,7 +403,6 @@ class main(object): r_msg = yield self.irdb_query(q_msg) raise tornado.gen.Return(r_msg) - @property def left_right_models(self): """ @@ -302,7 +421,6 @@ class main(object): rpki.left_right.tag_repository : rpki.rpkidb.models.Repository } return self._left_right_models - @property def 
left_right_trivial_handlers(self): """ @@ -317,7 +435,6 @@ class main(object): rpki.left_right.tag_list_received_resources : self.handle_list_received_resources } return self._left_right_trivial_handlers - def handle_list_published_objects(self, q_pdu, r_msg): """ <list_published_objects/> server. @@ -348,7 +465,6 @@ class main(object): SubElement(r_msg, rpki.left_right.tag_list_published_objects, uri = c.uri, **kw).text = c.cert.get_Base64() - def handle_list_received_resources(self, q_pdu, r_msg): """ <list_received_resources/> server. @@ -375,7 +491,6 @@ class main(object): if msg_tag is not None: r_pdu.set("tag", msg_tag) - @tornado.gen.coroutine def left_right_handler(self, handler): """ @@ -388,7 +503,7 @@ class main(object): if content_type not in rpki.left_right.allowed_content_types: handler.set_status(415, "No handler for Content-Type %s" % content_type) handler.finish() - raise tornado.gen.Return + return handler.set_header("Content-Type", rpki.left_right.content_type) @@ -460,7 +575,6 @@ class main(object): handler.set_status(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e)) handler.finish() - @tornado.gen.coroutine def up_down_handler(self, handler, tenant_handle, child_handle): """ @@ -473,7 +587,7 @@ class main(object): if content_type not in rpki.up_down.allowed_content_types: handler.set_status(415, "No handler for Content-Type %s" % content_type) handler.finish() - raise tornado.gen.Return + return try: child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle) @@ -494,93 +608,6 @@ class main(object): handler.finish() - def task_add(self, task): - """ - Add a task to the scheduler task queue, unless it's already queued. 
- """ - - if task not in self.task_queue: - logger.debug("Adding %r to task queue", task) - self.task_queue.append(task) - return True - else: - logger.debug("Task %r was already in the task queue", task) - return False - - - def task_next(self): - """ - Schedule next task in the queue to be run. - """ - - try: - self.task_current = self.task_queue.pop(0) - except IndexError: - self.task_current = None - else: - tornado.ioloop.IOLoop.current().add_callback(self.task_current) - - - def task_run(self): - """ - Schedule first queued task unless a task is running already. - """ - - if self.task_current is None: - self.task_next() - - - @tornado.gen.coroutine - def cron_loop(self): - """ - Asynchronous infinite loop to drive cron cycle. - """ - - assert self.use_internal_cron - yield tornado.gen.sleep(self.initial_delay) - while True: - yield self.cron_run() - yield tornado.gen.sleep(self.cron_period) - - - @tornado.gen.coroutine - def cron_run(self): - """ - Periodic tasks. - """ - - now = rpki.sundial.now() - logger.debug("Starting cron run") - futures = [] - try: - tenants = rpki.rpkidb.models.Tenant.objects.all() - except: - logger.exception("Error pulling tenants from SQL, maybe SQL server is down?") - else: - for tenant in tenants: - futures.extend(condition.wait() for condition in tenant.schedule_cron_tasks(self)) - if futures: - yield futures - logger.info("Finished cron run started at %s", now) - - - @tornado.gen.coroutine - def cronjob_handler(self, handler): - """ - External trigger for periodic tasks. This is somewhat obsolete - now that we have internal timers, but the test framework still - uses it. - """ - - if self.use_internal_cron: - handler.set_status(500, "Running cron internally") - else: - logger.debug("Starting externally triggered cron") - yield self.cron() - handler.set_status(200) - handler.finish() - - class publication_queue(object): """ Utility to simplify publication from within rpkid. 
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py index edca9ace..b6713447 100644 --- a/rpki/rpkid_tasks.py +++ b/rpki/rpkid_tasks.py @@ -54,8 +54,7 @@ def queue_task(cls): class AbstractTask(object): """ - Abstract base class for rpkid scheduler task objects. This just - handles the scheduler hooks, real work starts in self.start. + Abstract base class for rpkid scheduler task objects. """ ## @var timeslice @@ -68,58 +67,68 @@ class AbstractTask(object): self.rpkid = rpkid self.tenant = tenant self.description = description - self.resumed = tornado.locks.Condition() - self.completed = tornado.locks.Condition() + self.runnable = tornado.locks.Event() + self.done_this = None + self.done_next = None self.due_date = None self.started = False + self.runnable.set() self.clear() + # This field belongs to the rpkid task_loop(), don't touch. + self.future = None + def __repr__(self): return rpki.log.log_repr(self, self.description) - def exit(self): - logger.debug("%r: Exiting", self) - self.due_date = None - self.started = False - self.clear() - self.completed.notify_all() - self.rpkid.task_next() - - @tornado.gen.coroutine - def postpone(self): - logger.debug("%r: Postponed", self) - self.due_date = None - self.rpkid.task_add(self) - self.rpkid.task_next() - yield self.resumed.wait() - @tornado.gen.coroutine - def __call__(self): + def start(self): try: + logger.debug("%r: Starting", self) self.due_date = rpki.sundial.now() + self.timeslice - if self.started: - logger.debug("%r: Resuming", self) - self.resumed.notify() - else: - logger.debug("%r: Starting", self) - self.clear() - self.started = True - yield self.start() + self.clear() + self.started = True + yield self.main() except: logger.exception("%r: Unhandled exception", self) - self.exit() - # - # Unclear whether we should re-raise the exception here or not, - # but re-raising it is probably safer until we know for sure. 
- # - raise + #raise + finally: + logger.debug("%r: Exiting", self) + self.due_date = None + self.started = False + self.clear() + if self.done_this is not None: + self.done_this.notify_all() + self.done_this = self.done_next + self.done_next = None + + def wait(self): + done = "done_next" if self.started else "done_this" + condition = getattr(self, done) + if condition is None: + condition = tornado.locks.Condition() + setattr(self, done, condition) + future = condition.wait() + return future + + def waiting(self): + return self.done_this is not None + + @tornado.gen.coroutine + def postpone(self): + logger.debug("%r: Postponing", self) + self.due_date = None + self.runnable.clear() + yield self.runnable.wait() + logger.debug("%r: Resuming", self) + self.due_date = rpki.sundial.now() + self.timeslice @property def overdue(self): return rpki.sundial.now() > self.due_date @tornado.gen.coroutine - def start(self): + def main(self): raise NotImplementedError def clear(self): @@ -134,7 +143,7 @@ class PollParentTask(AbstractTask): """ @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Polling parents", self) for parent in self.tenant.parents.all(): @@ -166,8 +175,6 @@ class PollParentTask(AbstractTask): logger.debug("%r: Destroying orphaned CA %r for resource class %r", self, ca, class_name) yield ca.destroy(parent) - self.exit() - @queue_task class UpdateChildrenTask(AbstractTask): @@ -178,7 +185,7 @@ class UpdateChildrenTask(AbstractTask): """ @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Updating children", self) now = rpki.sundial.now() rsn = now + rpki.sundial.timedelta(seconds = self.tenant.regen_margin) @@ -232,8 +239,6 @@ class UpdateChildrenTask(AbstractTask): except: logger.exception("%r: Couldn't publish, skipping", self) - self.exit() - @queue_task class UpdateROAsTask(AbstractTask): @@ -246,14 +251,14 @@ class UpdateROAsTask(AbstractTask): self.ca_details = None @tornado.gen.coroutine - def 
start(self): + def main(self): logger.debug("%r: Updating ROAs", self) try: r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle) except: logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle) - raise tornado.gen.Return + return logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg) @@ -312,8 +317,6 @@ class UpdateROAsTask(AbstractTask): yield self.publish() - self.exit() - @tornado.gen.coroutine def publish(self): if not self.publisher.empty(): @@ -339,7 +342,7 @@ class UpdateGhostbustersTask(AbstractTask): """ @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Updating Ghostbuster records", self) parent_handles = set(p.parent_handle for p in self.tenant.parents.all()) @@ -394,8 +397,6 @@ class UpdateGhostbustersTask(AbstractTask): except: logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant.tenant_handle) - self.exit() - @queue_task class UpdateEECertificatesTask(AbstractTask): @@ -407,7 +408,7 @@ class UpdateEECertificatesTask(AbstractTask): """ @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Updating EE certificates", self) try: @@ -473,8 +474,6 @@ class UpdateEECertificatesTask(AbstractTask): except: logger.exception("Could not update EE certificates for %s, skipping", self.tenant.tenant_handle) - self.exit() - @queue_task class RegenerateCRLsAndManifestsTask(AbstractTask): @@ -490,7 +489,7 @@ class RegenerateCRLsAndManifestsTask(AbstractTask): """ @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Regenerating CRLs and manifests", self) try: @@ -516,8 +515,6 @@ class RegenerateCRLsAndManifestsTask(AbstractTask): except: logger.exception("%r: Couldn't publish updated CRLs and manifests, skipping", self) - self.exit() - @queue_task class CheckFailedPublication(AbstractTask): @@ -527,7 +524,7 @@ class CheckFailedPublication(AbstractTask): """ 
@tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Checking for failed publication actions", self) try: @@ -538,5 +535,3 @@ class CheckFailedPublication(AbstractTask): except: logger.exception("%r: Couldn't run failed publications, skipping", self) - - self.exit() diff --git a/rpki/rpkidb/models.py b/rpki/rpkidb/models.py index 2693064a..852c8957 100644 --- a/rpki/rpkidb/models.py +++ b/rpki/rpkidb/models.py @@ -334,27 +334,19 @@ class Tenant(models.Model): @tornado.gen.coroutine def serve_run_now(self, rpkid): logger.debug("Forced immediate run of periodic actions for tenant %s[%r]", self.tenant_handle, self) - futures = [condition.wait() for condition in self.schedule_cron_tasks(rpkid)] + tasks = self.cron_tasks(rpkid) + rpkid.task_add(tasks) + futures = [task.wait() for task in tasks] rpkid.task_run() - logger.debug("serve_run_now() futures: %r", futures) - assert futures - try: - yield futures - except: - logger.exception("serve_run_now() failed") - raise - else: - logger.debug("serve_run_now() done") + yield futures - def schedule_cron_tasks(self, rpkid): + def cron_tasks(self, rpkid): try: - tasks = self.cron_tasks + return self._cron_tasks except AttributeError: - tasks = self.cron_tasks = tuple(task(rpkid, self) for task in rpki.rpkid_tasks.task_classes) - for task in tasks: - rpkid.task_add(task) - yield task.completed # Plain old Python generator yield, this is not a coroutine + self._cron_tasks = tuple(task(rpkid, self) for task in rpki.rpkid_tasks.task_classes) + return self._cron_tasks def find_covering_ca_details(self, resources): @@ -451,7 +443,7 @@ class Repository(models.Model): """ if len(q_msg) == 0: - raise tornado.gen.Return + return for q_pdu in q_msg: logger.info("Sending %r to pubd", q_pdu) @@ -781,7 +773,7 @@ class CA(models.Model): logger.warning("Existing resource class %s to %s from %s with no certificates, rekeying", class_name, parent.tenant.tenant_handle, parent.parent_handle) yield self.rekey(rpkid) - raise 
tornado.gen.Return + return for ca_detail in ca_details: @@ -1199,7 +1191,7 @@ class CADetail(models.Model): if self.state == "pending": yield self.activate(rpkid = rpkid, ca = ca, cert = cert, uri = cert_url) - raise tornado.gen.Return + return validity_changed = self.latest_ca_cert is None or self.latest_ca_cert.getNotAfter() != cert.getNotAfter() |