aboutsummaryrefslogtreecommitdiff
path: root/rpki/rpkid_tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rpkid_tasks.py')
-rw-r--r--rpki/rpkid_tasks.py111
1 files changed, 53 insertions, 58 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py
index edca9ace..b6713447 100644
--- a/rpki/rpkid_tasks.py
+++ b/rpki/rpkid_tasks.py
@@ -54,8 +54,7 @@ def queue_task(cls):
class AbstractTask(object):
"""
- Abstract base class for rpkid scheduler task objects. This just
- handles the scheduler hooks, real work starts in self.start.
+ Abstract base class for rpkid scheduler task objects.
"""
## @var timeslice
@@ -68,58 +67,68 @@ class AbstractTask(object):
self.rpkid = rpkid
self.tenant = tenant
self.description = description
- self.resumed = tornado.locks.Condition()
- self.completed = tornado.locks.Condition()
+ self.runnable = tornado.locks.Event()
+ self.done_this = None
+ self.done_next = None
self.due_date = None
self.started = False
+ self.runnable.set()
self.clear()
+ # This field belongs to the rpkid task_loop(), don't touch.
+ self.future = None
+
def __repr__(self):
return rpki.log.log_repr(self, self.description)
- def exit(self):
- logger.debug("%r: Exiting", self)
- self.due_date = None
- self.started = False
- self.clear()
- self.completed.notify_all()
- self.rpkid.task_next()
-
- @tornado.gen.coroutine
- def postpone(self):
- logger.debug("%r: Postponed", self)
- self.due_date = None
- self.rpkid.task_add(self)
- self.rpkid.task_next()
- yield self.resumed.wait()
-
@tornado.gen.coroutine
- def __call__(self):
+ def start(self):
try:
+ logger.debug("%r: Starting", self)
self.due_date = rpki.sundial.now() + self.timeslice
- if self.started:
- logger.debug("%r: Resuming", self)
- self.resumed.notify()
- else:
- logger.debug("%r: Starting", self)
- self.clear()
- self.started = True
- yield self.start()
+ self.clear()
+ self.started = True
+ yield self.main()
except:
logger.exception("%r: Unhandled exception", self)
- self.exit()
- #
- # Unclear whether we should re-raise the exception here or not,
- # but re-raising it is probably safer until we know for sure.
- #
- raise
+ #raise
+ finally:
+ logger.debug("%r: Exiting", self)
+ self.due_date = None
+ self.started = False
+ self.clear()
+ if self.done_this is not None:
+ self.done_this.notify_all()
+ self.done_this = self.done_next
+ self.done_next = None
+
+ def wait(self):
+ done = "done_next" if self.started else "done_this"
+ condition = getattr(self, done)
+ if condition is None:
+ condition = tornado.locks.Condition()
+ setattr(self, done, condition)
+ future = condition.wait()
+ return future
+
+ def waiting(self):
+ return self.done_this is not None
+
+ @tornado.gen.coroutine
+ def postpone(self):
+ logger.debug("%r: Postponing", self)
+ self.due_date = None
+ self.runnable.clear()
+ yield self.runnable.wait()
+ logger.debug("%r: Resuming", self)
+ self.due_date = rpki.sundial.now() + self.timeslice
@property
def overdue(self):
return rpki.sundial.now() > self.due_date
@tornado.gen.coroutine
- def start(self):
+ def main(self):
raise NotImplementedError
def clear(self):
@@ -134,7 +143,7 @@ class PollParentTask(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Polling parents", self)
for parent in self.tenant.parents.all():
@@ -166,8 +175,6 @@ class PollParentTask(AbstractTask):
logger.debug("%r: Destroying orphaned CA %r for resource class %r", self, ca, class_name)
yield ca.destroy(parent)
- self.exit()
-
@queue_task
class UpdateChildrenTask(AbstractTask):
@@ -178,7 +185,7 @@ class UpdateChildrenTask(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Updating children", self)
now = rpki.sundial.now()
rsn = now + rpki.sundial.timedelta(seconds = self.tenant.regen_margin)
@@ -232,8 +239,6 @@ class UpdateChildrenTask(AbstractTask):
except:
logger.exception("%r: Couldn't publish, skipping", self)
- self.exit()
-
@queue_task
class UpdateROAsTask(AbstractTask):
@@ -246,14 +251,14 @@ class UpdateROAsTask(AbstractTask):
self.ca_details = None
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Updating ROAs", self)
try:
r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle)
except:
logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle)
- raise tornado.gen.Return
+ return
logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg)
@@ -312,8 +317,6 @@ class UpdateROAsTask(AbstractTask):
yield self.publish()
- self.exit()
-
@tornado.gen.coroutine
def publish(self):
if not self.publisher.empty():
@@ -339,7 +342,7 @@ class UpdateGhostbustersTask(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Updating Ghostbuster records", self)
parent_handles = set(p.parent_handle for p in self.tenant.parents.all())
@@ -394,8 +397,6 @@ class UpdateGhostbustersTask(AbstractTask):
except:
logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant.tenant_handle)
- self.exit()
-
@queue_task
class UpdateEECertificatesTask(AbstractTask):
@@ -407,7 +408,7 @@ class UpdateEECertificatesTask(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Updating EE certificates", self)
try:
@@ -473,8 +474,6 @@ class UpdateEECertificatesTask(AbstractTask):
except:
logger.exception("Could not update EE certificates for %s, skipping", self.tenant.tenant_handle)
- self.exit()
-
@queue_task
class RegenerateCRLsAndManifestsTask(AbstractTask):
@@ -490,7 +489,7 @@ class RegenerateCRLsAndManifestsTask(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Regenerating CRLs and manifests", self)
try:
@@ -516,8 +515,6 @@ class RegenerateCRLsAndManifestsTask(AbstractTask):
except:
logger.exception("%r: Couldn't publish updated CRLs and manifests, skipping", self)
- self.exit()
-
@queue_task
class CheckFailedPublication(AbstractTask):
@@ -527,7 +524,7 @@ class CheckFailedPublication(AbstractTask):
"""
@tornado.gen.coroutine
- def start(self):
+ def main(self):
logger.debug("%r: Checking for failed publication actions", self)
try:
@@ -538,5 +535,3 @@ class CheckFailedPublication(AbstractTask):
except:
logger.exception("%r: Couldn't run failed publications, skipping", self)
-
- self.exit()