diff options
author | Rob Austein <sra@hactrn.net> | 2009-04-24 23:03:51 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2009-04-24 23:03:51 +0000 |
commit | 8542127f3bc823bbc7ff76e4ca8dc3bd0969318f (patch) | |
tree | 7b6a4faa5b62a03d432a6fc5a7e0be431b7aa65f /rpkid/rpki/rpki_engine.py | |
parent | 725bc9130dd16ffffa5277382b23cda6c069ef6c (diff) |
Checkpoint
svn path=/rpkid/pubd.py; revision=2355
Diffstat (limited to 'rpkid/rpki/rpki_engine.py')
-rw-r--r-- | rpkid/rpki/rpki_engine.py | 381 |
1 files changed, 225 insertions, 156 deletions
diff --git a/rpkid/rpki/rpki_engine.py b/rpkid/rpki/rpki_engine.py index 5f89c338..c373e856 100644 --- a/rpkid/rpki/rpki_engine.py +++ b/rpkid/rpki/rpki_engine.py @@ -41,16 +41,8 @@ class rpkid_context(object): self.publication_kludge_base = cfg.get("publication-kludge-base", "publication/") - def irdb_query(self, self_id, child_id = None): - """Perform an IRDB callback query. In the long run this should not - be a blocking routine, it should instead issue a query and set up a - handler to receive the response. For the moment, though, we are - doing simple lock step and damn the torpedos. Not yet doing - anything useful with subject name. Most likely this function should - really be wrapped up in a class that carries both the query result - and also the intermediate state needed for the event-driven code - that this function will need to become. - """ + def irdb_query(self, self_id, child_id, callback): + """Perform an IRDB callback query.""" rpki.log.trace() @@ -60,40 +52,52 @@ class rpkid_context(object): q_msg[0].self_id = self_id q_msg[0].child_id = child_id q_cms = rpki.left_right.cms_msg.wrap(q_msg, self.rpkid_key, self.rpkid_cert) - der = rpki.https.client( + + def unwrap(der): + r_msg = rpki.left_right.cms_msg.unwrap(der, (self.bpki_ta, self.irdb_cert)) + if len(r_msg) == 0 or not isinstance(r_msg[0], rpki.left_right.list_resources_elt) or r_msg.type != "reply": + raise rpki.exceptions.BadIRDBReply, "Unexpected response to IRDB query: %s" % lxml.etree.tostring(r_msg.toXML(), pretty_print = True, encoding = "us-ascii") + callback(rpki.resource_set.resource_bag( + asn = r_msg[0].asn, + v4 = r_msg[0].ipv4, + v6 = r_msg[0].ipv6, + valid_until = r_msg[0].valid_until)) + + rpki.https.client( server_ta = (self.bpki_ta, self.irdb_cert), client_key = self.rpkid_key, client_cert = self.rpkid_cert, url = self.irdb_url, - msg = q_cms) - r_msg = rpki.left_right.cms_msg.unwrap(der, (self.bpki_ta, self.irdb_cert)) - if len(r_msg) == 0 or not isinstance(r_msg[0], rpki.left_right.list_resources_elt) or r_msg.type != "reply": - raise rpki.exceptions.BadIRDBReply, "Unexpected response to IRDB query: %s" % lxml.etree.tostring(r_msg.toXML(), pretty_print = True, encoding = "us-ascii") - return rpki.resource_set.resource_bag( - asn = r_msg[0].asn, - v4 = r_msg[0].ipv4, - v6 = r_msg[0].ipv6, - valid_until = r_msg[0].valid_until) + msg = q_cms, + callback = unwrap) def left_right_handler(self, query, path, cb): """Process one left-right PDU.""" rpki.log.trace() + + def done(r_msg): + reply = rpki.left_right.cms_msg.wrap(r_msg, self.rpkid_key, self.rpkid_cert) + self.sql.sweep() + cb(200, reply) + try: self.sql.ping() q_msg = rpki.left_right.cms_msg.unwrap(query, (self.bpki_ta, self.irbe_cert)) if q_msg.type != "query": raise rpki.exceptions.BadQuery, "Message type is not query" - r_msg = q_msg.serve_top_level(self) - reply = rpki.left_right.cms_msg.wrap(r_msg, self.rpkid_key, self.rpkid_cert) - self.sql.sweep() - return 200, reply + q_msg.serve_top_level(self, done) except Exception, data: rpki.log.error(traceback.format_exc()) - return 500, "Unhandled exception %s" % data + cb(500, "Unhandled exception %s" % data) def up_down_handler(self, query, path, cb): """Process one up-down PDU.""" rpki.log.trace() + + def done(reply): + self.sql.sweep() + cb(200, reply) + try: self.sql.ping() child_id = path.partition("/up-down/")[2] @@ -102,12 +106,10 @@ class rpkid_context(object): child = rpki.left_right.child_elt.sql_fetch(self, long(child_id)) if child is None: raise rpki.exceptions.ChildNotFound, "Could not find child %s" % child_id - reply = child.serve_up_down(query) - self.sql.sweep() - return 200, reply + child.serve_up_down(query, done) except Exception, data: rpki.log.error(traceback.format_exc()) - return 400, "Could not process PDU: %s" % data + cb(400, "Could not process PDU: %s" % data) def cronjob_handler(self, query, path, cb): """Periodic tasks. This will need another rewrite once we have internal timers.""" @@ -115,31 +117,31 @@ class rpkid_context(object): rpki.log.trace() self.sql.ping() - def cronjob_do_one(iterator, s): + def each(iterator, s): - def client_poll(): + def one(): rpki.log.debug("Self %s polling parents" % s.self_id) - s.client_poll(update_children) + s.client_poll(two) - def update_children(): + def two(): rpki.log.debug("Self %s updating children" % s.self_id) - s.update_children(update_roas_crls_and_manifests) + s.update_children(three) - def update_roas_crls_and_manifests(): + def three(): rpki.log.debug("Self %s updating ROAs" % s.self_id) - s.update_roas() + s.update_roas(four) + + def four(): rpki.log.debug("Self %s regenerating CRLs and manifests" % s.self_id) - s.regenerate_crls_and_manifests() - iterator() + s.regenerate_crls_and_manifests(iterator) - client_poll() + one() - def cronjob_done(): + def done(): self.sql.sweep() cb(200, "OK") - rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self), - cronjob_do_one, cronjob_done) + rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self), each, done) ## @var https_ta_cache # HTTPS trust anchor cache, to avoid regenerating it for every TLS connection. @@ -223,7 +225,7 @@ class ca_obj(rpki.sql.sql_persistant): raise rpki.exceptions.BadURISyntax, "SIA URI must end with a slash: %s" % sia_uri return sia_uri + str(self.ca_id) + "/" - def check_for_updates(self, parent, rc): + def check_for_updates(self, parent, rc, cb): """Parent has signaled continued existance of a resource class we already knew about, so we need to check for an updated certificate, changes in resource coverage, revocation and reissue @@ -239,37 +241,45 @@ class ca_obj(rpki.sql.sql_persistant): rc_resources = rc.to_resource_bag() cert_map = dict((c.cert.get_SKI(), c) for c in rc.certs) - for ca_detail in ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND latest_ca_cert IS NOT NULL AND state != 'revoked'", (self.ca_id,)): + def loop(iterator, ca_detail): ski = ca_detail.latest_ca_cert.get_SKI() if ski not in cert_map: rpki.log.warn("Certificate in database missing from list_response, class %s, SKI %s, maybe parent certificate went away?" % (repr(rc.class_name), ca_detail.latest_ca_cert.gSKI())) - ca_detail.delete(self, parent.repository()) - continue + return ca_detail.delete(self, parent.repository(), iterator) + + def cleanup(): + del cert_map[ski] + iterator() if ca_detail.state in ("pending", "active"): current_resources = ca_detail.latest_ca_cert.get_3779resources() - if sia_uri_changed or \ - ca_detail.latest_ca_cert != cert_map[ski].cert or \ - current_resources.undersized(rc_resources) or \ - current_resources.oversized(rc_resources): - ca_detail.update( + if (sia_uri_changed or + ca_detail.latest_ca_cert != cert_map[ski].cert or + current_resources.undersized(rc_resources) or + current_resources.oversized(rc_resources)): + return ca_detail.update( parent = parent, ca = self, rc = rc, sia_uri_changed = sia_uri_changed, - old_resources = current_resources) + old_resources = current_resources, + callback = cleanup) + + cleanup() - del cert_map[ski] + def done(): + if cert_map: + rpki.log.warn("Certificates in list_response missing from our database, class %s, SKIs %s" + % (repr(rc.class_name), ", ".join(c.cert.gSKI() for c in cert_map.values()))) + cb() - if cert_map: - rpki.log.warn("Certificates in list_response missing from our database, class %s, SKIs %s" - % (repr(rc.class_name), ", ".join(c.cert.gSKI() for c in cert_map.values()))) + rpki.async.iterator(ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND latest_ca_cert IS NOT NULL AND state != 'revoked'", (self.ca_id,)), loop, done) @classmethod - def create(cls, parent, rc): + def create(cls, parent, rc, cb): """Parent has signaled existance of a new resource class, so we need to create and set up a corresponding CA object. """ @@ -282,13 +292,14 @@ class ca_obj(rpki.sql.sql_persistant): self.sia_uri = self.construct_sia_uri(parent, rc) ca_detail = ca_detail_obj.create(self) - # This will need a callback when we go event-driven - issue_response = rpki.up_down.issue_pdu.query(parent, self, ca_detail) + def done(issue_response): + ca_detail.activate( + ca = self, + cert = issue_response.payload.classes[0].certs[0].cert, + uri = issue_response.payload.classes[0].certs[0].cert_url, + callback = cb) - ca_detail.activate( - ca = self, - cert = issue_response.payload.classes[0].certs[0].cert, - uri = issue_response.payload.classes[0].certs[0].cert_url) + rpki.up_down.issue_pdu.query(parent, self, ca_detail, done) def delete(self, parent): """The list of current resource classes received from parent does @@ -324,7 +335,7 @@ class ca_obj(rpki.sql.sql_persistant): self.sql_mark_dirty() return self.last_crl_sn - def rekey(self): + def rekey(self, cb): """Initiate a rekey operation for this ca. Generate a new keypair. Request cert from parent using new keypair. Mark result as our active ca_detail. Reissue all child certs issued by this @@ -337,22 +348,25 @@ class ca_obj(rpki.sql.sql_persistant): old_detail = self.fetch_active() new_detail = ca_detail_obj.create(self) - # This will need a callback when we go event-driven - issue_response = rpki.up_down.issue_pdu.query(parent, self, new_detail) + def done(issue_response): + new_detail.activate( + ca = self, + cert = issue_response.payload.classes[0].certs[0].cert, + uri = issue_response.payload.classes[0].certs[0].cert_url, + predecessor = old_detail, + callback = cb) - new_detail.activate( - ca = self, - cert = issue_response.payload.classes[0].certs[0].cert, - uri = issue_response.payload.classes[0].certs[0].cert_url, - predecessor = old_detail) + rpki.up_down.issue_pdu.query(parent, self, new_detail, done) - def revoke(self): + def revoke(self, cb): """Revoke deprecated ca_detail objects associated with this ca.""" rpki.log.trace() - for ca_detail in self.fetch_deprecated(): - ca_detail.revoke() + def loop(iterator, ca_detail): + ca_detail.revoke(iterator) + + rpki.async.iterator(self.fetch_deprecated(), loop, cb) class ca_detail_obj(rpki.sql.sql_persistant): """Internal CA detail object.""" @@ -408,40 +422,66 @@ class ca_detail_obj(rpki.sql.sql_persistant): """Return publication URI for this ca_detail's manifest.""" return ca.sia_uri + self.public_key.gSKI() + ".mnf" - def activate(self, ca, cert, uri, predecessor = None): + def activate(self, ca, cert, uri, predecessor = None, callback = None): """Activate this ca_detail.""" + assert callback is not None # hack to catch positional arguments + self.latest_ca_cert = cert self.ca_cert_uri = uri.rsync() self.generate_manifest_cert(ca) - self.generate_crl() - self.generate_manifest() - self.state = "active" - self.sql_mark_dirty() - if predecessor is not None: - predecessor.state = "deprecated" - predecessor.sql_mark_dirty() - for child_cert in predecessor.child_certs(): - child_cert.reissue(self) - for route_origin in predecessor.route_origins(): - route_origin.regenerate_roa() + def did_crl(*ignored): + self.generate_manifest(callback = did_manifest) + + def did_manifest(*ignored): + self.state = "active" + self.sql_mark_dirty() + if predecessor is None: + callback() + else: + predecessor.state = "deprecated" + predecessor.sql_mark_dirty() + rpki.async.iterator(predecessor.child_certs(), do_one_child_cert, done_child_certs) + + def do_one_child_cert(iterator, child_cert): + child_cert.reissue(self, iterator) + + def done_child_certs(): + rpki.async.iterator(predecessor.route_origins(), do_one_route_origin, callback) + + def do_one_route_origin(iterator, route_origin): + route_origin.regenerate_roa(iterator) - def delete(self, ca, repository): + self.generate_crl(callback = did_crl) + + def delete(self, ca, repository, cb): """Delete this ca_detail and all of the certs it issued.""" - for child_cert in self.child_certs(): - repository.withdraw(child_cert.cert, child_cert.uri(ca)) - child_cert.sql_delete() - for revoked_cert in self.revoked_certs(): - revoked_cert.sql_delete() - for route_origin in self.route_origins(): - route_origin.withdraw_roa() - repository.withdraw(self.latest_manifest, self.manifest_uri(ca)) - repository.withdraw(self.latest_crl, self.crl_uri(ca)) - self.sql_delete() + def withdraw_one_child(iterator, child_cert): + repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator) - def revoke(self): + def child_certs_done(): + rpki.async.iterator(self.route_origins(), withdraw_one_roa, withdraw_manifest) + + def withdraw_one_roa(iterator, route_origin): + route_origin.withdraw_roa(iterator) + + def withdraw_manifest(*ignored): + repository.withdraw(self.latest_manifest, self.manifest_uri(ca), withdraw_crl) + + def withdraw_crl(*ignored): + repository.withdraw(self.latest_crl, self.crl_uri(ca), done) + + def done(*ignored): + for cert in self.child_certs() + self.revoked_certs(): + cert.sql_delete() + self.sql_delete() + cb() + + rpki.async.iterator(self.child_certs(), withdraw_one_child, child_certs_done) + + def revoke(self, cb): """Request revocation of all certificates whose SKI matches the key for this ca_detail. Tasks: @@ -454,63 +494,79 @@ class ca_detail_obj(rpki.sql.sql_persistant): the revoked certs, with a next CRL time after the last cert or CRL signed by the old keypair will have expired. - - Destroy old keypair (and manifest keypair). + - Generate a corresponding final manifest. + + - Destroy old keypairs. - - Leave final CRL in place until its next CRL time has passed. + - Leave final CRL and manifest in place until their nextupdate time has passed. """ - # This will need a callback when we go event-driven - r_msg = rpki.up_down.revoke_pdu.query(self) + def parent_revoked(r_msg): - if r_msg.payload.ski != self.latest_ca_cert.gSKI(): - raise rpki.exceptions.SKIMismatch + if r_msg.payload.ski != self.latest_ca_cert.gSKI(): + raise rpki.exceptions.SKIMismatch - ca = self.ca() - parent = ca.parent() - crl_interval = rpki.sundial.timedelta(seconds = parent.self().crl_interval) + ca = self.ca() + parent = ca.parent() + crl_interval = rpki.sundial.timedelta(seconds = parent.self().crl_interval) - nextUpdate = rpki.sundial.now() + self.nextUpdate = rpki.sundial.now() - if self.latest_manifest is not None: - nextUpdate = nextUpdate.later(self.latest_manifest.getNextUpdate()) + if self.latest_manifest is not None: + self.nextUpdate = self.nextUpdate.later(self.latest_manifest.getNextUpdate()) - if self.latest_crl is not None: - nextUpdate = nextUpdate.later(self.latest_crl.getNextUpdate()) + if self.latest_crl is not None: + self.nextUpdate = self.nextUpdate.later(self.latest_crl.getNextUpdate()) - for child_cert in self.child_certs(): - nextUpdate = nextUpdate.later(child_cert.cert.getNotAfter()) - child_cert.revoke() + def revoke_one_child(iterator, child_cert): + self.nextUpdate = self.nextUpdate.later(child_cert.cert.getNotAfter()) + child_cert.revoke(iterator) - nextUpdate += crl_interval + def final_crl(): + self.nextUpdate += crl_interval + self.generate_crl(callback = final_manifest, nextUpdate = self.nextUpdate) - self.generate_crl(nextUpdate) - self.generate_manifest(nextUpdate) + def final_manifest(*ignored): + self.generate_manifest(callback = done, nextUpdate = self.nextUpdate) - self.private_key_id = None - self.manifest_private_key_id = None - self.manifest_public_key = None - self.latest_manifest_cert = None - self.state = "revoked" - self.sql_mark_dirty() + def done(*ignored): + self.private_key_id = None + self.manifest_private_key_id = None + self.manifest_public_key = None + self.latest_manifest_cert = None + self.state = "revoked" + self.sql_mark_dirty() + cb() - def update(self, parent, ca, rc, sia_uri_changed, old_resources): + rpki.async.iterator(self.child_certs(), revoke_one_child, final_crl) + + rpki.up_down.revoke_pdu.query(self, parent_revoked) + + def update(self, parent, ca, rc, sia_uri_changed, old_resources, callback): """Need to get a new certificate for this ca_detail and perhaps frob children of this ca_detail. """ - # This will need a callback when we go event-driven - issue_response = rpki.up_down.issue_pdu.query(parent, ca, self) - - self.latest_ca_cert = issue_response.payload.classes[0].certs[0].cert - new_resources = self.latest_ca_cert.get_3779resources() + def issued(issue_response): + self.latest_ca_cert = issue_response.payload.classes[0].certs[0].cert + new_resources = self.latest_ca_cert.get_3779resources() - if sia_uri_changed or old_resources.oversized(new_resources): - for child_cert in self.child_certs(): + def loop(iterator, child_cert): child_resources = child_cert.cert.get_3779resources() if sia_uri_changed or child_resources.oversized(new_resources): child_cert.reissue( ca_detail = self, - resources = child_resources.intersection(new_resources)) + resources = child_resources.intersection(new_resources), + callback = iterator) + else: + iterator() + + if sia_uri_changed or old_resources.oversized(new_resources): + rpki.async.iterator(self.child_certs(), loop, callback) + else: + callback() + + rpki.up_down.issue_pdu.query(parent, ca, self, issued) @classmethod def create(cls, ca): @@ -554,7 +610,7 @@ class ca_detail_obj(rpki.sql.sql_persistant): self.latest_manifest_cert = self.issue_ee(ca, resources, self.manifest_public_key) - def issue(self, ca, child, subject_key, sia, resources, child_cert = None): + def issue(self, ca, child, subject_key, sia, resources, callback, child_cert = None): """Issue a new certificate to a child. Optional child_cert argument specifies an existing child_cert object to update in place; if not specified, we create a new one. Returns the @@ -589,13 +645,15 @@ class ca_detail_obj(rpki.sql.sql_persistant): child_cert.sql_store() - ca.parent().repository().publish(child_cert.cert, child_cert.uri(ca)) + def published(*ignored): + self.generate_manifest(done) - self.generate_manifest() - - return child_cert + def done(*ignored): + callback(child_cert) + + ca.parent().repository().publish(child_cert.cert, child_cert.uri(ca), published) - def generate_crl(self, nextUpdate = None): + def generate_crl(self, callback, nextUpdate = None): """Generate a new CRL for this ca_detail. At the moment this is unconditional, that is, it is up to the caller to decide whether a new CRL is needed. @@ -626,9 +684,9 @@ class ca_detail_obj(rpki.sql.sql_persistant): nextUpdate = nextUpdate, revokedCertificates = certlist) - repository.publish(self.latest_crl, self.crl_uri(ca)) + repository.publish(self.latest_crl, self.crl_uri(ca), callback = callback) - def generate_manifest(self, nextUpdate = None): + def generate_manifest(self, callback, nextUpdate = None): """Generate a new manifest for this ca_detail.""" ca = self.ca() @@ -658,7 +716,7 @@ class ca_detail_obj(rpki.sql.sql_persistant): keypair = self.manifest_private_key_id, certs = self.latest_manifest_cert) - repository.publish(self.latest_manifest, self.manifest_uri(ca)) + repository.publish(self.latest_manifest, self.manifest_uri(ca), callback = callback) class child_cert_obj(rpki.sql.sql_persistant): """Certificate that has been issued to a child.""" @@ -696,18 +754,23 @@ class child_cert_obj(rpki.sql.sql_persistant): """Return the publication URI for this child_cert.""" return ca.sia_uri + self.uri_tail() - def revoke(self): + def revoke(self, callback): """Revoke a child cert.""" + rpki.log.debug("Revoking %s" % repr(self)) ca_detail = self.ca_detail() ca = ca_detail.ca() revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail) repository = ca.parent().repository() - repository.withdraw(self.cert, self.uri(ca)) - self.gctx.sql.sweep() - self.sql_delete() - def reissue(self, ca_detail, resources = None, sia = None): + def done(*ignored): + self.gctx.sql.sweep() + self.sql_delete() + callback() + + repository.withdraw(self.cert, self.uri(ca), done) + + def reissue(self, ca_detail, callback = None, resources = None, sia = None): """Reissue an existing cert, reusing the public key. If the cert we would generate is identical to the one we already have, we just return the one we already have. If we have to revoke the old @@ -716,6 +779,8 @@ class child_cert_obj(rpki.sql.sql_persistant): child_cert_obj must use the return value from this method. """ + assert callback is not None + ca = ca_detail.ca() child = self.child() @@ -732,7 +797,7 @@ class child_cert_obj(rpki.sql.sql_persistant): assert resources.valid_until is not None and old_resources.valid_until is not None if resources == old_resources and sia == old_sia and ca_detail == old_ca_detail: - return self + return callback(self) must_revoke = old_resources.oversized(resources) or old_resources.valid_until > resources.valid_until new_issuer = ca_detail != old_ca_detail @@ -745,20 +810,24 @@ class child_cert_obj(rpki.sql.sql_persistant): else: child_cert = self + def revoke(child_cert): + + def do_one_cert(iterator, cert): + cert.revoke(iterator) + + def done(): + callback(child_cert) + + rpki.async.iterator([x for x in child.child_certs(ca_detail = ca_detail, ski = self.ski) if x is not child_cert], do_one_cert, done) + child_cert = ca_detail.issue( ca = ca, child = child, subject_key = self.cert.getPublicKey(), sia = sia, resources = resources, - child_cert = child_cert) - - if must_revoke: - for cert in child.child_certs(ca_detail = ca_detail, ski = self.ski): - if cert is not child_cert: - cert.revoke() - - return child_cert + child_cert = child_cert, + callback = revoke if must_revoke else callback) @classmethod def fetch(cls, gctx = None, child = None, ca_detail = None, ski = None, unique = False): |