diff options
author | Rob Austein <sra@hactrn.net> | 2015-11-10 13:09:07 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-11-10 13:09:07 +0000 |
commit | ac415cdd0f88f8479975627772dd0a84797b261a (patch) | |
tree | 4c943706862165f42d4164138504446c3e132ea0 /rpki/rpkid_tasks.py | |
parent | 947f220a4884a44b62afd18892b14433e440a139 (diff) |
Use a lock to serialize rpkid tasks. Add temporary trace call
sequence trace code to rpki.rpkidb.models to assist in simplifying
some of the gratuitously complicated method call chains. Various
trivial PyLint cleanups.
svn path=/branches/tk705/; revision=6161
Diffstat (limited to 'rpki/rpkid_tasks.py')
-rw-r--r-- | rpki/rpkid_tasks.py | 41 |
1 files changed, 27 insertions, 14 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py index 5c28afc3..989042b9 100644 --- a/rpki/rpkid_tasks.py +++ b/rpki/rpkid_tasks.py @@ -47,7 +47,7 @@ def queue_task(cls): Class decorator to add a new task class to task_classes. """ - global task_classes + global task_classes # pylint: disable=W0603 task_classes += (cls,) return cls @@ -63,6 +63,11 @@ class AbstractTask(object): timeslice = rpki.sundial.timedelta(seconds = 15) + ## @var serialize + # Lock to force prevent more than one task from running at a time. + + serialize = tornado.locks.Lock() + def __init__(self, rpkid, tenant, description = None): self.rpkid = rpkid self.tenant = tenant @@ -84,6 +89,7 @@ class AbstractTask(object): @tornado.gen.coroutine def start(self): try: + yield self.serialize.acquire() logger.debug("%r: Starting", self) self.due_date = rpki.sundial.now() + self.timeslice self.clear() @@ -101,6 +107,7 @@ class AbstractTask(object): self.done_this.notify_all() self.done_this = self.done_next self.done_next = None + self.serialize.release() def wait(self): done = "done_next" if self.started else "done_this" @@ -119,7 +126,11 @@ class AbstractTask(object): logger.debug("%r: Postponing", self) self.due_date = None self.runnable.clear() - yield self.runnable.wait() + try: + self.serialize.release() + yield self.runnable.wait() + finally: + yield self.serialize.acquire() logger.debug("%r: Resuming", self) self.due_date = rpki.sundial.now() + self.timeslice @@ -217,7 +228,7 @@ class UpdateChildrenTask(AbstractTask): ca_detail.generate_crl(publisher = publisher) ca_detail.generate_manifest(publisher = publisher) - elif (old_resources != new_resources or old_aia != new_aia or (old_resources.valid_until < rsn and irdb_resources.valid_until > now and old_resources.valid_until != irdb_resources.valid_until)): + elif old_resources != new_resources or old_aia != new_aia or (old_resources.valid_until < rsn and irdb_resources.valid_until > now and old_resources.valid_until != irdb_resources.valid_until): logger.debug("Need to reissue child %s certificate g(SKI) %s", child.child_handle, child_cert.gski) if old_resources != new_resources: logger.debug("Child %s g(SKI) %s resources changed: old %s new %s", child.child_handle, child_cert.gski, old_resources, new_resources) @@ -275,7 +286,7 @@ class UpdateROAsTask(AbstractTask): k = (roa.asn, str(roa.ipv4), str(roa.ipv6)) if k not in roas: roas[k] = roa - elif (roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active")): + elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"): orphans.append(roas[k]) roas[k] = roa else: @@ -418,6 +429,8 @@ class UpdateEECertificatesTask(AbstractTask): publisher = rpki.rpkid.publication_queue(self.rpkid) + logger.debug("%r: Examining EE certificate requests", self) + existing = dict() for ee in self.tenant.ee_certificates.all(): gski = ee.gski @@ -441,25 +454,25 @@ class UpdateEECertificatesTask(AbstractTask): for ee in ees: if ee.ca_detail in covering: - logger.debug("Updating existing EE certificate for %s %s", gski, resources) + logger.debug("%r: Updating existing EE certificate for %s %s", self, gski, resources) ee.reissue(resources = resources, publisher = publisher) covering.remove(ee.ca_detail) else: - logger.debug("Existing EE certificate for %s %s is no longer covered", gski, resources) + logger.debug("%r: Existing EE certificate for %s %s is no longer covered", self, gski, resources) ee.revoke(publisher = publisher) subject_name = rpki.x509.X501DN.from_cn(r_pdu.get("cn"), r_pdu.get("sn")) subject_key = rpki.x509.PKCS10(Base64 = r_pdu[0].text).getPublicKey() for ca_detail in covering: - logger.debug("No existing EE certificate for %s %s", gski, resources) + logger.debug("%r: No existing EE certificate for %s %s", self, gski, resources) rpki.rpkidb.models.EECertificate.create( # sic: class method, not Django manager method (for now, anyway) - ca_detail = ca_detail, - subject_name = subject_name, - subject_key = subject_key, - resources = resources, - publisher = publisher, - eku = r_pdu.get("eku", "").split(",") or None) + ca_detail = ca_detail, + subject_name = subject_name, + subject_key = subject_key, + resources = resources, + publisher = publisher, + eku = r_pdu.get("eku", "").split(",") or None) # Anything left is an orphan for ees in existing.values(): @@ -474,7 +487,7 @@ class UpdateEECertificatesTask(AbstractTask): yield publisher.call_pubd() except: - logger.exception("Could not update EE certificates for %s, skipping", self.tenant.tenant_handle) + logger.exception("%r: Could not update EE certificates, skipping", self) @queue_task |