diff options
Diffstat (limited to 'rpkid/rpki/left_right.py')
-rw-r--r-- | rpkid/rpki/left_right.py | 443 |
1 files changed, 242 insertions, 201 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index 4769cf0e..363feddb 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -20,7 +20,7 @@ PERFORMANCE OF THIS SOFTWARE. import base64, lxml.etree, time, traceback, os import rpki.resource_set, rpki.x509, rpki.sql, rpki.exceptions, rpki.xml_utils import rpki.https, rpki.up_down, rpki.relaxng, rpki.sundial, rpki.log, rpki.roa -import rpki.publication +import rpki.publication, rpki.async # Enforce strict checking of XML "sender" field in up-down protocol enforce_strict_up_down_xml_sender = False @@ -105,26 +105,34 @@ class self_elt(data_elt): """Fetch all route_origin objects that link to this self object.""" return route_origin_elt.sql_fetch_where(self.gctx, "self_id = %s", (self.self_id,)) - def serve_post_save_hook(self, q_pdu, r_pdu): + def serve_post_save_hook(self, q_pdu, r_pdu, cb): """Extra server actions for self_elt.""" rpki.log.trace() if q_pdu.rekey: - self.serve_rekey() - if q_pdu.revoke: - self.serve_revoke() - self.unimplemented_control("reissue", "run_now", "publish_world_now") + self.serve_rekey(cb) + elif q_pdu.revoke: + self.serve_revoke(cb) + else: + self.unimplemented_control("reissue", "run_now", "publish_world_now") + cb() - def serve_rekey(self): + def serve_rekey(self, cb): """Handle a left-right rekey action for this self.""" rpki.log.trace() - for parent in self.parents(): - parent.serve_rekey() - def serve_revoke(self): + def loop(iterator, parent): + parent.serve_rekey(iterator) + + rpki.async.iterator(self.parents(), loop, cb) + + def serve_revoke(self, cb): """Handle a left-right revoke action for this self.""" rpki.log.trace() - for parent in self.parents(): - parent.serve_revoke() + + def loop(iterator, parent): + parent.serve_revoke(iterator) + + rpki.async.iterator(self.parents(), loop, cb) def serve_fetch_one(self): """Find the self object upon which a get, set, or destroy action @@ -142,29 +150,35 @@ class self_elt(data_elt): """ return self.sql_fetch_all(self.gctx) - def client_poll(self, cb): + def client_poll(self, callback): """Run the regular client poll cycle with each of this self's parents in turn.""" rpki.log.trace() - for parent in self.parents(): - - # This will need a callback when we go event-driven - r_msg = rpki.up_down.list_pdu.query(parent) - - ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas()) - for rc in r_msg.payload.classes: - if rc.class_name in ca_map: - ca = ca_map[rc.class_name] - del ca_map[rc.class_name] - ca.check_for_updates(parent, rc) - else: - rpki.rpki_engine.ca_obj.create(parent, rc) - for ca in ca_map.values(): - ca.delete(parent) # CA not listed by parent - self.gctx.sql.sweep() + def each(iterator, parent): - cb() + def handle(r_msg): + ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas()) + + def loop(iterator, rc): + if rc.class_name in ca_map: + ca = ca_map[rc.class_name] + del ca_map[rc.class_name] + ca.check_for_updates(parent, rc, iterator) + else: + rpki.rpki_engine.ca_obj.create(parent, rc, iterator) + + def done(): + for ca in ca_map.values(): + ca.delete(parent) # CA not listed by parent + self.gctx.sql.sweep() + iterator() + + rpki.async.iterator(r_msg.payload.classes, loop, done) + + rpki.up_down.list_pdu.query(parent, handle) + + rpki.async.iterator(self.parents(), each, callback) def update_children(self, cb): """Check for updated IRDB data for all of this self's children and @@ -178,38 +192,48 @@ class self_elt(data_elt): rsn = now + rpki.sundial.timedelta(seconds = self.regen_margin) - for child in self.children(): + def loop1(iterator1, child): + + def got_resources(irdb_resources): + + def loop2(iterator2, child_cert): + + ca_detail = child_cert.ca_detail() + + if ca_detail.state == "active": + old_resources = child_cert.cert.get_3779resources() + new_resources = irdb_resources.intersection(old_resources) + + if old_resources != new_resources or (old_resources.valid_until < rsn and irdb_resources.valid_until > now): + rpki.log.debug("Need to reissue child certificate SKI %s" % child_cert.cert.gSKI()) + return child_cert.reissue( + ca_detail = ca_detail, + resources = new_resources, + callback = iterator2) + + if old_resources.valid_until < now: + rpki.log.debug("Child certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s" + % (child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until)) + ca = ca_detail.ca() + parent = ca.parent() + repository = parent.repository() + child_cert.sql_delete() + return ca_detail.generate_manifest(lambda *ignored: repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator2)) + + iterator2() + + rpki.async.iterator(child_certs, loop2, iterator1) + child_certs = child.child_certs() - if not child_certs: - continue - - # This will require a callback when we go event-driven - irdb_resources = self.gctx.irdb_query(child.self_id, child.child_id) - - for child_cert in child_certs: - ca_detail = child_cert.ca_detail() - if ca_detail.state != "active": - continue - old_resources = child_cert.cert.get_3779resources() - new_resources = irdb_resources.intersection(old_resources) - if old_resources != new_resources or (old_resources.valid_until < rsn and irdb_resources.valid_until > now): - rpki.log.debug("Need to reissue child certificate SKI %s" % child_cert.cert.gSKI()) - child_cert.reissue( - ca_detail = ca_detail, - resources = new_resources) - elif old_resources.valid_until < now: - rpki.log.debug("Child certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s" - % (child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until)) - ca = ca_detail.ca() - parent = ca.parent() - repository = parent.repository() - child_cert.sql_delete() - ca_detail.generate_manifest() - repository.withdraw(child_cert.cert, child_cert.uri(ca)) + if child_certs: + self.gctx.irdb_query(child.self_id, child.child_id, got_resources) + else: + iterator1() + + rpki.async.iterator(self.children(), loop1, cb) - cb() - def regenerate_crls_and_manifests(self): + def regenerate_crls_and_manifests(self, cb): """Generate new CRLs and manifests as necessary for all of this self's CAs. Extracting nextUpdate from a manifest is hard at the moment due to implementation silliness, so for now we generate a @@ -223,22 +247,44 @@ class self_elt(data_elt): rpki.log.trace() now = rpki.sundial.now() - for parent in self.parents(): + + def loop1(iterator1, parent): repository = parent.repository() - for ca in parent.cas(): - for ca_detail in ca.fetch_revoked(): - if now > ca_detail.latest_crl.getNextUpdate(): - ca_detail.delete(ca, repository) - ca_detail = ca.fetch_active() - if ca_detail is not None and now > ca_detail.latest_crl.getNextUpdate(): - ca_detail.generate_crl() - ca_detail.generate_manifest() - - def update_roas(self): + + def loop2(iterator2, ca): + + def loop3(iterator3, ca_detail): + ca_detail.delete(ca, repository, iterator3) + + def done3(): + + ca_detail = ca.fetch_active() + + def do_crl(): + ca_detail.generate_crl(do_manifest) + + def do_manifest(*ignored): + ca_detail.generate_manifest(iterator2) + + if ca_detail is not None and now > ca_detail.latest_crl.getNextUpdate(): + do_crl() + else: + iterator2() + + rpki.async.iterator([x for x in ca.fetch_revoked() if now > x.latest_crl.getNextUpdate()], loop3, done3) + + rpki.async.iterator(parent.cas(), loop2, iterator1) + + rpki.async.iterator(self.parents(), loop1, cb) + + + def update_roas(self, cb): """Generate or update ROAs for this self's route_origin objects.""" - for route_origin in self.route_origins(): - route_origin.update_roa() + def loop(iterator, route_origin): + route_origin.update_roa(iterator) + + rpki.async.iterator(self.route_origins(), loop, cb) class bsc_elt(data_elt): @@ -272,7 +318,7 @@ class bsc_elt(data_elt): """Fetch all child objects that link to this BSC object.""" return child_elt.sql_fetch_where(self.gctx, "bsc_id = %s", (self.bsc_id,)) - def serve_pre_save_hook(self, q_pdu, r_pdu): + def serve_pre_save_hook(self, q_pdu, r_pdu, cb): """Extra server actions for bsc_elt -- handle key generation. For now this only allows RSA with SHA-256. """ @@ -281,6 +327,7 @@ class bsc_elt(data_elt): self.private_key_id = rpki.x509.RSA.generate(keylength = q_pdu.key_length or 2048) self.pkcs10_request = rpki.x509.PKCS10.create(self.private_key_id) r_pdu.pkcs10_request = self.pkcs10_request + cb() class parent_elt(data_elt): """<parent/> element.""" @@ -309,37 +356,34 @@ class parent_elt(data_elt): """Fetch all CA objects that link to this parent object.""" return rpki.rpki_engine.ca_obj.sql_fetch_where(self.gctx, "parent_id = %s", (self.parent_id,)) - def serve_post_save_hook(self, q_pdu, r_pdu): + def serve_post_save_hook(self, q_pdu, r_pdu, cb): """Extra server actions for parent_elt.""" if q_pdu.rekey: - self.serve_rekey() - if q_pdu.revoke: - self.serve_revoke() - self.unimplemented_control("reissue") + self.serve_rekey(cb) + elif q_pdu.revoke: + self.serve_revoke(cb) + else: + self.unimplemented_control("reissue") + cb() - def serve_rekey(self): + def serve_rekey(self, cb): """Handle a left-right rekey action for this parent.""" - for ca in self.cas(): - ca.rekey() - def serve_revoke(self): + def loop(iterator, ca): + ca.rekey(iterator) + + rpki.async.iterator(self.cas(), loop, cb) + + def serve_revoke(self, cb): """Handle a left-right revoke action for this parent.""" - for ca in self.cas(): - ca.revoke() - def query_up_down(self, q_pdu): - """Client code for sending one up-down query PDU to this parent. + def loop(iterator, ca): + ca.revoke(iterator) - I haven't figured out yet whether this method should do something - clever like dispatching via a method in the response PDU payload, - or just hand back the whole response to the caller. In the long - run this will have to become event driven with a context object - that has methods of its own, but as this method is common code for - several different queries and I don't yet know what the response - processing looks like, it's too soon to tell what will make sense. + rpki.async.iterator(self.cas(), loop, cb) - For now, keep this dead simple lock step, rewrite it later. - """ + def query_up_down(self, q_pdu, cb): + """Client code for sending one up-down query PDU to this parent.""" rpki.log.trace() @@ -356,21 +400,21 @@ class parent_elt(data_elt): bsc.signing_cert, bsc.signing_cert_crl) - der = rpki.https.client(server_ta = (self.gctx.bpki_ta, - self.self().bpki_cert, self.self().bpki_glue, - self.bpki_https_cert, self.bpki_https_glue), - client_key = bsc.private_key_id, - client_cert = bsc.signing_cert, - msg = q_cms, - url = self.peer_contact_uri) - - r_msg = rpki.up_down.cms_msg.unwrap(der, (self.gctx.bpki_ta, - self.self().bpki_cert, self.self().bpki_glue, - self.bpki_cms_cert, self.bpki_cms_glue)) - - r_msg.payload.check_response() - return r_msg - + def unwrap(der): + r_msg = rpki.up_down.cms_msg.unwrap(der, (self.gctx.bpki_ta, + self.self().bpki_cert, self.self().bpki_glue, + self.bpki_cms_cert, self.bpki_cms_glue)) + r_msg.payload.check_response() + cb(r_msg) + + rpki.https.client(server_ta = (self.gctx.bpki_ta, + self.self().bpki_cert, self.self().bpki_glue, + self.bpki_https_cert, self.bpki_https_glue), + client_key = bsc.private_key_id, + client_cert = bsc.signing_cert, + msg = q_cms, + url = self.peer_contact_uri, + callback = unwrap) class child_elt(data_elt): """<child/> element.""" @@ -408,12 +452,13 @@ class child_elt(data_elt): raise rpki.exceptions.ClassNameMismatch, "Class name mismatch: child.self_id = %d, parent.self_id = %d" % (self.self_id, parent.self_id) return ca - def serve_post_save_hook(self, q_pdu, r_pdu): + def serve_post_save_hook(self, q_pdu, r_pdu, cb): """Extra server actions for child_elt.""" self.unimplemented_control("reissue") if self.clear_https_ta_cache: self.gctx.clear_https_ta_cache() self.clear_https_ta_cache = False + cb() def endElement(self, stack, name, text): """Handle subelements of <child/> element. These require special @@ -424,7 +469,7 @@ class child_elt(data_elt): if name in self.elements: self.clear_https_ta_cache = True - def serve_up_down(self, query): + def serve_up_down(self, query, callback): """Outer layer of server handling for one up-down PDU from this child.""" rpki.log.trace() @@ -438,19 +483,24 @@ class child_elt(data_elt): q_msg.payload.gctx = self.gctx if enforce_strict_up_down_xml_sender and q_msg.sender != str(self.child_id): raise rpki.exceptions.BadSender, "Unexpected XML sender %s" % q_msg.sender + + def done(r_msg): + # + # Exceptions from this point on are problematic, as we have no + # sane way of reporting errors in the error reporting mechanism. + # May require refactoring, ignore the issue for now. + # + r_cms = rpki.up_down.cms_msg.wrap(r_msg, bsc.private_key_id, + bsc.signing_cert, bsc.signing_cert_crl) + callback(r_cms) + try: - r_msg = q_msg.serve_top_level(self) + q_msg.serve_top_level(self, done) + except rpki.exceptions.NoActiveCA, data: + done(q_msg.serve_error(data)) except Exception, data: rpki.log.error(traceback.format_exc()) - r_msg = q_msg.serve_error(data) - # - # Exceptions from this point on are problematic, as we have no - # sane way of reporting errors in the error reporting mechanism. - # May require refactoring, ignore the issue for now. - # - r_cms = rpki.up_down.cms_msg.wrap(r_msg, bsc.private_key_id, - bsc.signing_cert, bsc.signing_cert_crl) - return r_cms + done(q_msg.serve_error(data)) class repository_elt(data_elt): """<repository/> element.""" @@ -468,41 +518,11 @@ class repository_elt(data_elt): bpki_https_cert = None bpki_https_glue = None - use_pubd = True - def parents(self): """Fetch all parent objects that link to this repository object.""" return parent_elt.sql_fetch_where(self.gctx, "repository_id = %s", (self.repository_id,)) - @staticmethod - def uri_to_filename(base, uri): - """Convert a URI to a filename. [TEMPORARY]""" - if not uri.startswith("rsync://"): - raise rpki.exceptions.BadURISyntax - filename = base + uri[len("rsync://"):] - if filename.find("//") >= 0 or filename.find("/../") >= 0 or filename.endswith("/.."): - raise rpki.exceptions.BadURISyntax - return filename - - @classmethod - def object_write(cls, base, uri, obj): - """Write an object to disk. [TEMPORARY]""" - rpki.log.trace() - filename = cls.uri_to_filename(base, uri) - dirname = os.path.dirname(filename) - if not os.path.isdir(dirname): - os.makedirs(dirname) - f = open(filename, "wb") - f.write(obj.get_DER()) - f.close() - - @classmethod - def object_delete(cls, base, uri): - """Delete an object from disk. [TEMPORARY]""" - rpki.log.trace() - os.remove(cls.uri_to_filename(base, uri)) - - def call_pubd(self, *pdus): + def call_pubd(self, callback, *pdus): """Send a message to publication daemon and return the response.""" rpki.log.trace() bsc = self.bsc() @@ -510,33 +530,31 @@ class repository_elt(data_elt): q_msg.type = "query" q_cms = rpki.publication.cms_msg.wrap(q_msg, bsc.private_key_id, bsc.signing_cert, bsc.signing_cert_crl) bpki_ta_path = (self.gctx.bpki_ta, self.self().bpki_cert, self.self().bpki_glue, self.bpki_https_cert, self.bpki_https_glue) - r_cms = rpki.https.client( + + def done(r_cms): + r_msg = rpki.publication.cms_msg.unwrap(r_cms, bpki_ta_path) + assert len(r_msg) == 1 + callback(r_msg[0]) + + rpki.https.client( client_key = bsc.private_key_id, client_cert = bsc.signing_cert, server_ta = bpki_ta_path, url = self.peer_contact_uri, - msg = q_cms) - r_msg = rpki.publication.cms_msg.unwrap(r_cms, bpki_ta_path) - assert len(r_msg) == 1 - return r_msg[0] + msg = q_cms, + callback = done) - def publish(self, obj, uri): - """Placeholder for publication operation. [TEMPORARY]""" + def publish(self, obj, uri, callback): + """Publish one object in the repository.""" rpki.log.trace() rpki.log.info("Publishing %s as %s" % (repr(obj), repr(uri))) - if self.use_pubd: - self.call_pubd(rpki.publication.obj2elt[type(obj)].make_pdu(action = "publish", uri = uri, payload = obj)) - else: - self.object_write(self.gctx.publication_kludge_base, uri, obj) + self.call_pubd(callback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "publish", uri = uri, payload = obj)) - def withdraw(self, obj, uri): - """Placeholder for publication withdrawal operation. [TEMPORARY]""" + def withdraw(self, obj, uri, callback): + """Withdraw one object from the repository.""" rpki.log.trace() rpki.log.info("Withdrawing %s from at %s" % (repr(obj), repr(uri))) - if self.use_pubd: - self.call_pubd(rpki.publication.obj2elt[type(obj)].make_pdu(action = "withdraw", uri = uri)) - else: - self.object_delete(self.gctx.publication_kludge_base, uri) + self.call_pubd(callback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "withdraw", uri = uri)) class route_origin_elt(data_elt): """<route_origin/> element.""" @@ -590,9 +608,10 @@ class route_origin_elt(data_elt): """Fetch all ca_detail objects that link to this route_origin object.""" return rpki.rpki_engine.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) - def serve_post_save_hook(self, q_pdu, r_pdu): + def serve_post_save_hook(self, q_pdu, r_pdu, cb): """Extra server actions for route_origin_elt.""" self.unimplemented_control("suppress_publication") + cb() def startElement(self, stack, name, attrs): """Handle <route_origin/> element. This requires special @@ -607,35 +626,37 @@ class route_origin_elt(data_elt): if self.ipv6 is not None: self.ipv6 = rpki.resource_set.roa_prefix_set_ipv6(self.ipv6) - def update_roa(self): + def update_roa(self, callback): """Bring this route_origin's ROA up to date if necesssary.""" if self.roa is None: - return self.generate_roa() + return self.generate_roa(callback) ca_detail = self.ca_detail() if ca_detail is None or ca_detail.state != "active": - return self.regenerate_roa() + return self.regenerate_roa(callback) regen_margin = rpki.sundial.timedelta(seconds = self.self().regen_margin) if rpki.sundial.now() + regen_margin > self.cert.getNotAfter(): - return self.regenerate_roa() + return self.regenerate_roa(callback) ca_resources = ca_detail.latest_ca_cert.get_3779resources() ee_resources = self.cert.get_3779resources() if ee_resources.oversized(ca_resources): - return self.regenerate_roa() + return self.regenerate_roa(callback) v4 = self.ipv4.to_resource_set() if self.ipv4 is not None else rpki.resource_set.resource_set_ipv4() v6 = self.ipv6.to_resource_set() if self.ipv6 is not None else rpki.resource_set.resource_set_ipv6() if ee_resources.v4 != v4 or ee_resources.v6 != v6: - return self.regenerate_roa() + return self.regenerate_roa(callback) + + callback() - def generate_roa(self): + def generate_roa(self, callback): """Generate a ROA based on this <route_origin/> object. At present this does not support ROAs with multiple signatures @@ -698,12 +719,17 @@ class route_origin_elt(data_elt): self.sql_store() repository = ca.parent().repository() - repository.publish(self.roa, self.roa_uri(ca)) - if self.publish_ee_separately: - repository.publish(self.cert, self.ee_uri(ca)) - ca_detail.generate_manifest() - def withdraw_roa(self, regenerate = False): + def one(): + repository.publish(self.cert, self.ee_uri(ca), two) + + def two(*ignored): + ca_detail.generate_manifest(callback) + + repository.publish(self.roa, self.roa_uri(ca), + one if self.publish_ee_separately else two) + + def withdraw_roa(self, callback, regenerate = False): """Withdraw ROA associated with this route_origin. In order to preserve make-before-break properties without @@ -721,24 +747,34 @@ class route_origin_elt(data_elt): if ca_detail.state != 'active': self.ca_detail_id = None + + def one(*ignored): + rpki.log.debug("Withdrawing ROA and revoking its EE cert") + rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail) + repository.withdraw(roa, roa_uri, + two if self.publish_ee_separately else three) + + def two(): + repository.withdraw(cert, ee_uri, three) + + def three(*ignored): + self.gctx.sql.sweep() + ca_detail.generate_crl(four) + + def four(*ignored): + ca_detail.generate_manifest(callback) + if regenerate: - self.generate_roa() - - rpki.log.debug("Withdrawing ROA and revoking its EE cert") - rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail) - repository.withdraw(roa, roa_uri) - if self.publish_ee_separately: - repository.withdraw(cert, ee_uri) - self.gctx.sql.sweep() - ca_detail.generate_crl() - ca_detail.generate_manifest() - - def regenerate_roa(self): + self.generate_roa(one) + else: + one() + + def regenerate_roa(self, callback): """Reissue ROA associated with this route_origin.""" if self.ca_detail() is None: - self.generate_roa() + self.generate_roa(callback) else: - self.withdraw_roa(regenerate = True) + self.withdraw_roa(callback, regenerate = True) def roa_uri(self, ca, key = None): """Return the publication URI for this route_origin's ROA.""" @@ -814,14 +850,19 @@ class msg(rpki.xml_utils.msg, left_right_namespace): for x in (self_elt, child_elt, parent_elt, bsc_elt, repository_elt, route_origin_elt, list_resources_elt, report_error_elt)) - def serve_top_level(self, gctx): + def serve_top_level(self, gctx, cb): """Serve one msg PDU.""" r_msg = self.__class__() r_msg.type = "reply" - for q_pdu in self: + + def loop(iterator, q_pdu): q_pdu.gctx = gctx - q_pdu.serve_dispatch(r_msg) - return r_msg + q_pdu.serve_dispatch(r_msg, iterator) + + def done(): + cb(r_msg) + + rpki.async.iterator(self, loop, done) class sax_handler(rpki.xml_utils.sax_handler): """SAX handler for Left-Right protocol.""" |