diff options
author | Rob Austein <sra@hactrn.net> | 2016-02-25 19:30:01 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2016-02-25 19:30:01 +0000 |
commit | 5d6efe28e2560d4a6b7c752e57f6b667be34bcdc (patch) | |
tree | 35758a8056023c6e8cba5557c0b3af381fb920f2 /rpki | |
parent | cbc7f0f9e151af13398e4b3234a826d03bfcb6a9 (diff) |
Major simplification of rpkid's internal tasking system.
svn path=/branches/tk705/; revision=6289
Diffstat (limited to 'rpki')
-rw-r--r-- | rpki/rpkid.py | 97 | ||||
-rw-r--r-- | rpki/rpkid_tasks.py | 211 | ||||
-rw-r--r-- | rpki/rpkidb/models.py | 6 |
3 files changed, 117 insertions, 197 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py index 539946a4..641b5c2c 100644 --- a/rpki/rpkid.py +++ b/rpki/rpkid.py @@ -33,6 +33,7 @@ import tornado.gen import tornado.web import tornado.locks import tornado.ioloop +import tornado.queues import tornado.httputil import tornado.httpclient import tornado.httpserver @@ -69,8 +70,8 @@ class main(object): self.irdbd_cms_timestamp = None self.irbe_cms_timestamp = None - self.task_queue = [] - self.task_event = tornado.locks.Event() + self.task_queue = tornado.queues.Queue() + self.task_ready = set() self.http_client_serialize = weakref.WeakValueDictionary() @@ -182,74 +183,35 @@ class main(object): tornado.ioloop.IOLoop.current().start() - def task_add(self, tasks): + def task_add(self, *tasks): """ - Add zero or more tasks to the task queue. + Add tasks to the task queue. """ for task in tasks: - if task in self.task_queue: + if task in self.task_ready: logger.debug("Task %r already queued", task) else: logger.debug("Adding %r to task queue", task) - self.task_queue.append(task) - - def task_run(self): - """ - Kick the task loop to notice recently added tasks. - """ - - self.task_event.set() + self.task_queue.put(task) + self.task_ready.add(task) @tornado.gen.coroutine def task_loop(self): """ Asynchronous infinite loop to run background tasks. - - This code is a bit finicky, because it's managing a collection of - Future objects which are running independently of the control flow - here, and the wave function doesn't collapse until we do a yield. - - So we keep this brutally simple and don't try to hide too much of - it in the AbstractTask class. For similar reasons, AbstractTask - sets aside a .future instance variable for this method's use. 
""" logger.debug("Starting task loop") - task_event_future = None while True: - while None in self.task_queue: - self.task_queue.remove(None) - - futures = [] - for task in self.task_queue: - if task.future is None: - task.future = task.start() - futures.append(task.future) - if task_event_future is None: - task_event_future = self.task_event.wait() - futures.append(task_event_future) - iterator = tornado.gen.WaitIterator(*futures) - - while not iterator.done(): - yield iterator.next() - if iterator.current_future is task_event_future: - self.task_event.clear() - task_event_future = None - break - else: - task = self.task_queue[iterator.current_index] - task.future = None - waiting = task.waiting() - if not waiting: - self.task_queue[iterator.current_index] = None - for task in self.task_queue: - if task is not None and not task.runnable.is_set(): - logger.debug("Reenabling task %r", task) - task.runnable.set() - if waiting: - break + task = None + try: + task = yield self.task_queue.get() + self.task_ready.discard(task) + yield task.start() + except: + logger.exception("Unhandled exception from %r", task) @tornado.gen.coroutine def cron_loop(self): @@ -263,31 +225,20 @@ class main(object): yield tornado.gen.sleep(self.initial_delay) while True: logger.debug("cron_loop(): Running") - yield self.cron_run(wait = False) + try: + self.cron_run() + except: + logger.exception("Error queuing cron tasks") logger.debug("cron_loop(): Sleeping %d seconds", self.cron_period) yield tornado.gen.sleep(self.cron_period) - @tornado.gen.coroutine - def cron_run(self, wait): + def cron_run(self): """ - Schedule periodic tasks and wait for them to finish. + Schedule periodic tasks. 
""" - now = rpki.sundial.now() - logger.debug("Starting cron run") - try: - tenants = rpki.rpkidb.models.Tenant.objects.all() - except: - logger.exception("Error pulling tenants from SQL, maybe SQL server is down?") - else: - tasks = tuple(task for tenant in tenants for task in tenant.cron_tasks(self)) - self.task_add(tasks) - if wait: - futures = [task.wait() for task in tasks] - self.task_run() - if wait: - yield futures - logger.info("Finished cron run started at %s", now) + for tenant in rpki.rpkidb.models.Tenant.objects.all(): + self.task_add(*tenant.cron_tasks(self)) @tornado.gen.coroutine def cronjob_handler(self, handler): @@ -300,7 +251,7 @@ class main(object): handler.set_status(500, "Running cron internally") else: logger.debug("Starting externally triggered cron") - yield self.cron_run() + self.cron_run() handler.set_status(200) handler.finish() diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py index 7e24db35..c359a8d6 100644 --- a/rpki/rpkid_tasks.py +++ b/rpki/rpkid_tasks.py @@ -53,6 +53,14 @@ def queue_task(cls): return cls +class PostponeTask(Exception): + """ + Exit a task without finishing it. We use this to signal that a + long-running task wants to yield to the task loop but hasn't yet + run to completion. + """ + + class AbstractTask(object): """ Abstract base class for rpkid scheduler task objects. @@ -66,51 +74,47 @@ class AbstractTask(object): #timeslice = rpki.sundial.timedelta(seconds = 15) timeslice = rpki.sundial.timedelta(seconds = 60) - ## @var serialize - # Lock to force prevent more than one task from running at a time. - - serialize = tornado.locks.Lock() - def __init__(self, rpkid, tenant, description = None): self.rpkid = rpkid self.tenant = tenant self.description = description - self.runnable = tornado.locks.Event() self.done_this = None self.done_next = None self.due_date = None self.started = False - self.runnable.set() self.clear() - # This field belongs to the rpkid task_loop(), don't touch. 
- self.future = None - def __repr__(self): return rpki.log.log_repr(self, self.description) + def reset_due_date(self): + self.due_date = rpki.sundial.now() + self.timeslice + @tornado.gen.coroutine def start(self): try: - yield self.serialize.acquire() logger.debug("%r: Starting", self) - self.due_date = rpki.sundial.now() + self.timeslice + self.reset_due_date() self.clear() self.started = True + postponing = False yield self.main() + except PostponeTask: + postponing = True except: logger.exception("%r: Unhandled exception", self) - #raise finally: - logger.debug("%r: Exiting", self) self.due_date = None self.started = False self.clear() - if self.done_this is not None: - self.done_this.notify_all() - self.done_this = self.done_next - self.done_next = None - self.serialize.release() + if postponing: + logger.debug("%r: Postponing", self) + else: + logger.debug("%r: Exiting", self) + if self.done_this is not None: + self.done_this.notify_all() + self.done_this = self.done_next + self.done_next = None def wait(self): done = "done_next" if self.started else "done_this" @@ -125,27 +129,9 @@ class AbstractTask(object): return self.done_this is not None @tornado.gen.coroutine - def postpone(self): - logger.debug("%r: Postponing", self) - self.due_date = None - self.runnable.clear() - tasks = tuple(task for task in self.rpkid.task_queue if task is not None) - if any(task.runnable.is_set() for task in tasks): - logger.debug("%r: Runable tasks exist, leaving well enough alone", self) - else: - logger.debug("%r: All tasks were postponed, reenabling one picked at random", self) - random.choice(tasks).runnable.set() - try: - self.serialize.release() - yield self.runnable.wait() - finally: - yield self.serialize.acquire() - logger.debug("%r: Resuming", self) - self.due_date = rpki.sundial.now() + self.timeslice - - @property def overdue(self): - return rpki.sundial.now() > self.due_date + yield tornado.gen.moment + raise tornado.gen.Return(len(self.rpkid.task_ready) > 0 and 
rpki.sundial.now() > self.due_date) @tornado.gen.coroutine def main(self): @@ -325,9 +311,10 @@ class UpdateChildrenTask(AbstractTask): for child in self.tenant.children.all(): try: - if self.overdue: + if (yield self.overdue()): yield publisher.call_pubd() - yield self.postpone() + self.rpkid.task_add(self) + raise PostponeTask child_certs = list(child.child_certs.filter(ca_detail__state = "active")) @@ -382,10 +369,6 @@ class UpdateROAsTask(AbstractTask): Generate or update ROAs for this tenant. """ - def clear(self): - self.publisher = None - self.ca_details = None - # XXX This might need rewriting to avoid race conditions. # # There's a theoretical race condition here if we're chugging away @@ -394,100 +377,88 @@ class UpdateROAsTask(AbstractTask): # fairly low given that we defer CRL and manifest generation until # we're ready to publish, but it's theoretically present. - # Rewritten to conserve memory when postponing. We used to treat - # postponement as just another odd kind of voluntary yield, but - # when working with really big data sets this tends to result in - # extreme memory bloat. So, instead, once we hit the postponement - # limit we publish what we've got, discard all of our transient - # data, and recompute what we need to do when we come back. This - # is a bit less efficient, but should tend to converge towards the - # correct result, with a much smaller memory footprint. 
- @tornado.gen.coroutine def main(self): logger.debug("%r: Updating ROAs", self) - while True: # Postponement loop - - try: - r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle) - except: - logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle) - return + try: + r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle) + except: + logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle) + return - logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg) + logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg) - roas = {} - seen = set() - orphans = [] - updates = [] - self.publisher = rpki.rpkid.publication_queue(self.rpkid) # pylint: disable=W0201 - self.ca_details = set() # pylint: disable=W0201 - - for roa in self.tenant.roas.all(): - k = "{!s} {!s} {!s}".format(roa.asn, roa.ipv4, roa.ipv6) - if k not in roas: - roas[k] = roa - elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"): - orphans.append(roas[k]) - roas[k] = roa - else: - orphans.append(roa) + roas = {} + seen = set() + orphans = [] + updates = [] + publisher = rpki.rpkid.publication_queue(self.rpkid) + ca_details = set() + + for roa in self.tenant.roas.all(): + k = "{!s} {!s} {!s}".format(roa.asn, roa.ipv4, roa.ipv6) + if k not in roas: + roas[k] = roa + elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"): + orphans.append(roas[k]) + roas[k] = roa + else: + orphans.append(roa) - for r_pdu in r_msg: - k = "{!s} {!s} {!s}".format(r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6")) 
- if k in seen: - logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu) + for r_pdu in r_msg: + k = "{!s} {!s} {!s}".format(r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6")) + if k in seen: + logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu) + else: + seen.add(k) + roa = roas.pop(k, None) + if roa is None: + roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6")) + logger.debug("%r: Created new %r", self, roa) else: - seen.add(k) - roa = roas.pop(k, None) - if roa is None: - roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6")) - logger.debug("%r: Created new %r", self, roa) - else: - logger.debug("%r: Found existing %r", self, roa) - updates.append(roa) + logger.debug("%r: Found existing %r", self, roa) + updates.append(roa) - r_msg = seen = None + r_msg = seen = None - orphans.extend(roas.itervalues()) + orphans.extend(roas.itervalues()) - roas = None + roas = None - while updates and not self.overdue: - roa = updates.pop(0) - try: - roa.update(publisher = self.publisher) - self.ca_details.add(roa.ca_detail) - except rpki.exceptions.NoCoveringCertForROA: - logger.warning("%r: No covering certificate for %r, skipping", self, roa) - except: - logger.exception("%r: Could not update %r, skipping", self, roa) + postponing = False - if not self.overdue: + while updates and not postponing: + if (yield self.overdue()): + postponing = True break - - roas = seen = orphans = updates = None - yield self.publish() - yield self.postpone() - - for roa in orphans: + roa = updates.pop(0) try: - self.ca_details.add(roa.ca_detail) - roa.revoke(publisher = self.publisher) + roa.update(publisher = publisher) + ca_details.add(roa.ca_detail) + except rpki.exceptions.NoCoveringCertForROA: + logger.warning("%r: No covering certificate for %r, skipping", self, roa) except: - logger.exception("%r: 
Could not revoke %r", self, roa) + logger.exception("%r: Could not update %r, skipping", self, roa) - yield self.publish() + updates = None - @tornado.gen.coroutine - def publish(self): - if not self.publisher.empty(): - for ca_detail in self.ca_details: + if not postponing: + for roa in orphans: + try: + ca_details.add(roa.ca_detail) + roa.revoke(publisher = publisher) + except: + logger.exception("%r: Could not revoke %r", self, roa) + + if not publisher.empty(): + for ca_detail in ca_details: logger.debug("%r: Generating new CRL and manifest for %r", self, ca_detail) - ca_detail.generate_crl_and_manifest(publisher = self.publisher) - yield self.publisher.call_pubd() - self.ca_details.clear() + ca_detail.generate_crl_and_manifest(publisher = publisher) + yield publisher.call_pubd() + + if postponing: + raise PostponeTask @queue_task diff --git a/rpki/rpkidb/models.py b/rpki/rpkidb/models.py index 6b26a27d..b76c2e9a 100644 --- a/rpki/rpkidb/models.py +++ b/rpki/rpkidb/models.py @@ -378,10 +378,8 @@ class Tenant(models.Model): trace_call_chain() logger.debug("Forced immediate run of periodic actions for %r", self) tasks = self.cron_tasks(rpkid = rpkid) - rpkid.task_add(tasks) - futures = [task.wait() for task in tasks] - rpkid.task_run() - yield futures + rpkid.task_add(*tasks) + yield [task.wait() for task in tasks] def cron_tasks(self, rpkid): |