aboutsummaryrefslogtreecommitdiff
path: root/rpki/rpkid_tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rpkid_tasks.py')
-rw-r--r--rpki/rpkid_tasks.py211
1 files changed, 91 insertions, 120 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py
index 7e24db35..c359a8d6 100644
--- a/rpki/rpkid_tasks.py
+++ b/rpki/rpkid_tasks.py
@@ -53,6 +53,14 @@ def queue_task(cls):
return cls
+class PostponeTask(Exception):
+ """
+ Exit a task without finishing it. We use this to signal that a
+ long-running task wants to yield to the task loop but hasn't yet
+ run to completion.
+ """
+
+
class AbstractTask(object):
"""
Abstract base class for rpkid scheduler task objects.
@@ -66,51 +74,47 @@ class AbstractTask(object):
#timeslice = rpki.sundial.timedelta(seconds = 15)
timeslice = rpki.sundial.timedelta(seconds = 60)
- ## @var serialize
- # Lock to force prevent more than one task from running at a time.
-
- serialize = tornado.locks.Lock()
-
def __init__(self, rpkid, tenant, description = None):
self.rpkid = rpkid
self.tenant = tenant
self.description = description
- self.runnable = tornado.locks.Event()
self.done_this = None
self.done_next = None
self.due_date = None
self.started = False
- self.runnable.set()
self.clear()
- # This field belongs to the rpkid task_loop(), don't touch.
- self.future = None
-
def __repr__(self):
return rpki.log.log_repr(self, self.description)
+ def reset_due_date(self):
+ self.due_date = rpki.sundial.now() + self.timeslice
+
@tornado.gen.coroutine
def start(self):
try:
- yield self.serialize.acquire()
logger.debug("%r: Starting", self)
- self.due_date = rpki.sundial.now() + self.timeslice
+ self.reset_due_date()
self.clear()
self.started = True
+ postponing = False
yield self.main()
+ except PostponeTask:
+ postponing = True
except:
logger.exception("%r: Unhandled exception", self)
- #raise
finally:
- logger.debug("%r: Exiting", self)
self.due_date = None
self.started = False
self.clear()
- if self.done_this is not None:
- self.done_this.notify_all()
- self.done_this = self.done_next
- self.done_next = None
- self.serialize.release()
+ if postponing:
+ logger.debug("%r: Postponing", self)
+ else:
+ logger.debug("%r: Exiting", self)
+ if self.done_this is not None:
+ self.done_this.notify_all()
+ self.done_this = self.done_next
+ self.done_next = None
def wait(self):
done = "done_next" if self.started else "done_this"
@@ -125,27 +129,9 @@ class AbstractTask(object):
return self.done_this is not None
@tornado.gen.coroutine
- def postpone(self):
- logger.debug("%r: Postponing", self)
- self.due_date = None
- self.runnable.clear()
- tasks = tuple(task for task in self.rpkid.task_queue if task is not None)
- if any(task.runnable.is_set() for task in tasks):
- logger.debug("%r: Runable tasks exist, leaving well enough alone", self)
- else:
- logger.debug("%r: All tasks were postponed, reenabling one picked at random", self)
- random.choice(tasks).runnable.set()
- try:
- self.serialize.release()
- yield self.runnable.wait()
- finally:
- yield self.serialize.acquire()
- logger.debug("%r: Resuming", self)
- self.due_date = rpki.sundial.now() + self.timeslice
-
- @property
def overdue(self):
- return rpki.sundial.now() > self.due_date
+ yield tornado.gen.moment
+ raise tornado.gen.Return(len(self.rpkid.task_ready) > 0 and rpki.sundial.now() > self.due_date)
@tornado.gen.coroutine
def main(self):
@@ -325,9 +311,10 @@ class UpdateChildrenTask(AbstractTask):
for child in self.tenant.children.all():
try:
- if self.overdue:
+ if (yield self.overdue()):
yield publisher.call_pubd()
- yield self.postpone()
+ self.rpkid.task_add(self)
+ raise PostponeTask
child_certs = list(child.child_certs.filter(ca_detail__state = "active"))
@@ -382,10 +369,6 @@ class UpdateROAsTask(AbstractTask):
Generate or update ROAs for this tenant.
"""
- def clear(self):
- self.publisher = None
- self.ca_details = None
-
# XXX This might need rewriting to avoid race conditions.
#
# There's a theoretical race condition here if we're chugging away
@@ -394,100 +377,88 @@ class UpdateROAsTask(AbstractTask):
# fairly low given that we defer CRL and manifest generation until
# we're ready to publish, but it's theoretically present.
- # Rewritten to conserve memory when postponing. We used to treat
- # postponement as just another odd kind of voluntary yield, but
- # when working with really big data sets this tends to result in
- # extreme memory bloat. So, instead, once we hit the postponement
- # limit we publish what we've got, discard all of our transient
- # data, and recompute what we need to do when we come back. This
- # is a bit less efficient, but should tend to converge towards the
- # correct result, with a much smaller memory footprint.
-
@tornado.gen.coroutine
def main(self):
logger.debug("%r: Updating ROAs", self)
- while True: # Postponement loop
-
- try:
- r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle)
- except:
- logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle)
- return
+ try:
+ r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle)
+ except:
+ logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle)
+ return
- logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg)
+ logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg)
- roas = {}
- seen = set()
- orphans = []
- updates = []
- self.publisher = rpki.rpkid.publication_queue(self.rpkid) # pylint: disable=W0201
- self.ca_details = set() # pylint: disable=W0201
-
- for roa in self.tenant.roas.all():
- k = "{!s} {!s} {!s}".format(roa.asn, roa.ipv4, roa.ipv6)
- if k not in roas:
- roas[k] = roa
- elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"):
- orphans.append(roas[k])
- roas[k] = roa
- else:
- orphans.append(roa)
+ roas = {}
+ seen = set()
+ orphans = []
+ updates = []
+ publisher = rpki.rpkid.publication_queue(self.rpkid)
+ ca_details = set()
+
+ for roa in self.tenant.roas.all():
+ k = "{!s} {!s} {!s}".format(roa.asn, roa.ipv4, roa.ipv6)
+ if k not in roas:
+ roas[k] = roa
+ elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"):
+ orphans.append(roas[k])
+ roas[k] = roa
+ else:
+ orphans.append(roa)
- for r_pdu in r_msg:
- k = "{!s} {!s} {!s}".format(r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6"))
- if k in seen:
- logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu)
+ for r_pdu in r_msg:
+ k = "{!s} {!s} {!s}".format(r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6"))
+ if k in seen:
+ logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu)
+ else:
+ seen.add(k)
+ roa = roas.pop(k, None)
+ if roa is None:
+ roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6"))
+ logger.debug("%r: Created new %r", self, roa)
else:
- seen.add(k)
- roa = roas.pop(k, None)
- if roa is None:
- roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6"))
- logger.debug("%r: Created new %r", self, roa)
- else:
- logger.debug("%r: Found existing %r", self, roa)
- updates.append(roa)
+ logger.debug("%r: Found existing %r", self, roa)
+ updates.append(roa)
- r_msg = seen = None
+ r_msg = seen = None
- orphans.extend(roas.itervalues())
+ orphans.extend(roas.itervalues())
- roas = None
+ roas = None
- while updates and not self.overdue:
- roa = updates.pop(0)
- try:
- roa.update(publisher = self.publisher)
- self.ca_details.add(roa.ca_detail)
- except rpki.exceptions.NoCoveringCertForROA:
- logger.warning("%r: No covering certificate for %r, skipping", self, roa)
- except:
- logger.exception("%r: Could not update %r, skipping", self, roa)
+ postponing = False
- if not self.overdue:
+ while updates and not postponing:
+ if (yield self.overdue()):
+ postponing = True
break
-
- roas = seen = orphans = updates = None
- yield self.publish()
- yield self.postpone()
-
- for roa in orphans:
+ roa = updates.pop(0)
try:
- self.ca_details.add(roa.ca_detail)
- roa.revoke(publisher = self.publisher)
+ roa.update(publisher = publisher)
+ ca_details.add(roa.ca_detail)
+ except rpki.exceptions.NoCoveringCertForROA:
+ logger.warning("%r: No covering certificate for %r, skipping", self, roa)
except:
- logger.exception("%r: Could not revoke %r", self, roa)
+ logger.exception("%r: Could not update %r, skipping", self, roa)
- yield self.publish()
+ updates = None
- @tornado.gen.coroutine
- def publish(self):
- if not self.publisher.empty():
- for ca_detail in self.ca_details:
+ if not postponing:
+ for roa in orphans:
+ try:
+ ca_details.add(roa.ca_detail)
+ roa.revoke(publisher = publisher)
+ except:
+ logger.exception("%r: Could not revoke %r", self, roa)
+
+ if not publisher.empty():
+ for ca_detail in ca_details:
logger.debug("%r: Generating new CRL and manifest for %r", self, ca_detail)
- ca_detail.generate_crl_and_manifest(publisher = self.publisher)
- yield self.publisher.call_pubd()
- self.ca_details.clear()
+ ca_detail.generate_crl_and_manifest(publisher = publisher)
+ yield publisher.call_pubd()
+
+ if postponing:
+ raise PostponeTask
@queue_task