diff options
Diffstat (limited to 'rpki/rpkid_tasks.py')
-rw-r--r-- | rpki/rpkid_tasks.py | 705 |
1 files changed, 239 insertions, 466 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py index 91fa787d..0a9c1654 100644 --- a/rpki/rpkid_tasks.py +++ b/rpki/rpkid_tasks.py @@ -22,9 +22,17 @@ because interactions with rpkid scheduler were getting too complicated. """ import logging + +import tornado.gen +import tornado.web +import tornado.locks +import tornado.ioloop +import tornado.httputil +import tornado.httpclient +import tornado.httpserver + import rpki.log import rpki.rpkid -import rpki.async import rpki.up_down import rpki.sundial import rpki.publication @@ -44,45 +52,6 @@ def queue_task(cls): return cls -class CompletionHandler(object): - """ - Track one or more scheduled rpkid tasks and execute a callback when - the last of them terminates. - """ - - ## @var debug - # Debug logging. - - debug = False - - def __init__(self, cb): - self.cb = cb - self.tasks = set() - - def register(self, task): - if self.debug: - logger.debug("Completion handler %r registering task %r", self, task) - self.tasks.add(task) - task.register_completion(self.done) - - def done(self, task): - try: - self.tasks.remove(task) - except KeyError: - logger.warning("Completion handler %r called with unregistered task %r, blundering onwards", self, task) - else: - if self.debug: - logger.debug("Completion handler %r called with registered task %r", self, task) - if not self.tasks: - if self.debug: - logger.debug("Completion handler %r finished, calling %r", self, self.cb) - self.cb() - - @property - def count(self): - return len(self.tasks) - - class AbstractTask(object): """ Abstract base class for rpkid scheduler task objects. This just @@ -95,53 +64,61 @@ class AbstractTask(object): timeslice = rpki.sundial.timedelta(seconds = 15) - def __init__(self, rpkid, s, description = None): - self.rpkid = rpkid - self.tenant = s + def __init__(self, rpkid, tenant, description = None): + self.rpkid = rpkid + self.tenant = tenant self.description = description - self.completions = [] - self.continuation = None - self.due_date = None + self.resumed = tornado.locks.Condition() + self.completed = tornado.locks.Condition() + self.due_date = None + self.started = False self.clear() def __repr__(self): return rpki.log.log_repr(self, self.description) - def register_completion(self, completion): - self.completions.append(completion) - def exit(self): - while self.completions: - self.completions.pop(0)(self) - self.clear() + logger.debug("%r: Exiting", self) self.due_date = None + self.started = False + self.clear() + self.completed.notify_all() self.rpkid.task_next() - def postpone(self, continuation): - self.continuation = continuation + @tornado.gen.coroutine + def postpone(self): + logger.debug("%r: Postponed", self) self.due_date = None self.rpkid.task_add(self) self.rpkid.task_next() + yield self.resumed.wait() + @tornado.gen.coroutine def __call__(self): - self.due_date = rpki.sundial.now() + self.timeslice - if self.continuation is None: - logger.debug("Running task %r", self) - self.clear() - self.start() - else: - logger.debug("Restarting task %r at %r", self, self.continuation) - continuation = self.continuation - self.continuation = None - continuation() + try: + self.due_date = rpki.sundial.now() + self.timeslice + if self.started: + logger.debug("%r: Resuming", self) + self.resumed.notify() + else: + logger.debug("%r: Starting", self) + self.clear() + self.started = True + yield self.start() + except: + logger.exception("%r: Unhandled exception", self) + self.exit() + # + # Unclear whether we should re-raise the exception here or not, + # but re-raising it is probably safer until we know for sure. + # + raise @property def overdue(self): return rpki.sundial.now() > self.due_date - def __getattr__(self, name): - return getattr(self.tenant, name) - + @tornado.gen.coroutine def start(self): raise NotImplementedError @@ -152,340 +129,207 @@ class AbstractTask(object): @queue_task class PollParentTask(AbstractTask): """ - Run the regular client poll cycle with each of this self's + Run the regular client poll cycle with each of this tenant's parents, in turn. """ - def clear(self): - logger.debug("PollParentTask.clear()") - self.parent_iterator = None - self.parent = None - self.ca_map = None - self.class_iterator = None - self.started = False - + @tornado.gen.coroutine def start(self): - logger.debug("PollParentTask.start()") - self.rpkid.checkpoint() - logger.debug("Self %s[%r] polling parents", self.tenant_handle, self) - assert not self.started - self.started = True - rpki.async.iterator(self.parents.all(), self.parent_loop, self.exit) - - def parent_loop(self, parent_iterator, parent): - logger.debug("PollParentTask.parent_loop()") - self.parent_iterator = parent_iterator - self.parent = parent - parent.up_down_list_query(rpkid = self.rpkid, cb = self.got_list, eb = self.list_failed) - - def got_list(self, r_msg): - logger.debug("PollParentTask.got_list()") - self.ca_map = dict((ca.parent_resource_class, ca) for ca in self.parent.cas.all()) - self.rpkid.checkpoint() - rpki.async.iterator(r_msg.getiterator(rpki.up_down.tag_class), self.class_loop, self.class_done) - - def list_failed(self, e): - logger.debug("PollParentTask.list_failed()") - logger.exception("Couldn't get resource class list from parent %r, skipping", self.parent) - self.parent_iterator() - - def class_loop(self, class_iterator, rc): - logger.debug("PollParentTask.class_loop()") - self.rpkid.checkpoint() - self.class_iterator = class_iterator - try: - ca = self.ca_map.pop(rc.get("class_name")) - except KeyError: - rpki.rpkidb.models.CA.create(rpkid = self.rpkid, parent = self.parent, rc = rc, - cb = class_iterator, eb = self.class_create_failed) - else: - ca.check_for_updates(rpkid = self.rpkid, parent = self.parent, rc = rc, cb = class_iterator, eb = self.class_update_failed) - - def class_update_failed(self, e): - logger.debug("PollParentTask.class_update_failed()") - logger.exception("Couldn't update class, skipping") - self.class_iterator() - - def class_create_failed(self, e): - logger.debug("PollParentTask.class_create_failed()") - logger.exception("Couldn't create class, skipping") - self.class_iterator() - - def class_done(self): - logger.debug("PollParentTask.class_done()") - rpki.async.iterator(self.ca_map.values(), self.ca_loop, self.ca_done) - - def ca_loop(self, iterator, ca): - logger.debug("PollParentTask.ca_loop()") - self.rpkid.checkpoint() - ca.destroy(self.parent, iterator) - - def ca_done(self): - logger.debug("PollParentTask.ca_done()") - self.rpkid.checkpoint() - self.parent_iterator() + logger.debug("%r: Polling parents", self) + + for parent in self.tenant.parents.all(): + try: + logger.debug("%r: Executing list query", self) + r_msg = yield parent.up_down_list_query(rpkid = self.rpkid) + except: + logger.exception("%r: Couldn't get resource class list from parent %r, skipping", self, parent) + continue + + logger.debug("%r: Parsing list response", self) + + ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas.all()) + + for rc in r_msg.getiterator(rpki.up_down.tag_class): + try: + class_name = rc.get("class_name") + ca = ca_map.pop(class_name, None) + if ca is None: + logger.debug("%r: Creating new CA for resource class %r", self, class_name) + rpki.rpkidb.models.CA.create(rpkid = self.rpkid, parent = parent, rc = rc) + else: + logger.debug("%r: Checking updates for existing CA %r for resource class %r", self, ca, class_name) + yield ca.check_for_updates(rpkid = self.rpkid, parent = parent, rc = rc) + except: + logger.exception("Couldn't update resource class %r, skipping", class_name) + + for ca, class_name in ca_map.iteritems(): + logger.debug("%r: Destroying orphaned CA %r for resource class %r", self, ca, class_name) + yield ca.destroy(parent) + + self.exit() @queue_task class UpdateChildrenTask(AbstractTask): """ - Check for updated IRDB data for all of this self's children and + Check for updated IRDB data for all of this tenant's children and issue new certs as necessary. Must handle changes both in resources and in expiration date. """ - def clear(self): - self.now = None - self.rsn = None - self.publisher = None - self.iterator = None - self.child = None - self.child_certs = None - self.started = False - + @tornado.gen.coroutine def start(self): - self.rpkid.checkpoint() - logger.debug("Self %s[%r] updating children", self.tenant_handle, self) - assert not self.started - self.started = True - self.now = rpki.sundial.now() - self.rsn = self.now + rpki.sundial.timedelta(seconds = self.regen_margin) - self.publisher = rpki.rpkid.publication_queue(self.rpkid) - rpki.async.iterator(self.children.all(), self.loop, self.done) - - def loop(self, iterator, child): - self.rpkid.checkpoint() - self.iterator = iterator - self.child = child - self.child_certs = child.child_certs - if self.overdue: - self.publisher.call_pubd(lambda: self.postpone(self.do_child), self.publication_failed) - else: - self.do_child() - - def do_child(self): - if self.child_certs: - self.rpkid.irdb_query_child_resources(self.child.tenant.tenant_handle, self.child.child_handle, - self.got_resources, self.lose) - else: - self.iterator() - - def lose(self, e): - logger.exception("Couldn't update child %r, skipping", self.child) - self.iterator() - - def got_resources(self, irdb_resources): + logger.debug("%r: Updating children", self) + now = rpki.sundial.now() + rsn = now + rpki.sundial.timedelta(seconds = self.tenant.regen_margin) + publisher = rpki.rpkid.publication_queue(self.rpkid) + + for child in self.tenant.children.all(): + try: + if self.overdue: + yield publisher.call_pubd() + yield self.postpone() + + child_certs = list(child.child_certs.filter(ca_detail__state = "active")) + + if child_certs: + irdb_resources = yield self.rpkid.irdb_query_child_resources(child.tenant.tenant_handle, child.child_handle) + + for child_cert in child_certs: + ca_detail = child_cert.ca_detail + old_resources = child_cert.cert.get_3779resources() + new_resources = old_resources & irdb_resources & ca_detail.latest_ca_cert.get_3779resources() + old_aia = child_cert.cert.get_AIA()[0] + new_aia = ca_detail.ca_cert_uri + + if new_resources.empty(): + logger.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s", child.child_handle, child_cert.cert.gSKI()) + child_cert.revoke(publisher = publisher) + ca_detail.generate_crl(publisher = publisher) + ca_detail.generate_manifest(publisher = publisher) + + elif (old_resources != new_resources or old_aia != new_aia or (old_resources.valid_until < rsn and irdb_resources.valid_until > now and old_resources.valid_until != irdb_resources.valid_until)): + logger.debug("Need to reissue child %s certificate SKI %s", child.child_handle, child_cert.cert.gSKI()) + if old_resources != new_resources: + logger.debug("Child %s SKI %s resources changed: old %s new %s", child.child_handle, child_cert.cert.gSKI(), old_resources, new_resources) + if old_resources.valid_until != irdb_resources.valid_until: + logger.debug("Child %s SKI %s validity changed: old %s new %s", child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until) + + new_resources.valid_until = irdb_resources.valid_until + child_cert.reissue(ca_detail = ca_detail, resources = new_resources, publisher = publisher) + + elif old_resources.valid_until < now: + logger.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s", child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until) + child_cert.delete() + publisher.queue(uri = child_cert.uri, old_obj = child_cert.cert, repository = ca_detail.ca.parent.repository) + ca_detail.generate_manifest(publisher = publisher) + + except: + logger.exception("%r: Couldn't update child %r, skipping", self, child) + try: - for child_cert in self.child_certs.filter(ca_detail__state = "active"): - ca_detail = child_cert.ca_detail - old_resources = child_cert.cert.get_3779resources() - new_resources = old_resources & irdb_resources & ca_detail.latest_ca_cert.get_3779resources() - old_aia = child_cert.cert.get_AIA()[0] - new_aia = ca_detail.ca_cert_uri - - if new_resources.empty(): - logger.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s", - self.child.child_handle, child_cert.cert.gSKI()) - child_cert.revoke(publisher = self.publisher) - ca_detail.generate_crl(publisher = self.publisher) - ca_detail.generate_manifest(publisher = self.publisher) - - elif (old_resources != new_resources or - old_aia != new_aia or - (old_resources.valid_until < self.rsn and - irdb_resources.valid_until > self.now and - old_resources.valid_until != irdb_resources.valid_until)): - - logger.debug("Need to reissue child %s certificate SKI %s", - self.child.child_handle, child_cert.cert.gSKI()) - if old_resources != new_resources: - logger.debug("Child %s SKI %s resources changed: old %s new %s", - self.child.child_handle, child_cert.cert.gSKI(), - old_resources, new_resources) - if old_resources.valid_until != irdb_resources.valid_until: - logger.debug("Child %s SKI %s validity changed: old %s new %s", - self.child.child_handle, child_cert.cert.gSKI(), - old_resources.valid_until, irdb_resources.valid_until) - - new_resources.valid_until = irdb_resources.valid_until - child_cert.reissue( - ca_detail = ca_detail, - resources = new_resources, - publisher = self.publisher) - - elif old_resources.valid_until < self.now: - logger.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s", - self.child.child_handle, child_cert.cert.gSKI(), - old_resources.valid_until, irdb_resources.valid_until) - child_cert.delete() - self.publisher.queue( - uri = child_cert.uri, - old_obj = child_cert.cert, - repository = ca_detail.ca.parent.repository) - ca_detail.generate_manifest(publisher = self.publisher) - - except (SystemExit, rpki.async.ExitNow): - raise - except Exception, e: - self.rpkid.checkpoint() - self.lose(e) - else: - self.rpkid.checkpoint() - self.iterator() - - def done(self): - self.rpkid.checkpoint() - self.publisher.call_pubd(self.exit, self.publication_failed) - - def publication_failed(self, e): - logger.exception("Couldn't publish for %s, skipping", self.tenant_handle) - self.rpkid.checkpoint() + yield publisher.call_pubd() + except: + logger.exception("%r: Couldn't publish, skipping", self) + self.exit() @queue_task class UpdateROAsTask(AbstractTask): """ - Generate or update ROAs for this self. + Generate or update ROAs for this tenant. """ def clear(self): - self.orphans = None - self.updates = None - self.publisher = None + self.publisher = None self.ca_details = None - self.count = None - self.started = False + @tornado.gen.coroutine def start(self): - self.rpkid.checkpoint() - logger.debug("Self %s[%r] updating ROAs", self.tenant_handle, self) - assert not self.started - self.started = True - logger.debug("Issuing query for ROA requests") - self.rpkid.irdb_query_roa_requests(self.tenant_handle, self.got_roa_requests, self.roa_requests_failed) + logger.debug("%r: Updating ROAs", self) + + try: + r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle) + except: + logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle) + raise tornado.gen.Return - def got_roa_requests(self, r_msg): - self.rpkid.checkpoint() - logger.debug("Received response to query for ROA requests") + logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg) roas = {} seen = set() - self.orphans = [] - self.updates = [] + orphans = [] + updates = [] self.publisher = rpki.rpkid.publication_queue(self.rpkid) self.ca_details = set() - logger.debug("UpdateROAsTask.got_roa_requests(): setup done, self.orphans %r", self.orphans) - assert isinstance(self.orphans, list) # XXX - for roa in self.tenant.roas.all(): - logger.debug("UpdateROAsTask.got_roa_requests(): roa loop, self.orphans %r", self.orphans) - assert isinstance(self.orphans, list) # XXX k = (roa.asn, str(roa.ipv4), str(roa.ipv6)) if k not in roas: roas[k] = roa - elif (roa.roa is not None and - roa.cert is not None and - roa.ca_detail is not None and - roa.ca_detail.state == "active" and - (roas[k].roa is None or - roas[k].cert is None or - roas[k].ca_detail is None or - roas[k].ca_detail.state != "active")): - self.orphans.append(roas[k]) + elif (roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active")): + orphans.append(roas[k]) roas[k] = roa else: - self.orphans.append(roa) - - logger.debug("UpdateROAsTask.got_roa_requests(): roa loop done, self.orphans %r", self.orphans) - assert isinstance(self.orphans, list) # XXX + orphans.append(roa) for r_pdu in r_msg: - logger.debug("UpdateROAsTask.got_roa_requests(): r_pdu loop, self.orphans %r", self.orphans) - assert isinstance(self.orphans, list) k = (r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6")) if k in seen: - logger.warning("Skipping duplicate ROA request %r", r_pdu) + logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu) else: seen.add(k) roa = roas.pop(k, None) if roa is None: - roa = rpki.rpkidb.models.ROA(asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6")) - roa.tenant = self.tenant - logger.debug("Created new %r", roa) + roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6")) + logger.debug("%r: Created new %r", self, roa) else: - logger.debug("Found existing %r", roa) - self.updates.append(roa) + logger.debug("%r: Found existing %r", self, roa) + updates.append(roa) - logger.debug("UpdateROAsTask.got_roa_requests(): r_pdu loop done, self.orphans %r", self.orphans) - assert isinstance(self.orphans, list) # XXX + orphans.extend(roas.itervalues()) - self.orphans.extend(roas.itervalues()) + while updates: + if self.overdue: + yield self.publish() + yield self.postpone() + roa = updates.pop(0) + try: + roa.update(publisher = self.publisher, fast = True) + self.ca_details.add(roa.ca_detail) + except rpki.exceptions.NoCoveringCertForROA: + logger.warning("%r: No covering certificate for %r, skipping", self, roa) + except: + logger.exception("%r: Could not update %r, skipping", self, roa) - if self.overdue: - self.postpone(self.begin_loop) - else: - self.begin_loop() + for roa in orphans: + try: + self.ca_details.add(roa.ca_detail) + roa.revoke(publisher = self.publisher, fast = True) + except: + logger.exception("%r: Could not revoke %r", self, roa) - def begin_loop(self): - self.count = 0 - rpki.async.iterator(self.updates, self.loop, self.done, pop_list = True) + yield self.publish() - def loop(self, iterator, roa): - self.rpkid.checkpoint() - try: - roa.update(publisher = self.publisher, fast = True) - self.ca_details.add(roa.ca_detail) - except (SystemExit, rpki.async.ExitNow): - raise - except rpki.exceptions.NoCoveringCertForROA: - logger.warning("No covering certificate for %r, skipping", roa) - except Exception: - logger.exception("Could not update %r, skipping", roa) - self.count += 1 - if self.overdue: - self.publish(lambda: self.postpone(iterator)) - else: - iterator() - - def publish(self, done): + self.exit() + + @tornado.gen.coroutine + def publish(self): if not self.publisher.empty(): for ca_detail in self.ca_details: - logger.debug("Generating new CRL for %r", ca_detail) + logger.debug("%r: Generating new CRL for %r", self, ca_detail) ca_detail.generate_crl(publisher = self.publisher) - logger.debug("Generating new manifest for %r", ca_detail) + logger.debug("%r: Generating new manifest for %r", self, ca_detail) ca_detail.generate_manifest(publisher = self.publisher) + yield self.publisher.call_pubd() self.ca_details.clear() - self.rpkid.checkpoint() - self.publisher.call_pubd(done, self.publication_failed) - - def publication_failed(self, e): - logger.exception("Couldn't publish for %s, skipping", self.tenant_handle) - self.rpkid.checkpoint() - self.exit() - - def done(self): - for roa in self.orphans: - try: - self.ca_details.add(roa.ca_detail) - roa.revoke(publisher = self.publisher, fast = True) - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Could not revoke %r", roa) - self.rpkid.checkpoint() - self.publish(self.exit) - - def roa_requests_failed(self, e): - logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant_handle) - self.exit() @queue_task class UpdateGhostbustersTask(AbstractTask): """ - Generate or update Ghostbuster records for this self. + Generate or update Ghostbuster records for this tenant. This was originally based on the ROA update code. It's possible that both could benefit from refactoring, but at this point the @@ -494,23 +338,13 @@ class UpdateGhostbustersTask(AbstractTask): exceptionally silly. """ - def clear(self): - self.started = False - + @tornado.gen.coroutine def start(self): - self.rpkid.checkpoint() - logger.debug("Self %s[%r] updating Ghostbuster records", self.tenant_handle, self) - assert not self.started - self.started = True + logger.debug("%r: Updating Ghostbuster records", self) parent_handles = set(p.parent_handle for p in self.tenant.parents.all()) - self.rpkid.irdb_query_ghostbuster_requests(self.tenant_handle, parent_handles, - self.got_ghostbuster_requests, - self.ghostbuster_requests_failed) - - def got_ghostbuster_requests(self, r_msg): try: - self.rpkid.checkpoint() + r_msg = yield self.rpkid.irdb_query_ghostbuster_requests(self.tenant.tenant_handle, parent_handles) ghostbusters = {} orphans = [] @@ -529,22 +363,20 @@ class UpdateGhostbustersTask(AbstractTask): try: self.tenant.parents.get(parent_handle = r_pdu.get("parent_handle")) except rpki.rpkidb.models.Parent.DoesNotExist: - logger.warning("Unknown parent_handle %r in Ghostbuster request, skipping", r_pdu.get("parent_handle")) + logger.warning("%r: Unknown parent_handle %r in Ghostbuster request, skipping", self, r_pdu.get("parent_handle")) continue k = (r_pdu.get("parent_handle"), r_pdu.text) if k in seen: - logger.warning("Skipping duplicate Ghostbuster request %r", r_pdu) + logger.warning("%r: Skipping duplicate Ghostbuster request %r", self, r_pdu) continue seen.add(k) - for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__parent_handle = r_pdu.get("parent_handle"), - ca__parent__tenant = self.tenant, state = "active"): + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__parent_handle = r_pdu.get("parent_handle"), ca__parent__tenant = self.tenant, state = "active"): ghostbuster = ghostbusters.pop((ca_detail.pk, r_pdu.text), None) if ghostbuster is None: - ghostbuster = rpki.rpkidb.models.Ghostbuster(ca_detail = ca_detail, vcard = r_pdu.text) - ghostbuster.tenant = self.tenant - logger.debug("Created new %r for %r", ghostbuster, r_pdu.get("parent_handle")) + ghostbuster = rpki.rpkidb.models.Ghostbuster(tenant = self.tenant, ca_detail = ca_detail, vcard = r_pdu.text) + logger.debug("%r: Created new %r for %r", self, ghostbuster, r_pdu.get("parent_handle")) else: - logger.debug("Found existing %r for %s", ghostbuster, r_pdu.get("parent_handle")) + logger.debug("%r: Found existing %r for %s", self, ghostbuster, r_pdu.get("parent_handle")) ghostbuster.update(publisher = publisher, fast = True) ca_details.add(ca_detail) @@ -557,22 +389,11 @@ class UpdateGhostbustersTask(AbstractTask): ca_detail.generate_crl(publisher = publisher) ca_detail.generate_manifest(publisher = publisher) - self.rpkid.checkpoint() - publisher.call_pubd(self.exit, self.publication_failed) + yield publisher.call_pubd() - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant_handle) - self.exit() - - def publication_failed(self, e): - logger.exception("Couldn't publish Ghostbuster updates for %s, skipping", self.tenant_handle) - self.rpkid.checkpoint() - self.exit() + except: + logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant.tenant_handle) - def ghostbuster_requests_failed(self, e): - logger.exception("Could not fetch Ghostbuster record requests for %s, skipping", self.tenant_handle) self.exit() @@ -585,22 +406,12 @@ class UpdateEECertificatesTask(AbstractTask): so keeping it simple for initial version, we can optimize later. """ - def clear(self): - self.started = False - + @tornado.gen.coroutine def start(self): - self.rpkid.checkpoint() - logger.debug("Self %s[%r] updating EE certificates", self.tenant_handle, self) - assert not self.started - self.started = True - self.rpkid.irdb_query_ee_certificate_requests(self.tenant_handle, - self.got_requests, - self.get_requests_failed) - - def got_requests(self, r_msg): + logger.debug("%r: Updating EE certificates", self) try: - self.rpkid.checkpoint() + r_msg = yield self.rpkid.irdb_query_ee_certificate_requests(self.tenant.tenant_handle) publisher = rpki.rpkid.publication_queue(self.rpkid) @@ -621,28 +432,23 @@ class UpdateEECertificatesTask(AbstractTask): v4 = rpki.resource_set.resource_set_ipv4(r_pdu.get("ipv4")), v6 = rpki.resource_set.resource_set_ipv6(r_pdu.get("ipv6")), valid_until = rpki.sundial.datetime.fromXMLtime(r_pdu.get("valid_until"))) - covering = self.find_covering_ca_details(resources) + covering = self.tenant.find_covering_ca_details(resources) ca_details.update(covering) for ee in ees: if ee.ca_detail in covering: - logger.debug("Updating existing EE certificate for %s %s", - gski, resources) - ee.reissue( - resources = resources, - publisher = publisher) + logger.debug("Updating existing EE certificate for %s %s", gski, resources) + ee.reissue(resources = resources, publisher = publisher) covering.remove(ee.ca_detail) else: - logger.debug("Existing EE certificate for %s %s is no longer covered", - gski, resources) + logger.debug("Existing EE certificate for %s %s is no longer covered", gski, resources) ee.revoke(publisher = publisher) subject_name = rpki.x509.X501DN.from_cn(r_pdu.get("cn"), r_pdu.get("sn")) subject_key = rpki.x509.PKCS10(Base64 = r_pdu[0].text).getPublicKey() for ca_detail in covering: - logger.debug("No existing EE certificate for %s %s", - gski, resources) + logger.debug("No existing EE certificate for %s %s", gski, resources) rpki.rpkidb.models.EECertificate.create( # sic: class method, not Django manager method (for now, anyway) ca_detail = ca_detail, subject_name = subject_name, @@ -661,29 +467,18 @@ class UpdateEECertificatesTask(AbstractTask): ca_detail.generate_crl(publisher = publisher) ca_detail.generate_manifest(publisher = publisher) - self.rpkid.checkpoint() - publisher.call_pubd(self.exit, self.publication_failed) - - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Could not update EE certificates for %s, skipping", self.tenant_handle) - self.exit() + yield publisher.call_pubd() - def publication_failed(self, e): - logger.exception("Couldn't publish EE certificate updates for %s, skipping", self.tenant_handle) - self.rpkid.checkpoint() - self.exit() + except: + logger.exception("Could not update EE certificates for %s, skipping", self.tenant.tenant_handle) - def get_requests_failed(self, e): - logger.exception("Could not fetch EE certificate requests for %s, skipping", self.tenant_handle) self.exit() @queue_task class RegenerateCRLsAndManifestsTask(AbstractTask): """ - Generate new CRLs and manifests as necessary for all of this self's + Generate new CRLs and manifests as necessary for all of this tenant's CAs. Extracting nextUpdate from a manifest is hard at the moment due to implementation silliness, so for now we generate a new manifest whenever we generate a new CRL @@ -693,48 +488,33 @@ class RegenerateCRLsAndManifestsTask(AbstractTask): database anyway. """ - def clear(self): - self.started = False - + @tornado.gen.coroutine def start(self): - self.rpkid.checkpoint() - logger.debug("Self %s[%r] regenerating CRLs and manifests", self.tenant_handle, self) - assert not self.started - self.started = True - now = rpki.sundial.now() - crl_interval = rpki.sundial.timedelta(seconds = self.crl_interval) - regen_margin = max(self.rpkid.cron_period * 2, crl_interval / 4) - publisher = rpki.rpkid.publication_queue(self.rpkid) + logger.debug("%r: Regenerating CRLs and manifests", self) - logger.debug("RegenerateCRLsAndManifestsTask: setup complete") # XXX + try: + now = rpki.sundial.now() + crl_interval = rpki.sundial.timedelta(seconds = self.tenant.crl_interval) + regen_margin = max(rpki.sundial.timedelta(seconds = self.rpkid.cron_period) * 2, crl_interval / 4) + publisher = rpki.rpkid.publication_queue(self.rpkid) - for ca in rpki.rpkidb.models.CA.objects.filter(parent__tenant = self.tenant): - logger.debug("RegenerateCRLsAndManifestsTask: checking CA %r", ca) # XXX - try: - for ca_detail in ca.ca_details.filter(state = "revoked"): - if now > ca_detail.latest_crl.getNextUpdate(): - ca_detail.destroy(ca = ca, publisher = publisher) - for ca_detail in ca.ca_details.filter(state__in = ("active", "deprecated")): - if now + regen_margin > ca_detail.latest_crl.getNextUpdate(): - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Couldn't regenerate CRLs and manifests for CA %r, skipping", ca) - - logger.debug("RegenerateCRLsAndManifestsTask: CA loop complete") # XXX - - self.rpkid.checkpoint() - publisher.call_pubd(self.done, self.lose) - - def done(self): - logger.debug("RegenerateCRLsAndManifestsTask: publication complete") # XXX - self.exit() + for ca in rpki.rpkidb.models.CA.objects.filter(parent__tenant = self.tenant): + try: + for ca_detail in ca.ca_details.filter(state = "revoked"): + if now > ca_detail.latest_crl.getNextUpdate(): + ca_detail.destroy(ca = ca, publisher = publisher) + for ca_detail in ca.ca_details.filter(state__in = ("active", "deprecated")): + if now + regen_margin > ca_detail.latest_crl.getNextUpdate(): + ca_detail.generate_crl(publisher = publisher) + ca_detail.generate_manifest(publisher = publisher) + except: + logger.exception("%r: Couldn't regenerate CRLs and manifests for CA %r, skipping", self, ca) + + yield publisher.call_pubd() + + except: + logger.exception("%r: Couldn't publish updated CRLs and manifests, skipping", self) - def lose(self, e): - logger.exception("Couldn't publish updated CRLs and manifests for self %r, skipping", self.tenant_handle) - self.rpkid.checkpoint() self.exit() @@ -745,24 +525,17 @@ class CheckFailedPublication(AbstractTask): to pubd being down or unreachable). """ - def clear(self): - self.started = False - + @tornado.gen.coroutine def start(self): - assert not self.started - logger.debug("CheckFailedPublication starting") - self.started = True - publisher = rpki.rpkid.publication_queue(self.rpkid) - for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"): - ca_detail.check_failed_publication(publisher) - self.rpkid.checkpoint() - publisher.call_pubd(self.done, self.publication_failed) - - def publication_failed(self, e): - logger.exception("Couldn't publish for %s, skipping", self.tenant_handle) - self.rpkid.checkpoint() - self.exit() + logger.debug("%r: Checking for failed publication actions", self) + + try: + publisher = rpki.rpkid.publication_queue(self.rpkid) + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"): + ca_detail.check_failed_publication(publisher) + yield publisher.call_pubd() + + except: + logger.exception("%r: Couldn't run failed publications, skipping", self) - def done(self): - logger.debug("CheckFailedPublication finished") self.exit() |