diff options
Diffstat (limited to 'rpkid/rpki/left_right.py')
-rw-r--r-- | rpkid/rpki/left_right.py | 188 |
1 files changed, 125 insertions, 63 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index a2f74ecf..99786a29 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -105,32 +105,32 @@ class self_elt(data_elt): """Fetch all route_origin objects that link to this self object.""" return route_origin_elt.sql_fetch_where(self.gctx, "self_id = %s", (self.self_id,)) - def serve_post_save_hook(self, q_pdu, r_pdu, cb): + def serve_post_save_hook(self, q_pdu, r_pdu, cb, eb): """Extra server actions for self_elt.""" rpki.log.trace() if q_pdu.rekey: - self.serve_rekey(cb) + self.serve_rekey(cb, eb) elif q_pdu.revoke: - self.serve_revoke(cb) + self.serve_revoke(cb, eb) else: self.unimplemented_control("reissue", "run_now", "publish_world_now") cb() - def serve_rekey(self, cb): + def serve_rekey(self, cb, eb): """Handle a left-right rekey action for this self.""" rpki.log.trace() def loop(iterator, parent): - parent.serve_rekey(iterator) + parent.serve_rekey(iterator, eb) rpki.async.iterator(self.parents(), loop, cb) - def serve_revoke(self, cb): + def serve_revoke(self, cb, eb): """Handle a left-right revoke action for this self.""" rpki.log.trace() def loop(iterator, parent): - parent.serve_revoke(iterator) + parent.serve_revoke(iterator, eb) rpki.async.iterator(self.parents(), loop, cb) @@ -155,30 +155,41 @@ class self_elt(data_elt): rpki.log.trace() - def each(iterator, parent): + def parent_loop(parent_iterator, parent): - def handle(r_msg): + def got_list(r_msg): ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas()) - def loop(iterator, rc): + def class_loop(class_iterator, rc): if rc.class_name in ca_map: ca = ca_map[rc.class_name] del ca_map[rc.class_name] - ca.check_for_updates(parent, rc, iterator) + ca.check_for_updates(parent, rc, class_iterator, class_update_failed) else: - rpki.rpki_engine.ca_obj.create(parent, rc, iterator) + rpki.rpki_engine.ca_obj.create(parent, rc, class_iterator, class_create_failed) - def done(): + def class_update_failed(e): + rpki.log.warn("Couldn't update class, skipping: %s" % e) + class_iterator() + + def class_create_failed(e): + rpki.log.warn("Couldn't create class, skipping: %s" % e) + class_iterator() + + def class_done(): for ca in ca_map.values(): ca.delete(parent) # CA not listed by parent self.gctx.sql.sweep() - iterator() + parent_iterator() + + rpki.async.iterator(r_msg.payload.classes, class_loop, class_done) - rpki.async.iterator(r_msg.payload.classes, loop, done) + def list_failed(e): + rpki.log.warn("Couldn't get resource class list from parent %r, skipping: %s" % (parent, e)) - rpki.up_down.list_pdu.query(parent, handle) + rpki.up_down.list_pdu.query(parent, got_list, list_failed) - rpki.async.iterator(self.parents(), each, callback) + rpki.async.iterator(self.parents(), parent_loop, callback) def update_children(self, cb): """Check for updated IRDB data for all of this self's children and @@ -206,10 +217,17 @@ class self_elt(data_elt): if old_resources != new_resources or (old_resources.valid_until < rsn and irdb_resources.valid_until > now): rpki.log.debug("Need to reissue child certificate SKI %s" % child_cert.cert.gSKI()) - return child_cert.reissue( + + def reissue_failed(e): + rpki.log.warn("Couldn't reissue child_cert %r, skipping: %s" % (child_cert, e)) + iterator2() + + child_cert.reissue( ca_detail = ca_detail, resources = new_resources, - callback = iterator2) + callback = iterator2, + errback = reissue_failed) + return if old_resources.valid_until < now: rpki.log.debug("Child certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s" @@ -218,15 +236,32 @@ class self_elt(data_elt): parent = ca.parent() repository = parent.repository() child_cert.sql_delete() - return ca_detail.generate_manifest(lambda *ignored: repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator2)) + + def withdraw(*ignored): + repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator2, withdraw_failed) + + def manifest_failed(e): + rpki.log.warn("Couldn't reissue manifest for %r, skipping: %s" % (ca_detail, e)) + iterator2() + + def withdraw_failed(e): + rpki.log.warn("Couldn't withdraw old child_cert %r, skipping: %s" % (child_cert, e)) + iterator2() + + ca_detail.generate_manifest(withdraw, manifest_failed) + return iterator2() rpki.async.iterator(child_certs, loop2, iterator1) + def irdb_lookup_failed(e): + rpki.log.warn("Couldn't look up child's resources in IRDB, skipping child %r: %s" % (child, e)) + iterator1() + child_certs = child.child_certs() if child_certs: - self.gctx.irdb_query(child.self_id, child.child_id, got_resources) + self.gctx.irdb_query(child.self_id, child.child_id, got_resources, irdb_lookup_failed) else: iterator1() @@ -253,18 +288,22 @@ class self_elt(data_elt): def loop2(iterator2, ca): + def fail2(e): + rpki.log.warn("Couldn't regenerate CRLs and manifests for CA %r, skipping: %s" % (ca, e)) + iterator2() + def loop3(iterator3, ca_detail): - ca_detail.delete(ca, repository, iterator3) + ca_detail.delete(ca, repository, iterator3, fail2) def done3(): ca_detail = ca.fetch_active() def do_crl(): - ca_detail.generate_crl(do_manifest) + ca_detail.generate_crl(do_manifest, fail2) def do_manifest(*ignored): - ca_detail.generate_manifest(iterator2) + ca_detail.generate_manifest(iterator2, fail2) if ca_detail is not None and now > ca_detail.latest_crl.getNextUpdate(): do_crl() @@ -318,7 +357,7 @@ class bsc_elt(data_elt): """Fetch all child objects that link to this BSC object.""" return child_elt.sql_fetch_where(self.gctx, "bsc_id = %s", (self.bsc_id,)) - def serve_pre_save_hook(self, q_pdu, r_pdu, cb): + def serve_pre_save_hook(self, q_pdu, r_pdu, cb, eb): """Extra server actions for bsc_elt -- handle key generation. For now this only allows RSA with SHA-256. """ @@ -356,33 +395,33 @@ class parent_elt(data_elt): """Fetch all CA objects that link to this parent object.""" return rpki.rpki_engine.ca_obj.sql_fetch_where(self.gctx, "parent_id = %s", (self.parent_id,)) - def serve_post_save_hook(self, q_pdu, r_pdu, cb): + def serve_post_save_hook(self, q_pdu, r_pdu, cb, eb): """Extra server actions for parent_elt.""" if q_pdu.rekey: - self.serve_rekey(cb) + self.serve_rekey(cb, eb) elif q_pdu.revoke: - self.serve_revoke(cb) + self.serve_revoke(cb, eb) else: self.unimplemented_control("reissue") cb() - def serve_rekey(self, cb): + def serve_rekey(self, cb, eb): """Handle a left-right rekey action for this parent.""" def loop(iterator, ca): - ca.rekey(iterator) + ca.rekey(iterator, eb) rpki.async.iterator(self.cas(), loop, cb) - def serve_revoke(self, cb): + def serve_revoke(self, cb, eb): """Handle a left-right revoke action for this parent.""" def loop(iterator, ca): - ca.revoke(iterator) + ca.revoke(iterator, eb) rpki.async.iterator(self.cas(), loop, cb) - def query_up_down(self, q_pdu, cb): + def query_up_down(self, q_pdu, cb, eb): """Client code for sending one up-down query PDU to this parent.""" rpki.log.trace() @@ -414,7 +453,8 @@ class parent_elt(data_elt): client_cert = bsc.signing_cert, msg = q_cms, url = self.peer_contact_uri, - callback = unwrap) + callback = unwrap, + errback = eb) class child_elt(data_elt): """<child/> element.""" @@ -452,7 +492,7 @@ class child_elt(data_elt): raise rpki.exceptions.ClassNameMismatch, "Class name mismatch: child.self_id = %d, parent.self_id = %d" % (self.self_id, parent.self_id) return ca - def serve_post_save_hook(self, q_pdu, r_pdu, cb): + def serve_post_save_hook(self, q_pdu, r_pdu, cb, eb): """Extra server actions for child_elt.""" self.unimplemented_control("reissue") if self.clear_https_ta_cache: @@ -522,7 +562,7 @@ class repository_elt(data_elt): """Fetch all parent objects that link to this repository object.""" return parent_elt.sql_fetch_where(self.gctx, "repository_id = %s", (self.repository_id,)) - def call_pubd(self, callback, *pdus): + def call_pubd(self, callback, errback, *pdus): """Send a message to publication daemon and return the response.""" rpki.log.trace() bsc = self.bsc() @@ -544,17 +584,17 @@ class repository_elt(data_elt): msg = q_cms, callback = done) - def publish(self, obj, uri, callback): + def publish(self, obj, uri, callback, errback): """Publish one object in the repository.""" rpki.log.trace() rpki.log.info("Publishing %s as %s" % (repr(obj), repr(uri))) - self.call_pubd(callback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "publish", uri = uri, payload = obj)) + self.call_pubd(callback, errback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "publish", uri = uri, payload = obj)) - def withdraw(self, obj, uri, callback): + def withdraw(self, obj, uri, callback, errback): """Withdraw one object from the repository.""" rpki.log.trace() rpki.log.info("Withdrawing %s from at %s" % (repr(obj), repr(uri))) - self.call_pubd(callback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "withdraw", uri = uri)) + self.call_pubd(callback, errback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "withdraw", uri = uri)) class route_origin_elt(data_elt): """<route_origin/> element.""" @@ -608,7 +648,7 @@ class route_origin_elt(data_elt): """Fetch all ca_detail objects that link to this route_origin object.""" return rpki.rpki_engine.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) - def serve_post_save_hook(self, q_pdu, r_pdu, cb): + def serve_post_save_hook(self, q_pdu, r_pdu, cb, eb): """Extra server actions for route_origin_elt.""" self.unimplemented_control("suppress_publication") cb() @@ -629,34 +669,44 @@ class route_origin_elt(data_elt): def update_roa(self, callback): """Bring this route_origin's ROA up to date if necesssary.""" + def lose(e): + rpki.log.warn("Could not update ROA %r, skipping: %s" % (self, e)) + callback() + return + if self.roa is None: - return self.generate_roa(callback) + self.generate_roa(callback, lose) + return ca_detail = self.ca_detail() if ca_detail is None or ca_detail.state != "active": - return self.regenerate_roa(callback) + self.regenerate_roa(callback, lose) + return regen_margin = rpki.sundial.timedelta(seconds = self.self().regen_margin) if rpki.sundial.now() + regen_margin > self.cert.getNotAfter(): - return self.regenerate_roa(callback) + self.regenerate_roa(callback, lose) + return ca_resources = ca_detail.latest_ca_cert.get_3779resources() ee_resources = self.cert.get_3779resources() if ee_resources.oversized(ca_resources): - return self.regenerate_roa(callback) + self.regenerate_roa(callback, lose) + return v4 = self.ipv4.to_resource_set() if self.ipv4 is not None else rpki.resource_set.resource_set_ipv4() v6 = self.ipv6.to_resource_set() if self.ipv6 is not None else rpki.resource_set.resource_set_ipv6() if ee_resources.v4 != v4 or ee_resources.v6 != v6: - return self.regenerate_roa(callback) + self.regenerate_roa(callback, lose) + return callback() - def generate_roa(self, callback): + def generate_roa(self, callback, errback): """Generate a ROA based on this <route_origin/> object. At present this does not support ROAs with multiple signatures @@ -721,15 +771,16 @@ class route_origin_elt(data_elt): repository = ca.parent().repository() def one(): - repository.publish(self.cert, self.ee_uri(ca), two) + repository.publish(self.cert, self.ee_uri(ca), two, errback) def two(*ignored): - ca_detail.generate_manifest(callback) + ca_detail.generate_manifest(callback, errback) repository.publish(self.roa, self.roa_uri(ca), - one if self.publish_ee_separately else two) + one if self.publish_ee_separately else two, + errback) - def withdraw_roa(self, callback, regenerate = False): + def withdraw_roa(self, callback, errback, regenerate = False): """Withdraw ROA associated with this route_origin. In order to preserve make-before-break properties without @@ -752,29 +803,30 @@ class route_origin_elt(data_elt): rpki.log.debug("Withdrawing ROA and revoking its EE cert") rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail) repository.withdraw(roa, roa_uri, - two if self.publish_ee_separately else three) + two if self.publish_ee_separately else three, + errback) def two(): - repository.withdraw(cert, ee_uri, three) + repository.withdraw(cert, ee_uri, three, errback) def three(*ignored): self.gctx.sql.sweep() - ca_detail.generate_crl(four) + ca_detail.generate_crl(four, errback) def four(*ignored): - ca_detail.generate_manifest(callback) + ca_detail.generate_manifest(callback, errback) if regenerate: - self.generate_roa(one) + self.generate_roa(one, errback) else: one() - def regenerate_roa(self, callback): + def regenerate_roa(self, callback, errback): """Reissue ROA associated with this route_origin.""" if self.ca_detail() is None: - self.generate_roa(callback) + self.generate_roa(callback, errback) else: - self.withdraw_roa(callback, regenerate = True) + self.withdraw_roa(callback, errback, regenerate = True) def roa_uri(self, ca, key = None): """Return the publication URI for this route_origin's ROA.""" @@ -830,11 +882,12 @@ class report_error_elt(rpki.xml_utils.base_elt, left_right_namespace): attributes = ("tag", "self_id", "error_code") @classmethod - def from_exception(cls, exc, self_id = None): + def from_exception(cls, e, self_id = None): """Generate a <report_error/> element from an exception.""" self = cls() self.self_id = self_id - self.error_code = exc.__class__.__name__ + self.error_code = e.__class__.__name__ + self.text = str(e) return self class msg(rpki.xml_utils.msg, left_right_namespace): @@ -856,8 +909,17 @@ class msg(rpki.xml_utils.msg, left_right_namespace): r_msg.type = "reply" def loop(iterator, q_pdu): - q_pdu.gctx = gctx - q_pdu.serve_dispatch(r_msg, iterator) + + def fail(e): + rpki.log.error(traceback.format_exc()) + r_msg.append(report_error_elt.from_exception(e, self_id = q_pdu.self_id)) + cb(r_msg) + + try: + q_pdu.gctx = gctx + q_pdu.serve_dispatch(r_msg, iterator, fail) + except Exception, edata: + fail(edata) def done(): cb(r_msg) |