diff options
Diffstat (limited to 'rpki/rpkid_tasks.py')
-rw-r--r-- | rpki/rpkid_tasks.py | 1265 |
1 files changed, 604 insertions, 661 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py index 58b4bcfe..ee4f90d3 100644 --- a/rpki/rpkid_tasks.py +++ b/rpki/rpkid_tasks.py @@ -22,9 +22,18 @@ because interactions with rpkid scheduler were getting too complicated. """ import logging +import random + +import tornado.gen +import tornado.web +import tornado.locks +import tornado.ioloop +import tornado.httputil +import tornado.httpclient +import tornado.httpserver + import rpki.log import rpki.rpkid -import rpki.async import rpki.up_down import rpki.sundial import rpki.publication @@ -35,700 +44,634 @@ logger = logging.getLogger(__name__) task_classes = () def queue_task(cls): - """ - Class decorator to add a new task class to task_classes. - """ - - global task_classes - task_classes += (cls,) - return cls - - -class CompletionHandler(object): - """ - Track one or more scheduled rpkid tasks and execute a callback when - the last of them terminates. - """ - - ## @var debug - # Debug logging. - - debug = False - - def __init__(self, cb): - self.cb = cb - self.tasks = set() - - def register(self, task): - if self.debug: - logger.debug("Completion handler %r registering task %r", self, task) - self.tasks.add(task) - task.register_completion(self.done) - - def done(self, task): - try: - self.tasks.remove(task) - except KeyError: - logger.warning("Completion handler %r called with unregistered task %r, blundering onwards", self, task) - else: - if self.debug: - logger.debug("Completion handler %r called with registered task %r", self, task) - if not self.tasks: - if self.debug: - logger.debug("Completion handler %r finished, calling %r", self, self.cb) - self.cb() - - @property - def count(self): - return len(self.tasks) + """ + Class decorator to add a new task class to task_classes. + """ + + global task_classes # pylint: disable=W0603 + task_classes += (cls,) + return cls + + +class PostponeTask(Exception): + """ + Exit a task without finishing it. We use this to signal that a + long-running task wants to yield to the task loop but hasn't yet + run to completion. + """ class AbstractTask(object): - """ - Abstract base class for rpkid scheduler task objects. This just - handles the scheduler hooks, real work starts in self.start. - - NB: This assumes that the rpki.rpkid.rpkid.task_* methods have been - rewritten to expect instances of subclasses of this class, rather - than expecting thunks to be wrapped up in the older version of this - class. Rewrite, rewrite, remove this comment when done, OK! - """ - - ## @var timeslice - # How long before a task really should consider yielding the CPU to - # let something else run. - - timeslice = rpki.sundial.timedelta(seconds = 15) - - def __init__(self, s, description = None): - self.self = s - self.description = description - self.completions = [] - self.continuation = None - self.due_date = None - self.clear() - - def __repr__(self): - return rpki.log.log_repr(self, self.description) - - def register_completion(self, completion): - self.completions.append(completion) - - def exit(self): - self.self.gctx.sql.sweep() - while self.completions: - self.completions.pop(0)(self) - self.clear() - self.due_date = None - self.self.gctx.task_next() - - def postpone(self, continuation): - self.self.gctx.sql.sweep() - self.continuation = continuation - self.due_date = None - self.self.gctx.task_add(self) - self.self.gctx.task_next() - - def __call__(self): - self.due_date = rpki.sundial.now() + self.timeslice - if self.continuation is None: - logger.debug("Running task %r", self) - self.clear() - self.start() - else: - logger.debug("Restarting task %r at %r", self, self.continuation) - continuation = self.continuation - self.continuation = None - continuation() - - @property - def overdue(self): - return rpki.sundial.now() > self.due_date - - def __getattr__(self, name): - return getattr(self.self, name) - - def start(self): - raise NotImplementedError - - def clear(self): - pass + """ + Abstract base class for rpkid scheduler task objects. + """ + + ## @var timeslice + # How long before a task really should consider yielding the CPU + # to let something else run. Should this be something we can + # configure from rpki.conf? + + #timeslice = rpki.sundial.timedelta(seconds = 15) + timeslice = rpki.sundial.timedelta(seconds = 120) + + def __init__(self, rpkid, tenant, description = None): + self.rpkid = rpkid + self.tenant = tenant + self.description = description + self.done_this = None + self.done_next = None + self.due_date = None + self.started = False + self.postponed = False + self.clear() + + def __repr__(self): + return rpki.log.log_repr(self, self.description) + + @tornado.gen.coroutine + def start(self): + try: + logger.debug("%r: Starting", self) + self.due_date = rpki.sundial.now() + self.timeslice + self.clear() + self.started = True + self.postponed = False + yield self.main() + except PostponeTask: + self.postponed = True + except: + logger.exception("%r: Unhandled exception", self) + finally: + self.due_date = None + self.started = False + self.clear() + if self.postponed: + logger.debug("%r: Postponing", self) + self.rpkid.task_add(self) + else: + logger.debug("%r: Exiting", self) + if self.done_this is not None: + self.done_this.notify_all() + self.done_this = self.done_next + self.done_next = None + + def wait(self): + done = "done_next" if self.started else "done_this" + condition = getattr(self, done) + if condition is None: + condition = tornado.locks.Condition() + setattr(self, done, condition) + future = condition.wait() + return future + + def waiting(self): + return self.done_this is not None + + @tornado.gen.coroutine + def overdue(self): + yield tornado.gen.moment + raise tornado.gen.Return(rpki.sundial.now() > self.due_date and + any(not task.postponed for task in self.rpkid.task_ready)) + + @tornado.gen.coroutine + def main(self): + raise NotImplementedError + + def clear(self): + pass @queue_task class PollParentTask(AbstractTask): - """ - Run the regular client poll cycle with each of this self's - parents, in turn. - """ - - def clear(self): - self.parent_iterator = None - self.parent = None - self.ca_map = None - self.class_iterator = None - - def start(self): - self.gctx.checkpoint() - logger.debug("Self %s[%d] polling parents", self.self_handle, self.self_id) - rpki.async.iterator(self.parents, self.parent_loop, self.exit) - - def parent_loop(self, parent_iterator, parent): - self.parent_iterator = parent_iterator - self.parent = parent - rpki.up_down.list_pdu.query(parent, self.got_list, self.list_failed) - - def got_list(self, r_msg): - self.ca_map = dict((ca.parent_resource_class, ca) for ca in self.parent.cas) - self.gctx.checkpoint() - rpki.async.iterator(r_msg.payload.classes, self.class_loop, self.class_done) - - def list_failed(self, e): - logger.exception("Couldn't get resource class list from parent %r, skipping", self.parent) - self.parent_iterator() - - def class_loop(self, class_iterator, rc): - self.gctx.checkpoint() - self.class_iterator = class_iterator - try: - ca = self.ca_map.pop(rc.class_name) - except KeyError: - rpki.rpkid.ca_obj.create(self.parent, rc, class_iterator, self.class_create_failed) - else: - ca.check_for_updates(self.parent, rc, class_iterator, self.class_update_failed) - - def class_update_failed(self, e): - logger.exception("Couldn't update class, skipping") - self.class_iterator() - - def class_create_failed(self, e): - logger.exception("Couldn't create class, skipping") - self.class_iterator() - - def class_done(self): - rpki.async.iterator(self.ca_map.values(), self.ca_loop, self.ca_done) - - def ca_loop(self, iterator, ca): - self.gctx.checkpoint() - ca.delete(self.parent, iterator) - - def ca_done(self): - self.gctx.checkpoint() - self.gctx.sql.sweep() - self.parent_iterator() + """ + Run the regular client poll cycle with each of this tenant's + parents, in turn. + """ + + @tornado.gen.coroutine + def main(self): + logger.debug("%r: Polling parents", self) + + for parent in rpki.rpkidb.models.Parent.objects.filter(tenant = self.tenant): + try: + logger.debug("%r: Executing list query", self) + list_r_msg = yield parent.up_down_list_query(rpkid = self.rpkid) + except: + logger.exception("%r: Couldn't get resource class list from %r, skipping", self, parent) + continue + + logger.debug("%r: Parsing list response", self) + + ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas.all()) + + for rc in list_r_msg.getiterator(rpki.up_down.tag_class): + try: + class_name = rc.get("class_name") + ca = ca_map.pop(class_name, None) + if ca is None: + yield self.create(parent = parent, rc = rc, class_name = class_name) + else: + yield self.update(parent = parent, rc = rc, class_name = class_name, ca = ca) + except: + logger.exception("Couldn't update resource class %r, skipping", class_name) + + for class_name, ca in ca_map.iteritems(): + logger.debug("%r: Destroying orphaned %r for resource class %r", self, ca, class_name) + yield ca.destroy(rpkid = self.rpkid, parent = parent) + + @tornado.gen.coroutine + def create(self, parent, rc, class_name): + logger.debug("%r: Creating new CA for resource class %r", self, class_name) + ca = rpki.rpkidb.models.CA.objects.create( + parent = parent, + parent_resource_class = class_name, + sia_uri = parent.construct_sia_uri(rc)) + ca_detail = ca.create_detail() + r_msg = yield parent.up_down_issue_query(rpkid = self.rpkid, ca = ca, ca_detail = ca_detail) + elt = r_msg.find(rpki.up_down.tag_class).find(rpki.up_down.tag_certificate) + uri = elt.get("cert_url") + cert = rpki.x509.X509(Base64 = elt.text) + logger.debug("%r: %r received certificate %s", self, ca, uri) + yield ca_detail.activate(rpkid = self.rpkid, ca = ca, cert = cert, uri = uri) + + @tornado.gen.coroutine + def update(self, parent, rc, class_name, ca): + + # pylint: disable=C0330 + + logger.debug("%r: Checking updates for %r", self, ca) + + sia_uri = parent.construct_sia_uri(rc) + sia_uri_changed = ca.sia_uri != sia_uri + + if sia_uri_changed: + logger.debug("SIA changed: was %s now %s", ca.sia_uri, sia_uri) + ca.sia_uri = sia_uri + + rc_resources = rpki.resource_set.resource_bag( + asn = rc.get("resource_set_as"), + v4 = rc.get("resource_set_ipv4"), + v6 = rc.get("resource_set_ipv6"), + valid_until = rc.get("resource_set_notafter")) + + cert_map = {} + + for c in rc.getiterator(rpki.up_down.tag_certificate): + x = rpki.x509.X509(Base64 = c.text) + u = rpki.up_down.multi_uri(c.get("cert_url")).rsync() + cert_map[x.gSKI()] = (x, u) + + ca_details = ca.ca_details.exclude(state = "revoked") + + if not ca_details: + logger.warning("Existing resource class %s to %s from %s with no certificates, rekeying", + class_name, parent.tenant.tenant_handle, parent.parent_handle) + yield ca.rekey(rpkid = self.rpkid) + return + + for ca_detail in ca_details: + + rc_cert, rc_cert_uri = cert_map.pop(ca_detail.public_key.gSKI(), (None, None)) + + if rc_cert is None: + logger.warning("g(SKI) %s in resource class %s is in database but missing from list_response to %s from %s, " + "maybe parent certificate went away?", + ca_detail.public_key.gSKI(), class_name, parent.tenant.tenant_handle, parent.parent_handle) + publisher = rpki.rpkid.publication_queue(rpkid = self.rpkid) + ca_detail.destroy(publisher = publisher) + yield publisher.call_pubd() + continue + + if ca_detail.state == "active" and ca_detail.ca_cert_uri != rc_cert_uri: + logger.debug("AIA changed: was %s now %s", ca_detail.ca_cert_uri, rc_cert_uri) + ca_detail.ca_cert_uri = rc_cert_uri + ca_detail.save() + + if ca_detail.state not in ("pending", "active"): + continue + + if ca_detail.state == "pending": + current_resources = rpki.resource_set.resource_bag() + else: + current_resources = ca_detail.latest_ca_cert.get_3779resources() + + if (ca_detail.state == "pending" or + sia_uri_changed or + ca_detail.latest_ca_cert != rc_cert or + ca_detail.latest_ca_cert.getNotAfter() != rc_resources.valid_until or + current_resources.undersized(rc_resources) or + current_resources.oversized(rc_resources)): + + yield ca_detail.update( + rpkid = self.rpkid, + parent = parent, + ca = ca, + rc = rc, + sia_uri_changed = sia_uri_changed, + old_resources = current_resources) + + if cert_map: + logger.warning("Unknown certificate g(SKI)%s %s in resource class %s in list_response to %s from %s, maybe you want to \"revoke_forgotten\"?", + "" if len(cert_map) == 1 else "s", ", ".join(cert_map), class_name, parent.tenant.tenant_handle, parent.parent_handle) @queue_task class UpdateChildrenTask(AbstractTask): - """ - Check for updated IRDB data for all of this self's children and - issue new certs as necessary. Must handle changes both in - resources and in expiration date. - """ - - def clear(self): - self.now = None - self.rsn = None - self.publisher = None - self.iterator = None - self.child = None - self.child_certs = None - - def start(self): - self.gctx.checkpoint() - logger.debug("Self %s[%d] updating children", self.self_handle, self.self_id) - self.now = rpki.sundial.now() - self.rsn = self.now + rpki.sundial.timedelta(seconds = self.regen_margin) - self.publisher = rpki.rpkid.publication_queue() - rpki.async.iterator(self.children, self.loop, self.done) - - def loop(self, iterator, child): - self.gctx.checkpoint() - self.gctx.sql.sweep() - self.iterator = iterator - self.child = child - self.child_certs = child.child_certs - if self.overdue: - self.publisher.call_pubd(lambda: self.postpone(self.do_child), self.publication_failed) - else: - self.do_child() - - def do_child(self): - if self.child_certs: - self.gctx.irdb_query_child_resources(self.child.self.self_handle, self.child.child_handle, - self.got_resources, self.lose) - else: - self.iterator() - - def lose(self, e): - logger.exception("Couldn't update child %r, skipping", self.child) - self.iterator() - - def got_resources(self, irdb_resources): - try: - for child_cert in self.child_certs: - ca_detail = child_cert.ca_detail - ca = ca_detail.ca - if ca_detail.state == "active": - old_resources = child_cert.cert.get_3779resources() - new_resources = old_resources & irdb_resources & ca_detail.latest_ca_cert.get_3779resources() - old_aia = child_cert.cert.get_AIA()[0] - new_aia = ca_detail.ca_cert_uri - - if new_resources.empty(): - logger.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s", - self.child.child_handle, child_cert.cert.gSKI()) - child_cert.revoke(publisher = self.publisher) - ca_detail.generate_crl(publisher = self.publisher) - ca_detail.generate_manifest(publisher = self.publisher) - - elif (old_resources != new_resources or - old_aia != new_aia or - (old_resources.valid_until < self.rsn and - irdb_resources.valid_until > self.now and - old_resources.valid_until != irdb_resources.valid_until)): - - logger.debug("Need to reissue child %s certificate SKI %s", - self.child.child_handle, child_cert.cert.gSKI()) - if old_resources != new_resources: - logger.debug("Child %s SKI %s resources changed: old %s new %s", - self.child.child_handle, child_cert.cert.gSKI(), - old_resources, new_resources) - if old_resources.valid_until != irdb_resources.valid_until: - logger.debug("Child %s SKI %s validity changed: old %s new %s", - self.child.child_handle, child_cert.cert.gSKI(), - old_resources.valid_until, irdb_resources.valid_until) - - new_resources.valid_until = irdb_resources.valid_until - child_cert.reissue( - ca_detail = ca_detail, - resources = new_resources, - publisher = self.publisher) - - elif old_resources.valid_until < self.now: - logger.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s", - self.child.child_handle, child_cert.cert.gSKI(), - old_resources.valid_until, irdb_resources.valid_until) - child_cert.sql_delete() - self.publisher.withdraw( - cls = rpki.publication.certificate_elt, - uri = child_cert.uri, - obj = child_cert.cert, - repository = ca.parent.repository) - ca_detail.generate_manifest(publisher = self.publisher) - - except (SystemExit, rpki.async.ExitNow): - raise - except Exception, e: - self.gctx.checkpoint() - self.lose(e) - else: - self.gctx.checkpoint() - self.gctx.sql.sweep() - self.iterator() - - def done(self): - self.gctx.checkpoint() - self.gctx.sql.sweep() - self.publisher.call_pubd(self.exit, self.publication_failed) - - def publication_failed(self, e): - logger.exception("Couldn't publish for %s, skipping", self.self_handle) - self.gctx.checkpoint() - self.exit() + """ + Check for updated IRDB data for all of this tenant's children and + issue new certs as necessary. Must handle changes both in + resources and in expiration date. + """ + + @tornado.gen.coroutine + def main(self): + logger.debug("%r: Updating children", self) + now = rpki.sundial.now() + rsn = now + rpki.sundial.timedelta(seconds = self.tenant.regen_margin) + publisher = rpki.rpkid.publication_queue(self.rpkid) + postponing = False + + child_certs = rpki.rpkidb.models.ChildCert.objects.filter(child__tenant = self.tenant, ca_detail__state = "active") + child_handles = sorted(set(child_cert.child.child_handle for child_cert in child_certs)) + irdb_resources = dict(zip(child_handles, (yield self.rpkid.irdb_query_children_resources(self.tenant.tenant_handle, child_handles)))) + + for child_cert in child_certs: + try: + ca_detail = child_cert.ca_detail + child_handle = child_cert.child.child_handle + old_resources = child_cert.cert.get_3779resources() + new_resources = old_resources & irdb_resources[child_handle] & ca_detail.latest_ca_cert.get_3779resources() + old_aia = child_cert.cert.get_AIA()[0] + new_aia = ca_detail.ca_cert_uri + + assert child_cert.gski == child_cert.cert.gSKI() + + if new_resources.empty(): + logger.debug("Resources shrank to null set, revoking and withdrawing child %s g(SKI) %s", + child_handle, child_cert.gski) + child_cert.revoke(publisher = publisher) + ca_detail.generate_crl_and_manifest(publisher = publisher) + + elif (old_resources != new_resources or old_aia != new_aia or + (old_resources.valid_until < rsn and + irdb_resources[child_handle].valid_until > now and + old_resources.valid_until != irdb_resources[child_handle].valid_until)): + logger.debug("Need to reissue child %s certificate g(SKI) %s", child_handle, + child_cert.gski) + if old_resources != new_resources: + logger.debug("Child %s g(SKI) %s resources changed: old %s new %s", + child_handle, child_cert.gski, old_resources, new_resources) + if old_resources.valid_until != irdb_resources[child_handle].valid_until: + logger.debug("Child %s g(SKI) %s validity changed: old %s new %s", + child_handle, child_cert.gski, old_resources.valid_until, + irdb_resources[child_handle].valid_until) + + new_resources.valid_until = irdb_resources[child_handle].valid_until + child_cert.reissue(ca_detail = ca_detail, resources = new_resources, publisher = publisher) + + elif old_resources.valid_until < now: + logger.debug("Child %s certificate g(SKI) %s has expired: cert.valid_until %s, irdb.valid_until %s", + child_handle, child_cert.gski, old_resources.valid_until, + irdb_resources[child_handle].valid_until) + child_cert.delete() + publisher.queue(uri = child_cert.uri, + old_obj = child_cert.cert, + repository = ca_detail.ca.parent.repository) + ca_detail.generate_crl_and_manifest(publisher = publisher) + + except: + logger.exception("%r: Couldn't update %r, skipping", self, child_cert) + + finally: + if (yield self.overdue()): + postponing = True + break + try: + yield publisher.call_pubd() + except: + logger.exception("%r: Couldn't publish, skipping", self) -@queue_task -class UpdateROAsTask(AbstractTask): - """ - Generate or update ROAs for this self. - """ - - def clear(self): - self.orphans = None - self.updates = None - self.publisher = None - self.ca_details = None - self.count = None - - def start(self): - self.gctx.checkpoint() - self.gctx.sql.sweep() - logger.debug("Self %s[%d] updating ROAs", self.self_handle, self.self_id) - - logger.debug("Issuing query for ROA requests") - self.gctx.irdb_query_roa_requests(self.self_handle, self.got_roa_requests, self.roa_requests_failed) - - def got_roa_requests(self, roa_requests): - self.gctx.checkpoint() - logger.debug("Received response to query for ROA requests") - - if self.gctx.sql.dirty: - logger.warning("Unexpected dirty SQL cache, flushing") - self.gctx.sql.sweep() - - roas = {} - seen = set() - self.orphans = [] - self.updates = [] - self.publisher = rpki.rpkid.publication_queue() - self.ca_details = set() - - for roa in self.roas: - k = (roa.asn, str(roa.ipv4), str(roa.ipv6)) - if k not in roas: - roas[k] = roa - elif (roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and - (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active")): - self.orphans.append(roas[k]) - roas[k] = roa - else: - self.orphans.append(roa) - - for roa_request in roa_requests: - k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6)) - if k in seen: - logger.warning("Skipping duplicate ROA request %r", roa_request) - else: - seen.add(k) - roa = roas.pop(k, None) - if roa is None: - roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6) - logger.debug("Created new %r", roa) - else: - logger.debug("Found existing %r", roa) - self.updates.append(roa) - - self.orphans.extend(roas.itervalues()) - - if self.overdue: - self.postpone(self.begin_loop) - else: - self.begin_loop() - - def begin_loop(self): - self.count = 0 - rpki.async.iterator(self.updates, self.loop, self.done, pop_list = True) - - def loop(self, iterator, roa): - self.gctx.checkpoint() - try: - roa.update(publisher = self.publisher, fast = True) - self.ca_details.add(roa.ca_detail) - self.gctx.sql.sweep() - except (SystemExit, rpki.async.ExitNow): - raise - except rpki.exceptions.NoCoveringCertForROA: - logger.warning("No covering certificate for %r, skipping", roa) - except Exception: - logger.exception("Could not update %r, skipping", roa) - self.count += 1 - if self.overdue: - self.publish(lambda: self.postpone(iterator)) - else: - iterator() - - def publish(self, done): - if not self.publisher.empty(): - for ca_detail in self.ca_details: - logger.debug("Generating new CRL for %r", ca_detail) - ca_detail.generate_crl(publisher = self.publisher) - logger.debug("Generating new manifest for %r", ca_detail) - ca_detail.generate_manifest(publisher = self.publisher) - self.ca_details.clear() - self.gctx.sql.sweep() - self.gctx.checkpoint() - self.publisher.call_pubd(done, self.publication_failed) - - def publication_failed(self, e): - logger.exception("Couldn't publish for %s, skipping", self.self_handle) - self.gctx.checkpoint() - self.exit() - - def done(self): - for roa in self.orphans: - try: - self.ca_details.add(roa.ca_detail) - roa.revoke(publisher = self.publisher, fast = True) - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Could not revoke %r", roa) - self.gctx.sql.sweep() - self.gctx.checkpoint() - self.publish(self.exit) - - def roa_requests_failed(self, e): - logger.exception("Could not fetch ROA requests for %s, skipping", self.self_handle) - self.exit() + if postponing: + raise PostponeTask @queue_task -class UpdateGhostbustersTask(AbstractTask): - """ - Generate or update Ghostbuster records for this self. - - This was originally based on the ROA update code. It's possible - that both could benefit from refactoring, but at this point the - potential scaling issues for ROAs completely dominate structure of - the ROA code, and aren't relevant here unless someone is being - exceptionally silly. - """ - - def start(self): - self.gctx.checkpoint() - logger.debug("Self %s[%d] updating Ghostbuster records", - self.self_handle, self.self_id) - - self.gctx.irdb_query_ghostbuster_requests(self.self_handle, - (p.parent_handle for p in self.parents), - self.got_ghostbuster_requests, - self.ghostbuster_requests_failed) - - def got_ghostbuster_requests(self, ghostbuster_requests): - - try: - self.gctx.checkpoint() - if self.gctx.sql.dirty: - logger.warning("Unexpected dirty SQL cache, flushing") - self.gctx.sql.sweep() - - ghostbusters = {} - orphans = [] - publisher = rpki.rpkid.publication_queue() - ca_details = set() - seen = set() - - parents = dict((p.parent_handle, p) for p in self.parents) - - for ghostbuster in self.ghostbusters: - k = (ghostbuster.ca_detail_id, ghostbuster.vcard) - if ghostbuster.ca_detail.state != "active" or k in ghostbusters: - orphans.append(ghostbuster) - else: - ghostbusters[k] = ghostbuster - - for ghostbuster_request in ghostbuster_requests: - if ghostbuster_request.parent_handle not in parents: - logger.warning("Unknown parent_handle %r in Ghostbuster request, skipping", ghostbuster_request.parent_handle) - continue - k = (ghostbuster_request.parent_handle, ghostbuster_request.vcard) - if k in seen: - logger.warning("Skipping duplicate Ghostbuster request %r", ghostbuster_request) - continue - seen.add(k) - for ca in parents[ghostbuster_request.parent_handle].cas: - ca_detail = ca.active_ca_detail - if ca_detail is not None: - ghostbuster = ghostbusters.pop((ca_detail.ca_detail_id, ghostbuster_request.vcard), None) - if ghostbuster is None: - ghostbuster = rpki.rpkid.ghostbuster_obj(self.gctx, self.self_id, ca_detail.ca_detail_id, ghostbuster_request.vcard) - logger.debug("Created new %r for %r", ghostbuster, ghostbuster_request.parent_handle) +class UpdateROAsTask(AbstractTask): + """ + Generate or update ROAs for this tenant. + """ + + # XXX This might need rewriting to avoid race conditions. + # + # There's a theoretical race condition here if we're chugging away + # and something else needs to update the manifest or CRL, or if + # some back-end operation generates or destroys ROAs. The risk is + # fairly low given that we defer CRL and manifest generation until + # we're ready to publish, but it's theoretically present. + + @tornado.gen.coroutine + def main(self): + logger.debug("%r: Updating ROAs", self) + + try: + r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle) + except: + logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle) + return + + logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg) + + roas = {} + seen = set() + orphans = [] + creates = [] + updates = [] + publisher = rpki.rpkid.publication_queue(self.rpkid) + ca_details = set() + + for roa in self.tenant.roas.all(): + k = "{!s} {!s} {!s}".format(roa.asn, roa.ipv4, roa.ipv6) + if k not in roas: + roas[k] = roa + elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"): + orphans.append(roas[k]) + roas[k] = roa + else: + orphans.append(roa) + + for r_pdu in r_msg: + k = "{!s} {!s} {!s}".format(r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6")) + if k in seen: + logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu) + continue + seen.add(k) + roa = roas.pop(k, None) + if roa is None: + roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6")) + logger.debug("%r: Try to create %r", self, roa) + creates.append(roa) else: - logger.debug("Found existing %r for %s", ghostbuster, ghostbuster_request.parent_handle) - ghostbuster.update(publisher = publisher, fast = True) - ca_details.add(ca_detail) + logger.debug("%r: Found existing %r", self, roa) + updates.append(roa) + + orphans.extend(roas.itervalues()) + + roas = creates + updates + + r_msg = seen = creates = updates = None - orphans.extend(ghostbusters.itervalues()) - for ghostbuster in orphans: - ca_details.add(ghostbuster.ca_detail) - ghostbuster.revoke(publisher = publisher, fast = True) + postponing = False - for ca_detail in ca_details: - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) + while roas and not postponing: + if (yield self.overdue()): + postponing = True + break + roa = roas.pop(0) + try: + roa.update(publisher = publisher) + ca_details.add(roa.ca_detail) + except rpki.exceptions.NoCoveringCertForROA: + logger.warning("%r: No covering certificate for %r, skipping", self, roa) + except: + logger.exception("%r: Could not update %r, skipping", self, roa) - self.gctx.sql.sweep() + if not postponing: + for roa in orphans: + try: + ca_details.add(roa.ca_detail) + roa.revoke(publisher = publisher) + except: + logger.exception("%r: Could not revoke %r", self, roa) - self.gctx.checkpoint() - publisher.call_pubd(self.exit, self.publication_failed) + if not publisher.empty(): + for ca_detail in ca_details: + logger.debug("%r: Generating new CRL and manifest for %r", self, ca_detail) + ca_detail.generate_crl_and_manifest(publisher = publisher) + yield publisher.call_pubd() - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Could not update Ghostbuster records for %s, skipping", self.self_handle) - self.exit() + if postponing: + raise PostponeTask - def publication_failed(self, e): - logger.exception("Couldn't publish Ghostbuster updates for %s, skipping", self.self_handle) - self.gctx.checkpoint() - self.exit() - def ghostbuster_requests_failed(self, e): - logger.exception("Could not fetch Ghostbuster record requests for %s, skipping", self.self_handle) - self.exit() +@queue_task +class UpdateGhostbustersTask(AbstractTask): + """ + Generate or update Ghostbuster records for this tenant. + + This was originally based on the ROA update code. It's possible + that both could benefit from refactoring, but at this point the + potential scaling issues for ROAs completely dominate structure of + the ROA code, and aren't relevant here unless someone is being + exceptionally silly. + """ + + @tornado.gen.coroutine + def main(self): + logger.debug("%r: Updating Ghostbuster records", self) + parent_handles = set(p.parent_handle for p in rpki.rpkidb.models.Parent.objects.filter(tenant = self.tenant)) + + try: + r_msg = yield self.rpkid.irdb_query_ghostbuster_requests(self.tenant.tenant_handle, parent_handles) + + ghostbusters = {} + orphans = [] + publisher = rpki.rpkid.publication_queue(self.rpkid) + ca_details = set() + seen = set() + + for ghostbuster in self.tenant.ghostbusters.all(): + k = (ghostbuster.ca_detail.pk, ghostbuster.vcard) + if ghostbuster.ca_detail.state != "active" or k in ghostbusters: + orphans.append(ghostbuster) + else: + ghostbusters[k] = ghostbuster + + for r_pdu in r_msg: + if not rpki.rpkidb.models.Parent.objects.filter(tenant = self.tenant, parent_handle = r_pdu.get("parent_handle")).exists(): + logger.warning("%r: Unknown parent_handle %r in Ghostbuster request, skipping", self, r_pdu.get("parent_handle")) + continue + k = (r_pdu.get("parent_handle"), r_pdu.text) + if k in seen: + logger.warning("%r: Skipping duplicate Ghostbuster request %r", self, r_pdu) + continue + seen.add(k) + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__parent_handle = r_pdu.get("parent_handle"), + ca__parent__tenant = self.tenant, + state = "active"): + ghostbuster = ghostbusters.pop((ca_detail.pk, r_pdu.text), None) + if ghostbuster is None: + ghostbuster = rpki.rpkidb.models.Ghostbuster(tenant = self.tenant, ca_detail = ca_detail, vcard = r_pdu.text) + logger.debug("%r: Created new %r for %r", self, ghostbuster, r_pdu.get("parent_handle")) + else: + logger.debug("%r: Found existing %r for %r", self, ghostbuster, r_pdu.get("parent_handle")) + ghostbuster.update(publisher = publisher) + ca_details.add(ca_detail) + + orphans.extend(ghostbusters.itervalues()) + for ghostbuster in orphans: + ca_details.add(ghostbuster.ca_detail) + ghostbuster.revoke(publisher = publisher) + + for ca_detail in ca_details: + ca_detail.generate_crl_and_manifest(publisher = publisher) + + yield publisher.call_pubd() + + except: + logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant.tenant_handle) @queue_task class UpdateEECertificatesTask(AbstractTask): - """ - Generate or update EE certificates for this self. - - Not yet sure what kind of scaling constraints this task might have, - so keeping it simple for initial version, we can optimize later. - """ - - def start(self): - self.gctx.checkpoint() - logger.debug("Self %s[%d] updating EE certificates", self.self_handle, self.self_id) - - self.gctx.irdb_query_ee_certificate_requests(self.self_handle, - self.got_requests, - self.get_requests_failed) - - def got_requests(self, requests): - - try: - self.gctx.checkpoint() - if self.gctx.sql.dirty: - logger.warning("Unexpected dirty SQL cache, flushing") - self.gctx.sql.sweep() - - publisher = rpki.rpkid.publication_queue() - - existing = dict() - for ee in self.ee_certificates: - gski = ee.gski - if gski not in existing: - existing[gski] = set() - existing[gski].add(ee) - - ca_details = set() - - for req in requests: - ees = existing.pop(req.gski, ()) - resources = rpki.resource_set.resource_bag( - asn = req.asn, - v4 = req.ipv4, - v6 = req.ipv6, - valid_until = req.valid_until) - covering = self.find_covering_ca_details(resources) - ca_details.update(covering) - - for ee in ees: - if ee.ca_detail in covering: - logger.debug("Updating existing EE certificate for %s %s", - req.gski, resources) - ee.reissue( - resources = resources, - publisher = publisher) - covering.remove(ee.ca_detail) - else: - logger.debug("Existing EE certificate for %s %s is no longer covered", - req.gski, resources) - ee.revoke(publisher = publisher) - - for ca_detail in covering: - logger.debug("No existing EE certificate for %s %s", - req.gski, resources) - rpki.rpkid.ee_cert_obj.create( - ca_detail = ca_detail, - subject_name = rpki.x509.X501DN.from_cn(req.cn, req.sn), - subject_key = req.pkcs10.getPublicKey(), - resources = resources, - publisher = publisher, - eku = req.eku or None) - - # Anything left is an orphan - for ees in existing.values(): - for ee in ees: - ca_details.add(ee.ca_detail) - ee.revoke(publisher = publisher) - - self.gctx.sql.sweep() - - for ca_detail in ca_details: - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - - self.gctx.sql.sweep() - - self.gctx.checkpoint() - publisher.call_pubd(self.exit, self.publication_failed) - - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Could not update EE certificates for %s, skipping", self.self_handle) - self.exit() - - def publication_failed(self, e): - logger.exception("Couldn't publish EE certificate updates for %s, skipping", self.self_handle) - self.gctx.checkpoint() - self.exit() - - def get_requests_failed(self, e): - logger.exception("Could not fetch EE certificate requests for %s, skipping", self.self_handle) - self.exit() + """ + Generate or update EE certificates for this tenant. + + Not yet sure what kind of scaling constraints this task might have, + so keeping it simple for initial version, we can optimize later. + """ + + @tornado.gen.coroutine + def main(self): + logger.debug("%r: Updating EE certificates", self) + + try: + r_msg = yield self.rpkid.irdb_query_ee_certificate_requests(self.tenant.tenant_handle) + + publisher = rpki.rpkid.publication_queue(self.rpkid) + + logger.debug("%r: Examining EE certificate requests", self) + + existing = dict() + for ee in self.tenant.ee_certificates.all(): + gski = ee.gski + if gski not in existing: + existing[gski] = set() + existing[gski].add(ee) + + ca_details = set() + + for r_pdu in r_msg: + gski = r_pdu.get("gski") + ees = existing.pop(gski, ()) + + resources = rpki.resource_set.resource_bag( + asn = r_pdu.get("asn"), + v4 = r_pdu.get("ipv4"), + v6 = r_pdu.get("ipv6"), + valid_until = r_pdu.get("valid_until")) + covering = self.tenant.find_covering_ca_details(resources) + ca_details.update(covering) + + for ee in ees: + if ee.ca_detail in covering: + logger.debug("%r: Updating %r for %s %s", self, ee, gski, resources) + ee.reissue(resources = resources, publisher = publisher) + covering.remove(ee.ca_detail) + else: + # This probably never happens, as the most likely cause would be a CA certificate + # being revoked, which should trigger automatic clean up of issued certificates. + logger.debug("%r: %r for %s %s is no longer covered", self, ee, gski, resources) + ca_details.add(ee.ca_detail) + ee.revoke(publisher = publisher) + + subject_name = rpki.x509.X501DN.from_cn(r_pdu.get("cn"), r_pdu.get("sn")) + subject_key = rpki.x509.PKCS10(Base64 = r_pdu[0].text).getPublicKey() + + for ca_detail in covering: + logger.debug("%r: No existing EE certificate for %s %s", self, gski, resources) + cn, sn = subject_name.extract_cn_and_sn() + cert = ca_detail.issue_ee( + ca = ca_detail.ca, + subject_key = subject_key, + sia = None, + resources = resources, + notAfter = resources.valid_until, + cn = cn, + sn = sn, + eku = r_pdu.get("eku", "").split(",") or None) + ee = rpki.rpkidb.models.EECertificate.objects.create( + tenant = ca_detail.ca.parent.tenant, + ca_detail = ca_detail, + cert = cert, + gski = subject_key.gSKI()) + publisher.queue( + uri = ee.uri, + new_obj = cert, + repository = ca_detail.ca.parent.repository, + handler = ee.published_callback) + + # Anything left is an orphan + for ees in existing.values(): + for ee in ees: + ca_details.add(ee.ca_detail) + ee.revoke(publisher = publisher) + + for ca_detail in ca_details: + ca_detail.generate_crl_and_manifest(publisher = publisher) + + yield publisher.call_pubd() + + except: + logger.exception("%r: Could not update EE certificates, skipping", self) @queue_task class RegenerateCRLsAndManifestsTask(AbstractTask): - """ - Generate new CRLs and manifests as necessary for all of this self's - CAs. Extracting nextUpdate from a manifest is hard at the moment - due to implementation silliness, so for now we generate a new - manifest whenever we generate a new CRL - - This code also cleans up tombstones left behind by revoked ca_detail - objects, since we're walking through the relevant portions of the - database anyway. - """ - - def start(self): - self.gctx.checkpoint() - logger.debug("Self %s[%d] regenerating CRLs and manifests", - self.self_handle, self.self_id) - - now = rpki.sundial.now() - crl_interval = rpki.sundial.timedelta(seconds = self.crl_interval) - regen_margin = max(self.gctx.cron_period * 2, crl_interval / 4) - publisher = rpki.rpkid.publication_queue() - - for parent in self.parents: - for ca in parent.cas: + """ + Generate new CRLs and manifests as necessary for all of this tenant's + CAs. Extracting nextUpdate from a manifest is hard at the moment + due to implementation silliness, so for now we generate a new + manifest whenever we generate a new CRL + + This code also cleans up tombstones left behind by revoked ca_detail + objects, since we're walking through the relevant portions of the + database anyway. + """ + + @tornado.gen.coroutine + def main(self): + logger.debug("%r: Regenerating CRLs and manifests", self) + try: - for ca_detail in ca.revoked_ca_details: - if now > ca_detail.latest_crl.getNextUpdate(): - ca_detail.delete(ca = ca, publisher = publisher) - for ca_detail in ca.active_or_deprecated_ca_details: - if now + regen_margin > ca_detail.latest_crl.getNextUpdate(): - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Couldn't regenerate CRLs and manifests for CA %r, skipping", ca) - - self.gctx.checkpoint() - self.gctx.sql.sweep() - publisher.call_pubd(self.exit, self.lose) - - def lose(self, e): - logger.exception("Couldn't publish updated CRLs and manifests for self %r, skipping", self.self_handle) - self.gctx.checkpoint() - self.exit() + publisher = rpki.rpkid.publication_queue(self.rpkid) + now = rpki.sundial.now() + + ca_details = rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = self.tenant, + next_crl_manifest_update__isnull = False) + + for ca_detail in ca_details.filter(next_crl_manifest_update__lt = now, + state = "revoked"): + ca_detail.destroy(publisher = publisher) + + for ca_detail in ca_details.filter(state__in = ("active", "deprecated"), + next_crl_manifest_update__lt = now + max( + rpki.sundial.timedelta(seconds = self.tenant.crl_interval) / 4, + rpki.sundial.timedelta(seconds = self.rpkid.cron_period ) * 2)): + ca_detail.generate_crl_and_manifest(publisher = publisher) + + yield publisher.call_pubd() + + except: + logger.exception("%r: Couldn't publish updated CRLs and manifests, skipping", self) @queue_task class CheckFailedPublication(AbstractTask): - """ - Periodic check for objects we tried to publish but failed (eg, due - to pubd being down or unreachable). - """ - - def start(self): - publisher = rpki.rpkid.publication_queue() - for parent in self.parents: - for ca in parent.cas: - ca_detail = ca.active_ca_detail - if ca_detail is not None: - ca_detail.check_failed_publication(publisher) - self.gctx.checkpoint() - self.gctx.sql.sweep() - publisher.call_pubd(self.exit, self.publication_failed) - - def publication_failed(self, e): - logger.exception("Couldn't publish for %s, skipping", self.self_handle) - self.gctx.checkpoint() - self.exit() + """ + Periodic check for objects we tried to publish but failed (eg, due + to pubd being down or unreachable). + """ + + @tornado.gen.coroutine + def main(self): + logger.debug("%r: Checking for failed publication actions", self) + + try: + publisher = rpki.rpkid.publication_queue(self.rpkid) + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"): + ca_detail.check_failed_publication(publisher) + yield publisher.call_pubd() + + except: + logger.exception("%r: Couldn't run failed publications, skipping", self) |