diff options
Diffstat (limited to 'rpki/rpkid_tasks.py')
-rw-r--r-- | rpki/rpkid_tasks.py | 111 |
1 files changed, 53 insertions, 58 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py index edca9ace..b6713447 100644 --- a/rpki/rpkid_tasks.py +++ b/rpki/rpkid_tasks.py @@ -54,8 +54,7 @@ def queue_task(cls): class AbstractTask(object): """ - Abstract base class for rpkid scheduler task objects. This just - handles the scheduler hooks, real work starts in self.start. + Abstract base class for rpkid scheduler task objects. """ ## @var timeslice @@ -68,58 +67,68 @@ class AbstractTask(object): self.rpkid = rpkid self.tenant = tenant self.description = description - self.resumed = tornado.locks.Condition() - self.completed = tornado.locks.Condition() + self.runnable = tornado.locks.Event() + self.done_this = None + self.done_next = None self.due_date = None self.started = False + self.runnable.set() self.clear() + # This field belongs to the rpkid task_loop(), don't touch. + self.future = None + def __repr__(self): return rpki.log.log_repr(self, self.description) - def exit(self): - logger.debug("%r: Exiting", self) - self.due_date = None - self.started = False - self.clear() - self.completed.notify_all() - self.rpkid.task_next() - - @tornado.gen.coroutine - def postpone(self): - logger.debug("%r: Postponed", self) - self.due_date = None - self.rpkid.task_add(self) - self.rpkid.task_next() - yield self.resumed.wait() - @tornado.gen.coroutine - def __call__(self): + def start(self): try: + logger.debug("%r: Starting", self) self.due_date = rpki.sundial.now() + self.timeslice - if self.started: - logger.debug("%r: Resuming", self) - self.resumed.notify() - else: - logger.debug("%r: Starting", self) - self.clear() - self.started = True - yield self.start() + self.clear() + self.started = True + yield self.main() except: logger.exception("%r: Unhandled exception", self) - self.exit() - # - # Unclear whether we should re-raise the exception here or not, - # but re-raising it is probably safer until we know for sure. - # - raise + #raise + finally: + logger.debug("%r: Exiting", self) + self.due_date = None + self.started = False + self.clear() + if self.done_this is not None: + self.done_this.notify_all() + self.done_this = self.done_next + self.done_next = None + + def wait(self): + done = "done_next" if self.started else "done_this" + condition = getattr(self, done) + if condition is None: + condition = tornado.locks.Condition() + setattr(self, done, condition) + future = condition.wait() + return future + + def waiting(self): + return self.done_this is not None + + @tornado.gen.coroutine + def postpone(self): + logger.debug("%r: Postponing", self) + self.due_date = None + self.runnable.clear() + yield self.runnable.wait() + logger.debug("%r: Resuming", self) + self.due_date = rpki.sundial.now() + self.timeslice @property def overdue(self): return rpki.sundial.now() > self.due_date @tornado.gen.coroutine - def start(self): + def main(self): raise NotImplementedError def clear(self): @@ -134,7 +143,7 @@ class PollParentTask(AbstractTask): """ @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Polling parents", self) for parent in self.tenant.parents.all(): @@ -166,8 +175,6 @@ class PollParentTask(AbstractTask): logger.debug("%r: Destroying orphaned CA %r for resource class %r", self, ca, class_name) yield ca.destroy(parent) - self.exit() - @queue_task class UpdateChildrenTask(AbstractTask): @@ -178,7 +185,7 @@ class UpdateChildrenTask(AbstractTask): """ @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Updating children", self) now = rpki.sundial.now() rsn = now + rpki.sundial.timedelta(seconds = self.tenant.regen_margin) @@ -232,8 +239,6 @@ class UpdateChildrenTask(AbstractTask): except: logger.exception("%r: Couldn't publish, skipping", self) - self.exit() - @queue_task class UpdateROAsTask(AbstractTask): @@ -246,14 +251,14 @@ class UpdateROAsTask(AbstractTask): self.ca_details = None @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Updating ROAs", self) try: r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle) except: logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle) - raise tornado.gen.Return + return logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg) @@ -312,8 +317,6 @@ class UpdateROAsTask(AbstractTask): yield self.publish() - self.exit() - @tornado.gen.coroutine def publish(self): if not self.publisher.empty(): @@ -339,7 +342,7 @@ class UpdateGhostbustersTask(AbstractTask): """ @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Updating Ghostbuster records", self) parent_handles = set(p.parent_handle for p in self.tenant.parents.all()) @@ -394,8 +397,6 @@ class UpdateGhostbustersTask(AbstractTask): except: logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant.tenant_handle) - self.exit() - @queue_task class UpdateEECertificatesTask(AbstractTask): @@ -407,7 +408,7 @@ class UpdateEECertificatesTask(AbstractTask): """ @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Updating EE certificates", self) try: @@ -473,8 +474,6 @@ class UpdateEECertificatesTask(AbstractTask): except: logger.exception("Could not update EE certificates for %s, skipping", self.tenant.tenant_handle) - self.exit() - @queue_task class RegenerateCRLsAndManifestsTask(AbstractTask): @@ -490,7 +489,7 @@ class RegenerateCRLsAndManifestsTask(AbstractTask): """ @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Regenerating CRLs and manifests", self) try: @@ -516,8 +515,6 @@ class RegenerateCRLsAndManifestsTask(AbstractTask): except: logger.exception("%r: Couldn't publish updated CRLs and manifests, skipping", self) - self.exit() - @queue_task class CheckFailedPublication(AbstractTask): @@ -527,7 +524,7 @@ class CheckFailedPublication(AbstractTask): """ @tornado.gen.coroutine - def start(self): + def main(self): logger.debug("%r: Checking for failed publication actions", self) try: @@ -538,5 +535,3 @@ class CheckFailedPublication(AbstractTask): except: logger.exception("%r: Couldn't run failed publications, skipping", self) - - self.exit() |