diff options
author | Rob Austein <sra@hactrn.net> | 2009-05-10 16:52:44 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2009-05-10 16:52:44 +0000 |
commit | 18b0cb5714699d1069e0458410af92a39dbdfe8b (patch) | |
tree | b2f0cb030dc05b468c4393be560fc2f31466e3f8 | |
parent | bbeefb2108cebcca5bf065db71bbb8c36c718853 (diff) |
Checkpoint. Conversion to errback() mechanism started, not yet
complete.
svn path=/rpkid/rootd.py; revision=2418
-rwxr-xr-x | rpkid/rootd.py | 6 | ||||
-rw-r--r-- | rpkid/rpki/https.py | 15 | ||||
-rw-r--r-- | rpkid/rpki/left_right.py | 188 | ||||
-rw-r--r-- | rpkid/rpki/publication.py | 31 | ||||
-rw-r--r-- | rpkid/rpki/rpki_engine.py | 108 | ||||
-rw-r--r-- | rpkid/rpki/up_down.py | 43 | ||||
-rw-r--r-- | rpkid/rpki/xml_utils.py | 26 | ||||
-rw-r--r-- | rpkid/testbed.py | 9 |
8 files changed, 261 insertions, 165 deletions
diff --git a/rpkid/rootd.py b/rpkid/rootd.py index 65af010b..ce5f9f75 100755 --- a/rpkid/rootd.py +++ b/rpkid/rootd.py @@ -144,13 +144,13 @@ def compose_response(r_msg): rc.certs[0].cert = subject_cert class list_pdu(rpki.up_down.list_pdu): - def serve_pdu(self, q_msg, r_msg, ignored, callback): + def serve_pdu(self, q_msg, r_msg, ignored, callback, errback): r_msg.payload = rpki.up_down.list_response_pdu() compose_response(r_msg) callback() class issue_pdu(rpki.up_down.issue_pdu): - def serve_pdu(self, q_msg, r_msg, ignored, callback): + def serve_pdu(self, q_msg, r_msg, ignored, callback, errback): self.pkcs10.check_valid_rpki() set_subject_pkcs10(self.pkcs10) r_msg.payload = rpki.up_down.issue_response_pdu() @@ -158,7 +158,7 @@ class issue_pdu(rpki.up_down.issue_pdu): callback() class revoke_pdu(rpki.up_down.revoke_pdu): - def serve_pdu(self, q_msg, r_msg, ignored, callback): + def serve_pdu(self, q_msg, r_msg, ignored, callback, errback): subject_cert = get_subject_cert() if subject_cert is None or subject_cert.gSKI() != self.ski: raise rpki.exceptions.NotInDatabase diff --git a/rpkid/rpki/https.py b/rpkid/rpki/https.py index 1eae7f2d..82fa3a49 100644 --- a/rpkid/rpki/https.py +++ b/rpkid/rpki/https.py @@ -54,7 +54,7 @@ debug = True want_persistent_client = True want_persistent_server = True -idle_timeout_default = rpki.sundial.timedelta(seconds = 300) +idle_timeout_default = rpki.sundial.timedelta(seconds = 30) active_timeout_default = idle_timeout_default default_http_version = (1, 0) @@ -318,6 +318,7 @@ class http_server(http_stream): except asyncore.ExitNow: raise except Exception, edata: + print traceback.format_exc() self.send_error(500, "Unhandled exception %s" % edata) else: self.send_error(*error) @@ -533,12 +534,12 @@ def client(msg, client_key, client_cert, server_ta, url, callback, errback = Non USE IN PRODUCTION UNTIL TLS SUPPORT HAS BEEN ADDED. """ - if errback is None: - if False: - raise RuntimeError, "rpki.https.client() call with no errback" - else: - def errback(e): - raise e + if errback is not None: + pass + elif False: + raise RuntimeError, "rpki.https.client() call with no errback" + else: + def errback(e): raise e u = urlparse.urlparse(url) diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index a2f74ecf..99786a29 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -105,32 +105,32 @@ class self_elt(data_elt): """Fetch all route_origin objects that link to this self object.""" return route_origin_elt.sql_fetch_where(self.gctx, "self_id = %s", (self.self_id,)) - def serve_post_save_hook(self, q_pdu, r_pdu, cb): + def serve_post_save_hook(self, q_pdu, r_pdu, cb, eb): """Extra server actions for self_elt.""" rpki.log.trace() if q_pdu.rekey: - self.serve_rekey(cb) + self.serve_rekey(cb, eb) elif q_pdu.revoke: - self.serve_revoke(cb) + self.serve_revoke(cb, eb) else: self.unimplemented_control("reissue", "run_now", "publish_world_now") cb() - def serve_rekey(self, cb): + def serve_rekey(self, cb, eb): """Handle a left-right rekey action for this self.""" rpki.log.trace() def loop(iterator, parent): - parent.serve_rekey(iterator) + parent.serve_rekey(iterator, eb) rpki.async.iterator(self.parents(), loop, cb) - def serve_revoke(self, cb): + def serve_revoke(self, cb, eb): """Handle a left-right revoke action for this self.""" rpki.log.trace() def loop(iterator, parent): - parent.serve_revoke(iterator) + parent.serve_revoke(iterator, eb) rpki.async.iterator(self.parents(), loop, cb) @@ -155,30 +155,41 @@ class self_elt(data_elt): rpki.log.trace() - def each(iterator, parent): + def parent_loop(parent_iterator, parent): - def handle(r_msg): + def got_list(r_msg): ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas()) - def loop(iterator, rc): + def class_loop(class_iterator, rc): if rc.class_name in ca_map: ca = ca_map[rc.class_name] del ca_map[rc.class_name] - ca.check_for_updates(parent, rc, iterator) + ca.check_for_updates(parent, rc, class_iterator, class_update_failed) else: - rpki.rpki_engine.ca_obj.create(parent, rc, iterator) + rpki.rpki_engine.ca_obj.create(parent, rc, class_iterator, class_create_failed) - def done(): + def class_update_failed(e): + rpki.log.warn("Couldn't update class, skipping: %s" % e) + class_iterator() + + def class_create_failed(e): + rpki.log.warn("Couldn't create class, skipping: %s" % e) + class_iterator() + + def class_done(): for ca in ca_map.values(): ca.delete(parent) # CA not listed by parent self.gctx.sql.sweep() - iterator() + parent_iterator() + + rpki.async.iterator(r_msg.payload.classes, class_loop, class_done) - rpki.async.iterator(r_msg.payload.classes, loop, done) + def list_failed(e): + rpki.log.warn("Couldn't get resource class list from parent %r, skipping: %s" % (parent, e)) - rpki.up_down.list_pdu.query(parent, handle) + rpki.up_down.list_pdu.query(parent, got_list, list_failed) - rpki.async.iterator(self.parents(), each, callback) + rpki.async.iterator(self.parents(), parent_loop, callback) def update_children(self, cb): """Check for updated IRDB data for all of this self's children and @@ -206,10 +217,17 @@ class self_elt(data_elt): if old_resources != new_resources or (old_resources.valid_until < rsn and irdb_resources.valid_until > now): rpki.log.debug("Need to reissue child certificate SKI %s" % child_cert.cert.gSKI()) - return child_cert.reissue( + + def reissue_failed(e): + rpki.log.warn("Couldn't reissue child_cert %r, skipping: %s" % (child_cert, e)) + iterator2() + + child_cert.reissue( ca_detail = ca_detail, resources = new_resources, - callback = iterator2) + callback = iterator2, + errback = reissue_failed) + return if old_resources.valid_until < now: rpki.log.debug("Child certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s" @@ -218,15 +236,32 @@ class self_elt(data_elt): parent = ca.parent() repository = parent.repository() child_cert.sql_delete() - return ca_detail.generate_manifest(lambda *ignored: repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator2)) + + def withdraw(*ignored): + repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator2, withdraw_failed) + + def manifest_failed(e): + rpki.log.warn("Couldn't reissue manifest for %r, skipping: %s" % (ca_detail, e)) + iterator2() + + def withdraw_failed(e): + rpki.log.warn("Couldn't withdraw old child_cert %r, skipping: %s" % (child_cert, e)) + iterator2() + + ca_detail.generate_manifest(withdraw, manifest_failed) + return iterator2() rpki.async.iterator(child_certs, loop2, iterator1) + def irdb_lookup_failed(e): + rpki.log.warn("Couldn't look up child's resources in IRDB, skipping child %r: %s" % (child, e)) + iterator1() + child_certs = child.child_certs() if child_certs: - self.gctx.irdb_query(child.self_id, child.child_id, got_resources) + self.gctx.irdb_query(child.self_id, child.child_id, got_resources, irdb_lookup_failed) else: iterator1() @@ -253,18 +288,22 @@ class self_elt(data_elt): def loop2(iterator2, ca): + def fail2(e): + rpki.log.warn("Couldn't regenerate CRLs and manifests for CA %r, skipping: %s" % (ca, e)) + iterator2() + def loop3(iterator3, ca_detail): - ca_detail.delete(ca, repository, iterator3) + ca_detail.delete(ca, repository, iterator3, fail2) def done3(): ca_detail = ca.fetch_active() def do_crl(): - ca_detail.generate_crl(do_manifest) + ca_detail.generate_crl(do_manifest, fail2) def do_manifest(*ignored): - ca_detail.generate_manifest(iterator2) + ca_detail.generate_manifest(iterator2, fail2) if ca_detail is not None and now > ca_detail.latest_crl.getNextUpdate(): do_crl() @@ -318,7 +357,7 @@ class bsc_elt(data_elt): """Fetch all child objects that link to this BSC object.""" return child_elt.sql_fetch_where(self.gctx, "bsc_id = %s", (self.bsc_id,)) - def serve_pre_save_hook(self, q_pdu, r_pdu, cb): + def serve_pre_save_hook(self, q_pdu, r_pdu, cb, eb): """Extra server actions for bsc_elt -- handle key generation. For now this only allows RSA with SHA-256. """ @@ -356,33 +395,33 @@ class parent_elt(data_elt): """Fetch all CA objects that link to this parent object.""" return rpki.rpki_engine.ca_obj.sql_fetch_where(self.gctx, "parent_id = %s", (self.parent_id,)) - def serve_post_save_hook(self, q_pdu, r_pdu, cb): + def serve_post_save_hook(self, q_pdu, r_pdu, cb, eb): """Extra server actions for parent_elt.""" if q_pdu.rekey: - self.serve_rekey(cb) + self.serve_rekey(cb, eb) elif q_pdu.revoke: - self.serve_revoke(cb) + self.serve_revoke(cb, eb) else: self.unimplemented_control("reissue") cb() - def serve_rekey(self, cb): + def serve_rekey(self, cb, eb): """Handle a left-right rekey action for this parent.""" def loop(iterator, ca): - ca.rekey(iterator) + ca.rekey(iterator, eb) rpki.async.iterator(self.cas(), loop, cb) - def serve_revoke(self, cb): + def serve_revoke(self, cb, eb): """Handle a left-right revoke action for this parent.""" def loop(iterator, ca): - ca.revoke(iterator) + ca.revoke(iterator, eb) rpki.async.iterator(self.cas(), loop, cb) - def query_up_down(self, q_pdu, cb): + def query_up_down(self, q_pdu, cb, eb): """Client code for sending one up-down query PDU to this parent.""" rpki.log.trace() @@ -414,7 +453,8 @@ class parent_elt(data_elt): client_cert = bsc.signing_cert, msg = q_cms, url = self.peer_contact_uri, - callback = unwrap) + callback = unwrap, + errback = eb) class child_elt(data_elt): """<child/> element.""" @@ -452,7 +492,7 @@ class child_elt(data_elt): raise rpki.exceptions.ClassNameMismatch, "Class name mismatch: child.self_id = %d, parent.self_id = %d" % (self.self_id, parent.self_id) return ca - def serve_post_save_hook(self, q_pdu, r_pdu, cb): + def serve_post_save_hook(self, q_pdu, r_pdu, cb, eb): """Extra server actions for child_elt.""" self.unimplemented_control("reissue") if self.clear_https_ta_cache: @@ -522,7 +562,7 @@ class repository_elt(data_elt): """Fetch all parent objects that link to this repository object.""" return parent_elt.sql_fetch_where(self.gctx, "repository_id = %s", (self.repository_id,)) - def call_pubd(self, callback, *pdus): + def call_pubd(self, callback, errback, *pdus): """Send a message to publication daemon and return the response.""" rpki.log.trace() bsc = self.bsc() @@ -544,17 +584,17 @@ class repository_elt(data_elt): msg = q_cms, callback = done) - def publish(self, obj, uri, callback): + def publish(self, obj, uri, callback, errback): """Publish one object in the repository.""" rpki.log.trace() rpki.log.info("Publishing %s as %s" % (repr(obj), repr(uri))) - self.call_pubd(callback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "publish", uri = uri, payload = obj)) + self.call_pubd(callback, errback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "publish", uri = uri, payload = obj)) - def withdraw(self, obj, uri, callback): + def withdraw(self, obj, uri, callback, errback): """Withdraw one object from the repository.""" rpki.log.trace() rpki.log.info("Withdrawing %s from at %s" % (repr(obj), repr(uri))) - self.call_pubd(callback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "withdraw", uri = uri)) + self.call_pubd(callback, errback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "withdraw", uri = uri)) class route_origin_elt(data_elt): """<route_origin/> element.""" @@ -608,7 +648,7 @@ class route_origin_elt(data_elt): """Fetch all ca_detail objects that link to this route_origin object.""" return rpki.rpki_engine.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) - def serve_post_save_hook(self, q_pdu, r_pdu, cb): + def serve_post_save_hook(self, q_pdu, r_pdu, cb, eb): """Extra server actions for route_origin_elt.""" self.unimplemented_control("suppress_publication") cb() @@ -629,34 +669,44 @@ class route_origin_elt(data_elt): def update_roa(self, callback): """Bring this route_origin's ROA up to date if necesssary.""" + def lose(e): + rpki.log.warn("Could not update ROA %r, skipping: %s" % (self, e)) + callback() + return + if self.roa is None: - return self.generate_roa(callback) + self.generate_roa(callback, lose) + return ca_detail = self.ca_detail() if ca_detail is None or ca_detail.state != "active": - return self.regenerate_roa(callback) + self.regenerate_roa(callback, lose) + return regen_margin = rpki.sundial.timedelta(seconds = self.self().regen_margin) if rpki.sundial.now() + regen_margin > self.cert.getNotAfter(): - return self.regenerate_roa(callback) + self.regenerate_roa(callback, lose) + return ca_resources = ca_detail.latest_ca_cert.get_3779resources() ee_resources = self.cert.get_3779resources() if ee_resources.oversized(ca_resources): - return self.regenerate_roa(callback) + self.regenerate_roa(callback, lose) + return v4 = self.ipv4.to_resource_set() if self.ipv4 is not None else rpki.resource_set.resource_set_ipv4() v6 = self.ipv6.to_resource_set() if self.ipv6 is not None else rpki.resource_set.resource_set_ipv6() if ee_resources.v4 != v4 or ee_resources.v6 != v6: - return self.regenerate_roa(callback) + self.regenerate_roa(callback, lose) + return callback() - def generate_roa(self, callback): + def generate_roa(self, callback, errback): """Generate a ROA based on this <route_origin/> object. At present this does not support ROAs with multiple signatures @@ -721,15 +771,16 @@ class route_origin_elt(data_elt): repository = ca.parent().repository() def one(): - repository.publish(self.cert, self.ee_uri(ca), two) + repository.publish(self.cert, self.ee_uri(ca), two, errback) def two(*ignored): - ca_detail.generate_manifest(callback) + ca_detail.generate_manifest(callback, errback) repository.publish(self.roa, self.roa_uri(ca), - one if self.publish_ee_separately else two) + one if self.publish_ee_separately else two, + errback) - def withdraw_roa(self, callback, regenerate = False): + def withdraw_roa(self, callback, errback, regenerate = False): """Withdraw ROA associated with this route_origin. In order to preserve make-before-break properties without @@ -752,29 +803,30 @@ class route_origin_elt(data_elt): rpki.log.debug("Withdrawing ROA and revoking its EE cert") rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail) repository.withdraw(roa, roa_uri, - two if self.publish_ee_separately else three) + two if self.publish_ee_separately else three, + errback) def two(): - repository.withdraw(cert, ee_uri, three) + repository.withdraw(cert, ee_uri, three, errback) def three(*ignored): self.gctx.sql.sweep() - ca_detail.generate_crl(four) + ca_detail.generate_crl(four, errback) def four(*ignored): - ca_detail.generate_manifest(callback) + ca_detail.generate_manifest(callback, errback) if regenerate: - self.generate_roa(one) + self.generate_roa(one, errback) else: one() - def regenerate_roa(self, callback): + def regenerate_roa(self, callback, errback): """Reissue ROA associated with this route_origin.""" if self.ca_detail() is None: - self.generate_roa(callback) + self.generate_roa(callback, errback) else: - self.withdraw_roa(callback, regenerate = True) + self.withdraw_roa(callback, errback, regenerate = True) def roa_uri(self, ca, key = None): """Return the publication URI for this route_origin's ROA.""" @@ -830,11 +882,12 @@ class report_error_elt(rpki.xml_utils.base_elt, left_right_namespace): attributes = ("tag", "self_id", "error_code") @classmethod - def from_exception(cls, exc, self_id = None): + def from_exception(cls, e, self_id = None): """Generate a <report_error/> element from an exception.""" self = cls() self.self_id = self_id - self.error_code = exc.__class__.__name__ + self.error_code = e.__class__.__name__ + self.text = str(e) return self class msg(rpki.xml_utils.msg, left_right_namespace): @@ -856,8 +909,17 @@ class msg(rpki.xml_utils.msg, left_right_namespace): r_msg.type = "reply" def loop(iterator, q_pdu): - q_pdu.gctx = gctx - q_pdu.serve_dispatch(r_msg, iterator) + + def fail(e): + rpki.log.error(traceback.format_exc()) + r_msg.append(report_error_elt.from_exception(e, self_id = q_pdu.self_id)) + cb(r_msg) + + try: + q_pdu.gctx = gctx + q_pdu.serve_dispatch(r_msg, iterator, fail) + except Exception, edata: + fail(edata) def done(): cb(r_msg) diff --git a/rpkid/rpki/publication.py b/rpkid/rpki/publication.py index 0b784e97..e9b525fd 100644 --- a/rpkid/rpki/publication.py +++ b/rpkid/rpki/publication.py @@ -17,7 +17,7 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ -import base64, os +import base64, os, traceback import rpki.resource_set, rpki.x509, rpki.sql, rpki.exceptions, rpki.xml_utils import rpki.https, rpki.up_down, rpki.relaxng, rpki.sundial, rpki.log, rpki.roa @@ -30,13 +30,13 @@ class publication_namespace(object): class control_elt(rpki.xml_utils.data_elt, rpki.sql.sql_persistant, publication_namespace): """Virtual class for control channel objects.""" - def serve_dispatch(self, r_msg, cb): + def serve_dispatch(self, r_msg, cb, eb): """Action dispatch handler. This needs special handling because we need to make sure that this PDU arrived via the control channel. """ if self.client is not None: raise rpki.exceptions.BadQuery, "Control query received on client channel" - rpki.xml_utils.data_elt.serve_dispatch(self, r_msg, cb) + rpki.xml_utils.data_elt.serve_dispatch(self, r_msg, cb, eb) class config_elt(control_elt): """<config/> element. This is a little weird because there should @@ -70,14 +70,14 @@ class config_elt(control_elt): """ return cls.sql_fetch(gctx, cls.wired_in_config_id) - def serve_set(self, r_msg, cb): + def serve_set(self, r_msg, cb, eb): """Handle a set action. This requires special handling because config doesn't support the create method. """ if self.sql_fetch(self.gctx, self.config_id) is None: - control_elt.serve_create(self, r_msg, cb) + control_elt.serve_create(self, r_msg, cb, eb) else: - control_elt.serve_set(self, r_msg, cb) + control_elt.serve_set(self, r_msg, cb, eb) def serve_fetch_one(self): """Find the config object on which a get or set method should @@ -112,7 +112,7 @@ class client_elt(control_elt): if name in self.elements: self.clear_https_ta_cache = True - def serve_post_save_hook(self, q_pdu, r_pdu, cb): + def serve_post_save_hook(self, q_pdu, r_pdu, cb, eb): """Extra server actions for client_elt.""" if self.clear_https_ta_cache: self.gctx.clear_https_ta_cache() @@ -162,7 +162,7 @@ class publication_object_elt(rpki.xml_utils.base_elt, publication_namespace): elt.text = base64.b64encode(self.payload.get_DER()) return elt - def serve_dispatch(self, r_msg, cb): + def serve_dispatch(self, r_msg, cb, eb): """Action dispatch handler.""" if self.client is None: raise rpki.exceptions.BadQuery, "Client query received on control channel" @@ -266,9 +266,18 @@ class msg(rpki.xml_utils.msg, publication_namespace): r_msg.type = "reply" def loop(iterator, q_pdu): - q_pdu.gctx = gctx - q_pdu.client = client - q_pdu.serve_dispatch(r_msg, iterator) + + def fail(e): + rpki.log.error(traceback.format_exc()) + r_msg.append(report_error_elt.from_exception(e)) + cb(r_msg) + + try: + q_pdu.gctx = gctx + q_pdu.client = client + q_pdu.serve_dispatch(r_msg, iterator, fail) + except Exception, edata: + fail(edata) def done(): cb(r_msg) diff --git a/rpkid/rpki/rpki_engine.py b/rpkid/rpki/rpki_engine.py index 1d443738..97fae857 100644 --- a/rpkid/rpki/rpki_engine.py +++ b/rpkid/rpki/rpki_engine.py @@ -41,7 +41,7 @@ class rpkid_context(object): self.publication_kludge_base = cfg.get("publication-kludge-base", "publication/") - def irdb_query(self, self_id, child_id, callback): + def irdb_query(self, self_id, child_id, callback, errback): """Perform an IRDB callback query.""" rpki.log.trace() @@ -56,12 +56,14 @@ class rpkid_context(object): def unwrap(der): r_msg = rpki.left_right.cms_msg.unwrap(der, (self.bpki_ta, self.irdb_cert)) if len(r_msg) == 0 or not isinstance(r_msg[0], rpki.left_right.list_resources_elt) or r_msg.type != "reply": - raise rpki.exceptions.BadIRDBReply, "Unexpected response to IRDB query: %s" % lxml.etree.tostring(r_msg.toXML(), pretty_print = True, encoding = "us-ascii") - callback(rpki.resource_set.resource_bag( - asn = r_msg[0].asn, - v4 = r_msg[0].ipv4, - v6 = r_msg[0].ipv6, - valid_until = r_msg[0].valid_until)) + errback(rpki.exceptions.BadIRDBReply( + "Unexpected response to IRDB query: %s" % lxml.etree.tostring(r_msg.toXML(), pretty_print = True, encoding = "us-ascii"))) + else: + callback(rpki.resource_set.resource_bag( + asn = r_msg[0].asn, + v4 = r_msg[0].ipv4, + v6 = r_msg[0].ipv6, + valid_until = r_msg[0].valid_until)) rpki.https.client( server_ta = (self.bpki_ta, self.irdb_cert), @@ -69,7 +71,8 @@ class rpkid_context(object): client_cert = self.rpkid_cert, url = self.irdb_url, msg = q_cms, - callback = unwrap) + callback = unwrap, + errback = errback) def left_right_handler(self, query, path, cb): """Process one left-right PDU.""" @@ -224,7 +227,7 @@ class ca_obj(rpki.sql.sql_persistant): raise rpki.exceptions.BadURISyntax, "SIA URI must end with a slash: %s" % sia_uri return sia_uri + str(self.ca_id) + "/" - def check_for_updates(self, parent, rc, cb): + def check_for_updates(self, parent, rc, cb, eb): """Parent has signaled continued existance of a resource class we already knew about, so we need to check for an updated certificate, changes in resource coverage, revocation and reissue @@ -247,7 +250,8 @@ class ca_obj(rpki.sql.sql_persistant): if ski not in cert_map: rpki.log.warn("Certificate in database missing from list_response, class %s, SKI %s, maybe parent certificate went away?" % (repr(rc.class_name), ca_detail.latest_ca_cert.gSKI())) - return ca_detail.delete(self, parent.repository(), iterator) + ca_detail.delete(self, parent.repository(), iterator, eb) + return def cleanup(): del cert_map[ski] @@ -259,13 +263,15 @@ class ca_obj(rpki.sql.sql_persistant): ca_detail.latest_ca_cert != cert_map[ski].cert or current_resources.undersized(rc_resources) or current_resources.oversized(rc_resources)): - return ca_detail.update( + ca_detail.update( parent = parent, ca = self, rc = rc, sia_uri_changed = sia_uri_changed, old_resources = current_resources, - callback = cleanup) + callback = cleanup, + errback = eb) + return cleanup() @@ -278,7 +284,7 @@ class ca_obj(rpki.sql.sql_persistant): rpki.async.iterator(ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND latest_ca_cert IS NOT NULL AND state != 'revoked'", (self.ca_id,)), loop, done) @classmethod - def create(cls, parent, rc, cb): + def create(cls, parent, rc, cb, eb): """Parent has signaled existance of a new resource class, so we need to create and set up a corresponding CA object. """ @@ -296,9 +302,10 @@ class ca_obj(rpki.sql.sql_persistant): ca = self, cert = issue_response.payload.classes[0].certs[0].cert, uri = issue_response.payload.classes[0].certs[0].cert_url, - callback = cb) + callback = cb, + errback = eb) - rpki.up_down.issue_pdu.query(parent, self, ca_detail, done) + rpki.up_down.issue_pdu.query(parent, self, ca_detail, done, eb) def delete(self, parent): """The list of current resource classes received from parent does @@ -334,7 +341,7 @@ class ca_obj(rpki.sql.sql_persistant): self.sql_mark_dirty() return self.last_crl_sn - def rekey(self, cb): + def rekey(self, cb, eb): """Initiate a rekey operation for this ca. Generate a new keypair. Request cert from parent using new keypair. Mark result as our active ca_detail. Reissue all child certs issued by this @@ -353,17 +360,18 @@ class ca_obj(rpki.sql.sql_persistant): cert = issue_response.payload.classes[0].certs[0].cert, uri = issue_response.payload.classes[0].certs[0].cert_url, predecessor = old_detail, - callback = cb) + callback = cb, + errback = eb) - rpki.up_down.issue_pdu.query(parent, self, new_detail, done) + rpki.up_down.issue_pdu.query(parent, self, new_detail, done, eb) - def revoke(self, cb): + def revoke(self, cb, eb): """Revoke deprecated ca_detail objects associated with this ca.""" rpki.log.trace() def loop(iterator, ca_detail): - ca_detail.revoke(iterator) + ca_detail.revoke(iterator, eb) rpki.async.iterator(self.fetch_deprecated(), loop, cb) @@ -421,7 +429,7 @@ class ca_detail_obj(rpki.sql.sql_persistant): """Return publication URI for this ca_detail's manifest.""" return ca.sia_uri + self.public_key.gSKI() + ".mnf" - def activate(self, ca, cert, uri, callback, predecessor = None): + def activate(self, ca, cert, uri, callback, errback, predecessor = None): """Activate this ca_detail.""" self.latest_ca_cert = cert @@ -429,7 +437,7 @@ class ca_detail_obj(rpki.sql.sql_persistant): self.generate_manifest_cert(ca) def did_crl(*ignored): - self.generate_manifest(callback = did_manifest) + self.generate_manifest(callback = did_manifest, errback = errback) def did_manifest(*ignored): self.state = "active" @@ -442,21 +450,21 @@ class ca_detail_obj(rpki.sql.sql_persistant): rpki.async.iterator(predecessor.child_certs(), do_one_child_cert, done_child_certs) def do_one_child_cert(iterator, child_cert): - child_cert.reissue(self, iterator) + child_cert.reissue(self, iterator, errback) def done_child_certs(): rpki.async.iterator(predecessor.route_origins(), do_one_route_origin, callback) def do_one_route_origin(iterator, route_origin): - route_origin.regenerate_roa(iterator) + route_origin.regenerate_roa(iterator, errback) - self.generate_crl(callback = did_crl) + self.generate_crl(callback = did_crl, errback = errback) - def delete(self, ca, repository, cb): + def delete(self, ca, repository, cb, eb): """Delete this ca_detail and all of the certs it issued.""" def withdraw_one_child(iterator, child_cert): - repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator) + repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator, eb) def child_certs_done(): rpki.async.iterator(self.route_origins(), withdraw_one_roa, withdraw_manifest) @@ -465,10 +473,10 @@ class ca_detail_obj(rpki.sql.sql_persistant): route_origin.withdraw_roa(iterator) def withdraw_manifest(*ignored): - repository.withdraw(self.latest_manifest, self.manifest_uri(ca), withdraw_crl) + repository.withdraw(self.latest_manifest, self.manifest_uri(ca), withdraw_crl, eb) def withdraw_crl(*ignored): - repository.withdraw(self.latest_crl, self.crl_uri(ca), done) + repository.withdraw(self.latest_crl, self.crl_uri(ca), done, eb) def done(*ignored): for cert in self.child_certs() + self.revoked_certs(): @@ -478,7 +486,7 @@ class ca_detail_obj(rpki.sql.sql_persistant): rpki.async.iterator(self.child_certs(), withdraw_one_child, child_certs_done) - def revoke(self, cb): + def revoke(self, cb, eb): """Request revocation of all certificates whose SKI matches the key for this ca_detail. Tasks: @@ -517,14 +525,14 @@ class ca_detail_obj(rpki.sql.sql_persistant): def revoke_one_child(iterator, child_cert): self.nextUpdate = self.nextUpdate.later(child_cert.cert.getNotAfter()) - child_cert.revoke(iterator) + child_cert.revoke(iterator, eb) def final_crl(): self.nextUpdate += crl_interval - self.generate_crl(callback = final_manifest, nextUpdate = self.nextUpdate) + self.generate_crl(callback = final_manifest, errback = eb, nextUpdate = self.nextUpdate) def final_manifest(*ignored): - self.generate_manifest(callback = done, nextUpdate = self.nextUpdate) + self.generate_manifest(callback = done, errback = eb, nextUpdate = self.nextUpdate) def done(*ignored): self.private_key_id = None @@ -537,9 +545,9 @@ class ca_detail_obj(rpki.sql.sql_persistant): rpki.async.iterator(self.child_certs(), revoke_one_child, final_crl) - rpki.up_down.revoke_pdu.query(self, parent_revoked) + rpki.up_down.revoke_pdu.query(self, parent_revoked, eb) - def update(self, parent, ca, rc, sia_uri_changed, old_resources, callback): + def update(self, parent, ca, rc, sia_uri_changed, old_resources, callback, errback): """Need to get a new certificate for this ca_detail and perhaps frob children of this ca_detail. """ @@ -554,7 +562,8 @@ class ca_detail_obj(rpki.sql.sql_persistant): child_cert.reissue( ca_detail = self, resources = child_resources.intersection(new_resources), - callback = iterator) + callback = iterator, + errback = errback) else: iterator() @@ -563,7 +572,7 @@ class ca_detail_obj(rpki.sql.sql_persistant): else: callback() - rpki.up_down.issue_pdu.query(parent, ca, self, issued) + rpki.up_down.issue_pdu.query(parent, ca, self, issued, errback) @classmethod def create(cls, ca): @@ -607,7 +616,7 @@ class ca_detail_obj(rpki.sql.sql_persistant): self.latest_manifest_cert = self.issue_ee(ca, resources, self.manifest_public_key) - def issue(self, ca, child, subject_key, sia, resources, callback, child_cert = None): + def issue(self, ca, child, subject_key, sia, resources, callback, errback, child_cert = None): """Issue a new certificate to a child. Optional child_cert argument specifies an existing child_cert object to update in place; if not specified, we create a new one. Returns the @@ -643,14 +652,14 @@ class ca_detail_obj(rpki.sql.sql_persistant): child_cert.sql_store() def published(*ignored): - self.generate_manifest(done) + self.generate_manifest(done, errback) def done(*ignored): callback(child_cert) - ca.parent().repository().publish(child_cert.cert, child_cert.uri(ca), published) + ca.parent().repository().publish(child_cert.cert, child_cert.uri(ca), published, errback) - def generate_crl(self, callback, nextUpdate = None): + def generate_crl(self, callback, errback, nextUpdate = None): """Generate a new CRL for this ca_detail. At the moment this is unconditional, that is, it is up to the caller to decide whether a new CRL is needed. @@ -681,9 +690,9 @@ class ca_detail_obj(rpki.sql.sql_persistant): nextUpdate = nextUpdate, revokedCertificates = certlist) - repository.publish(self.latest_crl, self.crl_uri(ca), callback = callback) + repository.publish(self.latest_crl, self.crl_uri(ca), callback = callback, errback = errback) - def generate_manifest(self, callback, nextUpdate = None): + def generate_manifest(self, callback, errback, nextUpdate = None): """Generate a new manifest for this ca_detail.""" ca = self.ca() @@ -713,7 +722,7 @@ class ca_detail_obj(rpki.sql.sql_persistant): keypair = self.manifest_private_key_id, certs = self.latest_manifest_cert) - repository.publish(self.latest_manifest, self.manifest_uri(ca), callback = callback) + repository.publish(self.latest_manifest, self.manifest_uri(ca), callback = callback, errback = errback) class child_cert_obj(rpki.sql.sql_persistant): """Certificate that has been issued to a child.""" @@ -751,7 +760,7 @@ class child_cert_obj(rpki.sql.sql_persistant): """Return the publication URI for this child_cert.""" return ca.sia_uri + self.uri_tail() - def revoke(self, callback): + def revoke(self, callback, errback): """Revoke a child cert.""" rpki.log.debug("Revoking %s" % repr(self)) @@ -765,9 +774,9 @@ class child_cert_obj(rpki.sql.sql_persistant): self.sql_delete() callback() - repository.withdraw(self.cert, self.uri(ca), done) + repository.withdraw(self.cert, self.uri(ca), done, errback) - def reissue(self, ca_detail, callback = None, resources = None, sia = None): + def reissue(self, ca_detail, callback = None, errback = None, resources = None, sia = None): """Reissue an existing cert, reusing the public key. If the cert we would generate is identical to the one we already have, we just return the one we already have. If we have to revoke the old @@ -776,7 +785,9 @@ class child_cert_obj(rpki.sql.sql_persistant): child_cert_obj must use the return value from this method. """ + # Hack during conversion, remove default values and these assertions eventually assert callback is not None + assert errback is not None ca = ca_detail.ca() child = self.child() @@ -824,7 +835,8 @@ class child_cert_obj(rpki.sql.sql_persistant): sia = sia, resources = resources, child_cert = child_cert, - callback = revoke if must_revoke else callback) + callback = revoke if must_revoke else callback, + errback = errback) @classmethod def fetch(cls, gctx = None, child = None, ca_detail = None, ski = None, unique = False): diff --git a/rpkid/rpki/up_down.py b/rpkid/rpki/up_down.py index e5031c00..5eeda02d 100644 --- a/rpkid/rpki/up_down.py +++ b/rpkid/rpki/up_down.py @@ -17,7 +17,7 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ -import base64, lxml.etree, time +import base64, lxml.etree, time, traceback import rpki.resource_set, rpki.x509, rpki.exceptions import rpki.xml_utils, rpki.relaxng @@ -62,7 +62,7 @@ class base_elt(object): if value is not None: lxml.etree.SubElement(elt, "{%s}%s" % (xmlns, name), nsmap=nsmap).text = base64.b64encode(value) - def serve_pdu(self, q_msg, r_msg, child, callback): + def serve_pdu(self, q_msg, r_msg, child, callback, errback): """Default PDU handler to catch unexpected types.""" raise rpki.exceptions.BadQuery, "Unexpected query type %s" % q_msg.type @@ -185,7 +185,7 @@ class list_pdu(base_elt): """Generate (empty) payload of "list" PDU.""" return [] - def serve_pdu(self, q_msg, r_msg, child, callback): + def serve_pdu(self, q_msg, r_msg, child, callback, errback): """Serve one "list" PDU.""" def handle(irdb_resources): @@ -215,12 +215,12 @@ class list_pdu(base_elt): r_msg.payload.classes.append(rc) callback() - self.gctx.irdb_query(child.self_id, child.child_id, handle) + self.gctx.irdb_query(child.self_id, child.child_id, handle, errback) @classmethod - def query(cls, parent, cb): + def query(cls, parent, cb, eb): """Send a "list" query to parent.""" - return parent.query_up_down(cls(), cb) + parent.query_up_down(cls(), cb, eb) class class_response_syntax(base_elt): """Syntax for Up-Down protocol "list_response" and "issue_response" PDUs.""" @@ -269,7 +269,7 @@ class issue_pdu(base_elt): elt.text = self.pkcs10.get_Base64() return [elt] - def serve_pdu(self, q_msg, r_msg, child, callback): + def serve_pdu(self, q_msg, r_msg, child, callback, errback): """Serve one issue request PDU.""" # Subsetting not yet implemented, this is the one place where we @@ -322,18 +322,20 @@ class issue_pdu(base_elt): subject_key = req_key, sia = req_sia, resources = resources, - callback = got_child_cert) + callback = got_child_cert, + errback = errback) else: child_cert.reissue( ca_detail = ca_detail, sia = req_sia, resources = resources, - callback = got_child_cert) + callback = got_child_cert, + errback = errback) - self.gctx.irdb_query(child.self_id, child.child_id, got_resources) + self.gctx.irdb_query(child.self_id, child.child_id, got_resources, errback) @classmethod - def query(cls, parent, ca, ca_detail, callback): + def query(cls, parent, ca, ca_detail, callback, errback): """Send an "issue" request to parent associated with ca.""" assert ca_detail is not None and ca_detail.state in ("pending", "active") sia = ((rpki.oids.name2oid["id-ad-caRepository"], ("uri", ca.sia_uri)), @@ -341,7 +343,7 @@ class issue_pdu(base_elt): self = cls() self.class_name = ca.parent_resource_class self.pkcs10 = rpki.x509.PKCS10.create_ca(ca_detail.private_key_id, sia) - parent.query_up_down(self, callback) + parent.query_up_down(self, callback, errback) class issue_response_pdu(class_response_syntax): """Up-Down protocol "issue_response" PDU.""" @@ -372,13 +374,13 @@ class revoke_pdu(revoke_syntax): """Convert g(SKI) encoding from PDU back to raw SKI.""" return base64.urlsafe_b64decode(self.ski + "=") - def serve_pdu(self, q_msg, r_msg, child, cb): + def serve_pdu(self, q_msg, r_msg, child, cb, eb): """Serve one revoke request PDU.""" def loop1(iterator1, ca_detail): def loop2(iterator2, child_cert): - child_cert.revoke(iterator2) + child_cert.revoke(iterator2, eb) rpki.async.iterator(child.child_certs(ca_detail = ca_detail, ski = self.get_SKI()), loop2, iterator1) @@ -392,14 +394,14 @@ class revoke_pdu(revoke_syntax): rpki.async.iterator(child.ca_from_class_name(self.class_name).ca_details(), loop1, done) @classmethod - def query(cls, ca_detail, cb): + def query(cls, ca_detail, cb, eb): """Send a "revoke" request to parent associated with ca_detail.""" ca = ca_detail.ca() parent = ca.parent() self = cls() self.class_name = ca.parent_resource_class self.ski = ca_detail.latest_ca_cert.gSKI() - return parent.query_up_down(self, cb) + parent.query_up_down(self, cb, eb) class revoke_response_pdu(revoke_syntax): """Up-Down protocol "revoke_response" PDU.""" @@ -514,7 +516,14 @@ class message_pdu(base_elt): r_msg.type = self.type2name[type(r_msg.payload)] callback(r_msg) - self.payload.serve_pdu(self, r_msg, child, done) + def lose(e): + rpki.log.error(traceback.format_exc()) + callback(self.serve_error(e)) + + try: + self.payload.serve_pdu(self, r_msg, child, done, lose) + except Exception, edata: + lose(edata) def serve_error(self, exception): """Generate an error_response message PDU.""" diff --git a/rpkid/rpki/xml_utils.py b/rpkid/rpki/xml_utils.py index 566c9a50..803429e7 100644 --- a/rpkid/rpki/xml_utils.py +++ b/rpkid/rpki/xml_utils.py @@ -220,30 +220,30 @@ class data_elt(base_elt): """Overridable hook.""" pass - def serve_pre_save_hook(self, q_pdu, r_pdu, cb): + def serve_pre_save_hook(self, q_pdu, r_pdu, cb, eb): """Overridable hook.""" cb() - def serve_post_save_hook(self, q_pdu, r_pdu, cb): + def serve_post_save_hook(self, q_pdu, r_pdu, cb, eb): """Overridable hook.""" cb() - def serve_create(self, r_msg, cb): + def serve_create(self, r_msg, cb, eb): """Handle a create action.""" r_pdu = self.make_reply() def one(): self.sql_store() setattr(r_pdu, self.sql_template.index, getattr(self, self.sql_template.index)) - self.serve_post_save_hook(self, r_pdu, two) + self.serve_post_save_hook(self, r_pdu, two, eb) def two(): r_msg.append(r_pdu) cb() - self.serve_pre_save_hook(self, r_pdu, one) + self.serve_pre_save_hook(self, r_pdu, one, eb) - def serve_set(self, r_msg, cb): + def serve_set(self, r_msg, cb, eb): """Handle a set action.""" db_pdu = self.serve_fetch_one() r_pdu = self.make_reply() @@ -255,36 +255,36 @@ class data_elt(base_elt): def one(): db_pdu.sql_store() - db_pdu.serve_post_save_hook(self, r_pdu, two) + db_pdu.serve_post_save_hook(self, r_pdu, two, eb) def two(): r_msg.append(r_pdu) cb() - db_pdu.serve_pre_save_hook(self, r_pdu, one) + db_pdu.serve_pre_save_hook(self, r_pdu, one, eb) - def serve_get(self, r_msg, cb): + def serve_get(self, r_msg, cb, eb): """Handle a get action.""" r_pdu = self.serve_fetch_one() self.make_reply(r_pdu) r_msg.append(r_pdu) cb() - def serve_list(self, r_msg, cb): + def serve_list(self, r_msg, cb, eb): """Handle a list action for non-self objects.""" for r_pdu in self.serve_fetch_all(): self.make_reply(r_pdu) r_msg.append(r_pdu) cb() - def serve_destroy(self, r_msg, cb): + def serve_destroy(self, r_msg, cb, eb): """Handle a destroy action.""" db_pdu = self.serve_fetch_one() db_pdu.sql_delete() r_msg.append(self.make_reply()) cb() - def serve_dispatch(self, r_msg, cb): + def serve_dispatch(self, r_msg, cb, eb): """Action dispatch handler.""" dispatch = { "create" : self.serve_create, "set" : self.serve_set, @@ -293,7 +293,7 @@ class data_elt(base_elt): "destroy" : self.serve_destroy } if self.action not in dispatch: raise rpki.exceptions.BadQuery, "Unexpected query: action %s" % self.action - dispatch[self.action](r_msg, cb) + dispatch[self.action](r_msg, cb, eb) def unimplemented_control(self, *controls): """Uniform handling for unimplemented control operations.""" diff --git a/rpkid/testbed.py b/rpkid/testbed.py index 98dcc550..16cee892 100644 --- a/rpkid/testbed.py +++ b/rpkid/testbed.py @@ -665,7 +665,8 @@ class allocation(object): server_ta = self.rpkid_ta, url = url, msg = cms, - callback = self.call_rpkid_cb) + callback = self.call_rpkid_cb, + errback = self.call_rpkid_cb) rpki.log.info("Call to rpkid %s returned" % self.name) @@ -885,7 +886,8 @@ class allocation(object): server_ta = self.rpkid_ta, url = "https://localhost:%d/cronjob" % self.rpki_port, msg = "Run cron now, please", - callback = cb) + callback = cb, + errback = cb) def run_yaml(self): """Run YAML scripts for this leaf entity. Since we're not @@ -1014,7 +1016,8 @@ def call_pubd(pdu, cb): server_ta = pubd_ta, url = url, msg = cms, - callback = call_pubd_cb) + callback = call_pubd_cb, + errback = call_pubd_cb) def call_pubd_cb(val): if isinstance(val, Exception): |