aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2012-08-17 23:50:13 +0000
committerRob Austein <sra@hactrn.net>2012-08-17 23:50:13 +0000
commitc57891061967171f5914cd1002273f1a7024a084 (patch)
treef96c2ef35e683c59511fc1c52126c6167af74dbb
parent7073c95e8c4b008d4650a9076d80fa401239855e (diff)
Checkpoint. New class-based tasks, not yet tested, rest of code not
yet adapted to match. svn path=/branches/tk274/; revision=4635
-rw-r--r--rpkid/rpki/rpkid_tasks.py505
1 files changed, 505 insertions, 0 deletions
diff --git a/rpkid/rpki/rpkid_tasks.py b/rpkid/rpki/rpkid_tasks.py
new file mode 100644
index 00000000..c1f2cd7b
--- /dev/null
+++ b/rpkid/rpki/rpkid_tasks.py
@@ -0,0 +1,505 @@
+"""
+rpkid task objects. Split out from rpki.left_right and rpki.rpkid
+because interactions with rpkid scheduler were getting too complicated.
+
+$Id$
+
+Copyright (C) 2012 Internet Systems Consortium ("ISC")
+
+Permission to use, copy, modify, and distribute this software for any
+purpose with or without fee is hereby granted, provided that the above
+copyright notice and this permission notice appear in all copies.
+
+THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+PERFORMANCE OF THIS SOFTWARE.
+"""
+
class CompletionHandler(object):
    """
    Watch a collection of scheduled rpkid tasks and fire a single
    callback once the last outstanding task has finished.
    """

    def __init__(self, cb):
        # cb: zero-argument callable invoked when all registered tasks are done.
        self.cb = cb
        self.tasks = set()

    def register(self, task):
        # Remember this task and arrange to be told when it completes.
        self.tasks.add(task)
        task.register_completion(self.complete)

    def complete(self, task):
        # Called back by a finishing task: drop it from the pending set,
        # and run the final callback once nothing remains outstanding.
        if task in self.tasks:
            self.tasks.discard(task)
        else:
            rpki.log.warn("Completion handler %r called with unregistered task %r, blundering onwards" % (self, task))
        if not self.tasks:
            self.cb()
+
class AbstractTask(object):
    """
    Abstract base class for rpkid scheduler task objects. This just
    handles the scheduler hooks; real work starts in self.start(),
    which subclasses must override.

    NB: This assumes that the rpki.rpkid.rpkid.task_* methods have been
    rewritten to expect instances of subclasses of this class, rather
    than expecting thunks to be wrapped up in the older version of this
    class. Rewrite, rewrite, remove this comment when done, OK!
    """

    def __init__(self, s, description = None):
        # s is the self_elt this task operates on; stored as "self.self"
        # so __getattr__ below can delegate attribute lookups to it.
        self.self = s
        self.description = description
        self.completions = []            # callbacks to run when the task exits
        self.continuation = None         # where to resume after a postpone()

    def __repr__(self):
        return rpki.log.log_repr(self, self.description)

    def register_completion(self, completion):
        # completion: one-argument-free callable run (in order) by exit().
        self.completions.append(completion)

    def exit(self):
        # Terminate this task: fire all completions, then let the
        # scheduler run whatever is next in its queue.
        for completion in self.completions:
            completion()
        self.self.gctx.task_next()

    def postpone(self, continuation):
        # Yield the scheduler: requeue this task, remembering where to
        # pick up when the scheduler runs us again.
        self.continuation = continuation
        self.self.gctx.task_add(self)
        self.self.gctx.task_next()

    def __call__(self):
        # Scheduler entry point: fresh start the first time, otherwise
        # resume from the saved continuation.
        if self.continuation is None:
            rpki.log.debug("Running task %r" % self)
            self.start()
        else:
            # Bug fix: the original format string had a single %r but a
            # two-element argument tuple, which raises TypeError.
            rpki.log.debug("Restarting task %r at %r" % (self, self.continuation))
            continuation = self.continuation
            self.continuation = None
            continuation()

    def __getattr__(self, name):
        # Delegate unknown attributes to the underlying self_elt, so
        # tasks can say self.parents, self.self_handle, etc.
        return getattr(self.self, name)

    def start(self):
        raise NotImplementedError
+
+
+class PollParentTask(AbstractTask):
+ """
+ Run the regular client poll cycle with each of this self's
+ parents, in turn.
+ """
+
+ def start(self):
+ rpki.log.trace()
+ self.gctx.checkpoint()
+ rpki.log.debug("Self %s[%d] polling parents" % (self.self_handle, self.self_id))
+ rpki.async.iterator(self.parents, self.parent_loop, self.exit)
+
+ def parent_loop(self, parent_iterator, parent):
+ self.parent_iterator = parent_iterator
+ self.parent = parent
+ rpki.up_down.list_pdu.query(parent, self.got_list, self.list_failed)
+
+ def got_list(self, r_msg):
+ self.ca_map = dict((ca.parent_resource_class, ca) for ca in self.parent.cas)
+ self.gctx.checkpoint()
+ rpki.async.iterator(r_msg.payload.classes, self.class_loop, self.class_done)
+
+ def list_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't get resource class list from parent %r, skipping: %s (%r)" % (
+ self.parent, e, e))
+ self.parent_iterator()
+
+ def class_loop(self, class_iterator, rc):
+ self.gctx.checkpoint()
+ self.class_iterator = class_iterator
+ try:
+ ca = self.ca_map.pop(rc.class_name)
+ except KeyError:
+ rpki.rpkid.ca_obj.create(self.parent, rc, class_iterator, self.class_create_failed)
+ else:
+ ca.check_for_updates(self.parent, rc, class_iterator, self.class_update_failed)
+
+ def class_update_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't update class, skipping: %s" % e)
+ self.class_iterator()
+
+ def class_create_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't create class, skipping: %s" % e)
+ self.class_iterator()
+
+ def class_done(self):
+ rpki.async.iterator(self.ca_map.values(), self.ca_loop, self.ca_done)
+
+ def ca_loop(self, iterator, ca):
+ self.gctx.checkpoint()
+ ca.delete(parent, iterator)
+
+ def ca_done(self):
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ self.parent_iterator()
+
+
+class UpdateChildrenTask(AbstractTask):
+ """
+ Check for updated IRDB data for all of this self's children and
+ issue new certs as necessary. Must handle changes both in
+ resources and in expiration date.
+ """
+
+ def start(self):
+ rpki.log.trace()
+ self.gctx.checkpoint()
+ rpki.log.debug("Self %s[%d] updating children" % (self.self_handle, self.self_id))
+
+ self.now = rpki.sundial.now()
+ self.rsn = now + rpki.sundial.timedelta(seconds = self.regen_margin)
+ self.publisher = rpki.rpkid.publication_queue()
+ rpki.async.iterator(self.children, self.loop, self.done)
+
+ def loop(self, iterator, child):
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ self.iterator = iterator
+ self.child = child
+ self.child_certs = child.child_certs
+ if self.child_certs:
+ self.gctx.irdb_query_child_resources(child.self.self_handle, child.child_handle, self.got_resources, self.lose)
+ else:
+ iterator()
+
+ def lose(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't update child %r, skipping: %s" % (self.child, e))
+ self.iterator()
+
+ def got_resources(self, irdb_resources):
+ try:
+ for child_cert in self.child_certs:
+ ca_detail = child_cert.ca_detail
+ ca = ca_detail.ca
+ if ca_detail.state == "active":
+ old_resources = child_cert.cert.get_3779resources()
+ new_resources = irdb_resources.intersection(old_resources).intersection(ca_detail.latest_ca_cert.get_3779resources())
+
+ if new_resources.empty():
+ rpki.log.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s" % (child.child_handle, child_cert.cert.gSKI()))
+ child_cert.revoke(publisher = self.publisher)
+ ca_detail.generate_crl(publisher = self.publisher)
+ ca_detail.generate_manifest(publisher = self.publisher)
+
+ elif old_resources != new_resources or (old_resources.valid_until < self.rsn and irdb_resources.valid_until > self.now):
+ rpki.log.debug("Need to reissue child %s certificate SKI %s" % (child.child_handle, child_cert.cert.gSKI()))
+ child_cert.reissue(
+ ca_detail = ca_detail,
+ resources = new_resources,
+ publisher = self.publisher)
+
+ elif old_resources.valid_until < self.now:
+ rpki.log.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s"
+ % (child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until))
+ child_cert.sql_delete()
+ self.publisher.withdraw(cls = rpki.publication.certificate_elt, uri = child_cert.uri, obj = child_cert.cert, repository = ca.parent.repository)
+ ca_detail.generate_manifest(publisher = self.publisher)
+
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except Exception, e:
+ self.gctx.checkpoint()
+ lose(e)
+ else:
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ self.iterator()
+
+ def done():
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ publisher.call_pubd(self.exit, self.publication_failed)
+
+ def publication_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ self.exit()
+
+
+class UpdateROAsTask(AbstractTask):
+ """
+ Generate or update ROAs for this self.
+ """
+
+ def start(self):
+ rpki.log.trace()
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ rpki.log.debug("Self %s[%d] updating ROAs" % (self.self_handle, self.self_id))
+
+ rpki.log.debug("Issuing query for ROA requests")
+ self.gctx.irdb_query_roa_requests(self.self_handle, self.got_roa_requests, self.roa_requests_failed)
+
+ def got_roa_requests(self, roa_requests):
+ self.gctx.checkpoint()
+ rpki.log.debug("Received response to query for ROA requests")
+
+ if self.gctx.sql.dirty:
+ rpki.log.warn("Unexpected dirty SQL cache, flushing")
+ self.gctx.sql.sweep()
+
+ roas = {}
+ seen = set()
+ self.orphans = []
+ self.updates = []
+ self.publisher = rpki.rpkid.publication_queue()
+ self.ca_details = set()
+
+ for roa in self.roas:
+ k = (roa.asn, str(roa.ipv4), str(roa.ipv6))
+ if k not in roas:
+ roas[k] = roa
+ elif (roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and
+ (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active")):
+ self.orphans.append(roas[k])
+ roas[k] = roa
+ else:
+ self.orphans.append(roa)
+
+ for roa_request in roa_requests:
+ rpki.log.debug("++ roa_requests %s roas %s orphans %s updates %s publisher.size %s ca_details %s seen %s cache %s" % (
+ len(roa_requests), len(roas), len(self.orphans), len(self.updates), self.publisher.size,
+ len(self.ca_details), len(seen), len(self.gctx.sql.cache)))
+ k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
+ if k in seen:
+ rpki.log.warn("Skipping duplicate ROA request %r" % roa_request)
+ else:
+ seen.add(k)
+ roa = roas.pop(k, None)
+ if roa is None:
+ roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
+ rpki.log.debug("Couldn't find existing ROA, created %r" % roa)
+ else:
+ rpki.log.debug("Found existing %r" % roa)
+ self.updates.append(roa)
+
+ self.orphans.extend(roas.itervalues())
+
+ self.count = 0
+ rpki.async.iterator(self.updates, self.loop, self.done, pop_list = True)
+
+ def loop(self, iterator, roa):
+ self.gctx.checkpoint()
+ rpki.log.debug("++ updates %s orphans %s publisher.size %s ca_details %s cache %s" % (
+ len(self.updates), len(self.orphans), self.publisher.size, len(self.ca_details), len(self.gctx.sql.cache)))
+ try:
+ roa.update(publisher = self.publisher, fast = True)
+ self.ca_details.add(roa.ca_detail)
+ self.gctx.sql.sweep()
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except rpki.exceptions.NoCoveringCertForROA:
+ rpki.log.warn("No covering certificate for %r, skipping" % roa)
+ except Exception, e:
+ rpki.log.traceback()
+ rpki.log.warn("Could not update %r, skipping: %s" % (roa, e))
+ self.count += 1
+ if max_new_roas_at_once is not None and self.count % max_new_roas_at_once == 0:
+ self.publish(lambda: self.postpone(iterator))
+ else:
+ iterator()
+
+ def publish(self, done):
+ if publisher.size > 0:
+ for ca_detail in self.ca_details:
+ rpki.log.debug("Generating new CRL for %r" % ca_detail)
+ ca_detail.generate_crl(publisher = self.publisher)
+ rpki.log.debug("Generating new manifest for %r" % ca_detail)
+ ca_detail.generate_manifest(publisher = self.publisher)
+ self.ca_details.clear()
+ rpki.log.debug("Sweeping")
+ self.gctx.sql.sweep()
+ rpki.log.debug("Done sweeping")
+ self.gctx.checkpoint()
+ rpki.log.debug("Starting publication")
+ self.publisher.call_pubd(done, self.publication_failed)
+
+ def publication_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ self.exit()
+
+ def done(self):
+ for roa in orphans:
+ try:
+ ca_details.add(roa.ca_detail)
+ roa.revoke(publisher = publisher, fast = True)
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except Exception, e:
+ rpki.log.traceback()
+ rpki.log.warn("Could not revoke %r: %s" % (roa, e))
+ self.gctx.sql.sweep()
+ self.gctx.checkpoint()
+ self.publish(self.exit)
+
+ def roa_requests_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Could not fetch ROA requests for %s, skipping: %s" % (self.self_handle, e))
+ self.exit()
+
+
+class UpdateGhostbustersTask(AbstractTask):
+ """
+ Generate or update Ghostbuster records for this self.
+
+ This was originally based on the ROA update code. It's possible
+ that both could benefit from refactoring, but at this point the
+ potential scaling issues for ROAs completely dominate structure of
+ the ROA code, and aren't relevant here unless someone is being
+ exceptionally silly.
+ """
+
+ def start(self):
+ rpki.log.trace()
+ self.gctx.checkpoint()
+ rpki.log.debug("Self %s[%d] updating Ghostbuster records" % (self.self_handle, self.self_id))
+
+ self.gctx.irdb_query_ghostbuster_requests(self.self_handle,
+ (p.parent_handle for p in self.parents),
+ self.got_ghostbuster_requests,
+ self.ghostbuster_requests_failed)
+
+ def got_ghostbuster_requests(self, ghostbuster_requests):
+
+ try:
+ self.gctx.checkpoint()
+ if self.gctx.sql.dirty:
+ rpki.log.warn("Unexpected dirty SQL cache, flushing")
+ self.gctx.sql.sweep()
+
+ ghostbusters = {}
+ orphans = []
+ publisher = rpki.rpkid.publication_queue()
+ ca_details = set()
+ seen = set()
+
+ parents = dict((p.parent_handle, p) for p in self.parents)
+
+ for ghostbuster in self.ghostbusters:
+ k = (ghostbuster.ca_detail_id, ghostbuster.vcard)
+ if ghostbuster.ca_detail.state != "active" or k in ghostbusters:
+ orphans.append(ghostbuster)
+ else:
+ ghostbusters[k] = ghostbuster
+
+ for ghostbuster_request in ghostbuster_requests:
+ if ghostbuster_request.parent_handle not in parents:
+ rpki.log.warn("Unknown parent_handle %r in Ghostbuster request, skipping" % ghostbuster_request.parent_handle)
+ continue
+ k = (ghostbuster_request.parent_handle, ghostbuster_request.vcard)
+ if k in seen:
+ rpki.log.warn("Skipping duplicate Ghostbuster request %r" % ghostbuster_request)
+ continue
+ seen.add(k)
+ for ca in parents[ghostbuster_request.parent_handle].cas:
+ ca_detail = ca.active_ca_detail
+ if ca_detail is not None:
+ ghostbuster = ghostbusters.pop((ca_detail.ca_detail_id, ghostbuster_request.vcard), None)
+ if ghostbuster is None:
+ ghostbuster = rpki.rpkid.ghostbuster_obj(self.gctx, self.self_id, ca_detail.ca_detail_id, ghostbuster_request.vcard)
+ rpki.log.debug("Created new Ghostbuster request for %r" % ghostbuster_request.parent_handle)
+ else:
+ rpki.log.debug("Found existing Ghostbuster request for %r" % ghostbuster_request.parent_handle)
+ ghostbuster.update(publisher = publisher, fast = True)
+ ca_details.add(ca_detail)
+
+ orphans.extend(ghostbusters.itervalues())
+ for ghostbuster in orphans:
+ ca_details.add(ghostbuster.ca_detail)
+ ghostbuster.revoke(publisher = publisher, fast = True)
+
+ for ca_detail in ca_details:
+ ca_detail.generate_crl(publisher = publisher)
+ ca_detail.generate_manifest(publisher = publisher)
+
+ self.gctx.sql.sweep()
+
+ self.gctx.checkpoint()
+ publisher.call_pubd(self.exit, self.publication_failed)
+
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except Exception, e:
+ rpki.log.traceback()
+ rpki.log.warn("Could not update Ghostbuster records for %s, skipping: %s" % (self.self_handle, e))
+ self.exit()
+
+ def publication_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish Ghostbuster updates for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ self.exit()
+
+ def ghostbuster_requests_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Could not fetch Ghostbuster record requests for %s, skipping: %s" % (self.self_handle, e))
+ self.exit()
+
+class RegnerateCRLsAndManifestsTask(AbstractTask):
+ """
+ Generate new CRLs and manifests as necessary for all of this self's
+ CAs. Extracting nextUpdate from a manifest is hard at the moment
+ due to implementation silliness, so for now we generate a new
+ manifest whenever we generate a new CRL
+
+ This code also cleans up tombstones left behind by revoked ca_detail
+ objects, since we're walking through the relevant portions of the
+ database anyway.
+ """
+
+ def start(self):
+ rpki.log.trace()
+ self.gctx.checkpoint()
+ rpki.log.debug("Self %s[%d] regenerating CRLs and manifests" % (self.self_handle, self.self_id))
+
+ now = rpki.sundial.now()
+ regen_margin = rpki.sundial.timedelta(seconds = self.regen_margin)
+ publisher = rpki.rpkid.publication_queue()
+
+ for parent in self.parents:
+ for ca in parent.cas:
+ try:
+ for ca_detail in ca.revoked_ca_details:
+ if now > ca_detail.latest_crl.getNextUpdate():
+ ca_detail.delete(ca = ca, publisher = publisher)
+ ca_detail = ca.active_ca_detail
+ if ca_detail is not None and now + regen_margin > ca_detail.latest_crl.getNextUpdate():
+ ca_detail.generate_crl(publisher = publisher)
+ ca_detail.generate_manifest(publisher = publisher)
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except Exception, e:
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't regenerate CRLs and manifests for CA %r, skipping: %s" % (ca, e))
+
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ publisher.call_pubd(self.exit, self.lose)
+
+ def lose(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish updated CRLs and manifests for self %r, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ self.exit()