diff options
-rw-r--r-- | rpkid/rpki/https.py | 38 | ||||
-rw-r--r-- | rpkid/testbed.py | 548 |
2 files changed, 399 insertions, 187 deletions
diff --git a/rpkid/rpki/https.py b/rpkid/rpki/https.py index a0443c01..6dcf58be 100644 --- a/rpkid/rpki/https.py +++ b/rpkid/rpki/https.py @@ -33,6 +33,9 @@ disable_tls_certificate_validation_exceptions = False # Chatter about TLS certificates debug_tls_certs = False +# Debugging hack while converting to event-driven I/O model +trace_synchronous_calls = False + rpki_content_type = "application/x-rpki" def tlslite_certChain(x509): @@ -149,7 +152,7 @@ class httpsClient(tlslite.api.HTTPTLSConnection): self.checker = Checker(trust_anchor = server_ta) -def client(msg, client_key, client_cert, server_ta, url, timeout = 300): +def client(msg, client_key, client_cert, server_ta, url, timeout = 300, callback = None): """Open client HTTPS connection, send a message, wait for response. This function wraps most of what one needs to do to send a message @@ -157,7 +160,11 @@ def client(msg, client_key, client_cert, server_ta, url, timeout = 300): up to snuff; it's better than with the other packages I've found, but doesn't appear to handle subjectAltName extensions (sigh). """ - + + # This is an easy way to find synchronous calls that need conversion + if trace_synchronous_calls and callback is None: + raise RuntimeError, "Syncronous call to rpki.http.client()" + u = urlparse.urlparse(url) assert u.scheme in ("", "https") and \ @@ -186,12 +193,20 @@ def client(msg, client_key, client_cert, server_ta, url, timeout = 300): httpc.sock.settimeout(timeout) httpc.request("POST", u.path, msg, {"Content-Type" : rpki_content_type}) response = httpc.getresponse() - if response.status == httplib.OK: - return response.read() + rpki.log.debug("HTTPS client returned") + r = response.read() + if response.status != httplib.OK: + rpki.log.debug("HTTPS client returned failure") + r = rpki.exceptions.HTTPRequestFailed("HTTP request failed with status %s, response %s" % (response.status, r)) + if callback is not None: + rpki.log.debug("HTTPS client callback supplied, using it") + callback(r) + elif response.status == httplib.OK: + rpki.log.debug("HTTPS no client callback, returning success") + return r else: - r = response.read() - raise rpki.exceptions.HTTPRequestFailed, \ - "HTTP request failed with status %s, response %s" % (response.status, r) + rpki.log.debug("HTTPS no client callback, raising exception") + raise r class requestHandler(BaseHTTPServer.BaseHTTPRequestHandler): """Derived type to supply POST handler and override logging.""" @@ -263,6 +278,15 @@ class httpsServer(tlslite.api.TLSSocketServerMixIn, BaseHTTPServer.HTTPServer): rpki.log.warn("TLS handshake failure: " + str(error)) return False + def handle_error(self, request, client_address): + """Override SOcketServer error handling. This may be wrong in the + long run, but at the moment I'm seeing the server hang while + trying to shut down, because the default handler is intercepting + ServerShuttingDown in certain states, for reasons unknown. + """ + + raise + def server(handlers, server_key, server_cert, port = 4433, host ="", client_ta = None, dynamic_https_trust_anchor = None, catch_signals = (signal.SIGINT, signal.SIGTERM)): """Run an HTTPS server and wait (forever) for connections.""" diff --git a/rpkid/testbed.py b/rpkid/testbed.py index f3afa2d5..16cf2b7c 100644 --- a/rpkid/testbed.py +++ b/rpkid/testbed.py @@ -34,7 +34,7 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ -import os, yaml, MySQLdb, subprocess, signal, time, re, getopt, sys, lxml +import os, yaml, MySQLdb, subprocess, signal, time, re, getopt, sys, lxml, traceback import rpki.resource_set, rpki.sundial, rpki.x509, rpki.https import rpki.log, rpki.left_right, rpki.config, rpki.publication @@ -46,6 +46,9 @@ cfg_file = "testbed.conf" yaml_script = None profile = False +# Debugging hack whiel converting to event-driven I/O +rpki.https.trace_synchronous_calls = True + opts,argv = getopt.getopt(sys.argv[1:], "c:hpy:?", ["config=", "help", "profile", "yaml="]) for o,a in opts: if o in ("-h", "--help", "-?"): @@ -123,136 +126,190 @@ pub_sql_file = cfg.get("pub_sql_file", "pubd.sql") startup_delay = int(cfg.get("startup_delay", "10")) -def main(): - """Main program, up front to make control logic more obvious.""" +class async_iterator(object): + """Experimental iteration construct for event-driven code. This + belongs in the library eventually, but it's easier to debug the + initial version here. + """ + + def __init__(self, iterable, handler_cb, done_cb): + self.handler_cb = handler_cb + self.done_cb = done_cb + self.iterator = iter(iterable) - rpki.log.init(testbed_name) - rpki.log.info("Starting") + def __call__(self): + try: + self.handler_cb(self.iterator.next()) + except StopIteration: + if self.done_cb is not None: + self.done_cb() + +class main(object): + """Main program, implemented as a class to handle asynchronous I/O + in underlying libraries. + """ - signal.signal(signal.SIGALRM, wakeup) + def __init__(self): - pubd_process = None - rootd_process = None - rsyncd_process = None + rpki.log.init(testbed_name) + rpki.log.info("Starting") - rpki_sql = mangle_sql(rpki_sql_file) - irdb_sql = mangle_sql(irdb_sql_file) - pubd_sql = mangle_sql(pub_sql_file) + signal.signal(signal.SIGALRM, wakeup) - rpki.log.info("Initializing test directory") + self.pubd_process = None + self.rootd_process = None + self.rsyncd_process = None + + rpki_sql = mangle_sql(rpki_sql_file) + irdb_sql = mangle_sql(irdb_sql_file) + pubd_sql = mangle_sql(pub_sql_file) + + rpki.log.info("Initializing test directory") + + # Connect to test directory, creating it if necessary + try: + os.chdir(testbed_dir) + except: + os.makedirs(testbed_dir) + os.chdir(testbed_dir) - # Connect to test directory, creating it if necessary - try: - os.chdir(testbed_dir) - except: - os.makedirs(testbed_dir) - os.chdir(testbed_dir) + # Discard everything but keys, which take a while to generate + for root, dirs, files in os.walk(".", topdown = False): + for file in files: + if not file.endswith(".key"): + os.remove(os.path.join(root, file)) + for dir in dirs: + os.rmdir(os.path.join(root, dir)) - # Discard everything but keys, which take a while to generate - for root, dirs, files in os.walk(".", topdown = False): - for file in files: - if not file.endswith(".key"): - os.remove(os.path.join(root, file)) - for dir in dirs: - os.rmdir(os.path.join(root, dir)) + rpki.log.info("Reading master YAML configuration") + y = yaml_script.pop(0) + + rpki.log.info("Constructing internal allocation database") + self.db = allocation_db(y) + + rpki.log.info("Constructing BPKI keys and certs for rootd") + setup_bpki_cert_chain(rootd_name, ee = ("RPKI",)) + + rpki.log.info("Constructing BPKI keys and certs for pubd") + setup_bpki_cert_chain(pubd_name, ee = ("PUBD", "IRBE")) + + for a in self.db: + a.setup_bpki_certs() + + setup_publication(pubd_sql) + setup_rootd(self.db.root.name, "SELF-1", y.get("rootd", {})) + setup_rsyncd() + setup_rcynic() + + for a in self.db.engines: + a.setup_conf_file() + a.setup_sql(rpki_sql, irdb_sql) + a.sync_sql() + + try: - rpki.log.info("Reading master YAML configuration") - y = yaml_script.pop(0) + # The changes to make this code run event-driven almost + # certainly break the original intent of this try/finally logic. + # Will need clean up after I/O core change. - rpki.log.info("Constructing internal allocation database") - db = allocation_db(y) + rpki.log.info("Starting rootd") + self.rootd_process = subprocess.Popen((prog_python, prog_rootd, "-c", rootd_name + ".conf")) - rpki.log.info("Constructing BPKI keys and certs for rootd") - setup_bpki_cert_chain(rootd_name, ee = ("RPKI",)) + rpki.log.info("Starting pubd") + self.pubd_process = subprocess.Popen((prog_python, prog_pubd, "-c", pubd_name + ".conf") + (("-p", pubd_name + ".prof") if profile else ())) - rpki.log.info("Constructing BPKI keys and certs for pubd") - setup_bpki_cert_chain(pubd_name, ee = ("PUBD", "IRBE")) + rpki.log.info("Starting rsyncd") + self.rsyncd_process = subprocess.Popen((prog_rsyncd, "--daemon", "--no-detach", "--config", rsyncd_name + ".conf")) - for a in db: - a.setup_bpki_certs() + # Start rpkid and irdbd instances + for a in self.db.engines: + a.run_daemons() - setup_publication(pubd_sql) - setup_rootd(db.root.name, "SELF-1", y.get("rootd", {})) - setup_rsyncd() - setup_rcynic() + rpki.log.info("Sleeping %d seconds while daemons start up" % startup_delay) + time.sleep(startup_delay) - for a in db.engines: - a.setup_conf_file() - a.setup_sql(rpki_sql, irdb_sql) - a.sync_sql() + assert not hasattr(self, "iterator") + self.iterator = async_iterator(self.db.engines, self.create_rpki_objects, self.created_rpki_objects) + self.iterator() - try: + # At this point we have gone into (pseudo) event-driven code. + # See comments above about cleanup of this try/finally code - rpki.log.info("Starting rootd") - rootd_process = subprocess.Popen((prog_python, prog_rootd, "-c", rootd_name + ".conf")) + # Clean up - rpki.log.info("Starting pubd") - pubd_process = subprocess.Popen((prog_python, prog_pubd, "-c", pubd_name + ".conf") + (("-p", pubd_name + ".prof") if profile else ())) + finally: - rpki.log.info("Starting rsyncd") - rsyncd_process = subprocess.Popen((prog_rsyncd, "--daemon", "--no-detach", "--config", rsyncd_name + ".conf")) + try: + rpki.log.info("Shutting down") + for a in self.db.engines: + a.kill_daemons() + for p,n in ((self.rootd_process, "rootd"), (self.pubd_process, "pubd"), (self.rsyncd_process, "rsyncd")): + if p is not None: + rpki.log.info("Killing %s" % n) + os.kill(p.pid, signal.SIGTERM) + except Exception, data: + rpki.log.warn("Couldn't clean up daemons (%s), continuing" % data) - # Start rpkid and irdbd instances - for a in db.engines: - a.run_daemons() - rpki.log.info("Sleeping %d seconds while daemons start up" % startup_delay) - time.sleep(startup_delay) + def create_rpki_objects(self, a): + """Create objects in RPKI engines""" + a.create_rpki_objects() + self.iterator() - # Create objects in RPKI engines - for a in db.engines: - a.create_rpki_objects() + def created_rpki_objects(self): + del self.iterator # Setup keys and certs and write YAML files for leaves - for a in db.leaves: + for a in self.db.leaves: a.setup_yaml_leaf() # Set pubd's BPKI CRL - set_pubd_crl() + set_pubd_crl(self.yaml_loop) - # Loop until we run out of control YAML - while True: + def yaml_loop(self, *ignored): - # This is probably where we should be updating expired BPKI - # objects, particular CRLs + # This is probably where we should be updating expired BPKI + # objects, particular CRLs - # Run cron in all RPKI instances - for a in db.engines: - a.run_cron() + # Run cron in all RPKI instances + assert not hasattr(self, "iterator") + self.iterator = async_iterator(self.db.engines, self.run_cron, self.run_yaml) + self.iterator() - # Run all YAML clients - for a in db.leaves: - a.run_yaml() + def run_cron(self, a): + a.run_cron(self.run_cron_cb) - # Run rcynic to check results - run_rcynic() + def run_cron_cb(self, *ignored): + self.iterator() - # If we've run out of deltas to apply, we're done - if not yaml_script: - rpki.log.info("No more deltas to apply, done") - break + def run_yaml(self): + del self.iterator - rpki.log.info("Applying deltas") - db.apply_delta(yaml_script.pop(0)) + # Run all YAML clients + for a in self.db.leaves: + a.run_yaml() - # Resync IRDBs - for a in db.engines: - a.sync_sql() + # Run rcynic to check results + run_rcynic() - # Clean up + # If we've run out of deltas to apply, we're done + if not yaml_script: - finally: + rpki.log.info("No more deltas to apply, done") - try: - rpki.log.info("Shutting down") - for a in db.engines: - a.kill_daemons() - for p,n in ((rootd_process, "rootd"), (pubd_process, "pubd"), (rsyncd_process, "rsyncd")): - if p is not None: - rpki.log.info("Killing %s" % n) - os.kill(p.pid, signal.SIGTERM) - except Exception, data: - rpki.log.warn("Couldn't clean up daemons (%s), continuing" % data) + else: + + rpki.log.info("Applying deltas") + self.db.apply_delta(yaml_script.pop(0), self.apply_delta_done) + + def apply_delta_done(self): + + # Resync IRDBs + for a in self.db.engines: + a.sync_sql() + + # Loop until we run out of control YAML + self.yaml_loop() def wakeup(signum, frame): """Handler called when we receive a SIGALRM signal.""" @@ -340,17 +397,31 @@ class allocation_db(list): for i, a in zip(range(len(self.engines)), self.engines): a.set_engine_number(i) - def apply_delta(self, delta): + def apply_delta(self, delta, cb): """Apply a delta or run a command.""" if delta is None: - return - for d in delta: - if isinstance(d, str): - c = d.split() - cmds[c[0]](*c[1:]) - else: - self.map[d["name"]].apply_delta(d) + cb() + else: + self.cb = cb + assert not hasattr(self, "iterator") + self.iterator = async_iterator(delta, self.apply_one_delta, self.apply_delta_done) + self.iterator() + + def apply_one_delta(self, d): + if isinstance(d, str): + c = d.split() + cmds[c[0]](*c[1:]) + self.iterator() + else: + self.map[d["name"]].apply_delta(d, self.apply_one_delta_cb) + + def apply_one_delta_cb(self, *ignored): + self.iterator() + + def apply_delta_done(self): + del self.iterator self.root.closure() + self.cb() def dump(self): """Print content of the database.""" @@ -402,53 +473,98 @@ class allocation(object): self.resources = resources return resources - def apply_delta(self, yaml): + def apply_delta(self, yaml, cb): """Apply deltas to this entity.""" rpki.log.info("Applying delta: %s" % yaml) - for k,v in yaml.items(): - if k != "name": - getattr(self, "apply_" + k)(v) - - def apply_add_as(self, text): self.base.asn = self.base.asn.union(rpki.resource_set.resource_set_as(text)) - def apply_add_v4(self, text): self.base.v4 = self.base.v4.union(rpki.resource_set.resource_set_ipv4(text)) - def apply_add_v6(self, text): self.base.v6 = self.base.v6.union(rpki.resource_set.resource_set_ipv6(text)) - def apply_sub_as(self, text): self.base.asn = self.base.asn.difference(rpki.resource_set.resource_set_as(text)) - def apply_sub_v4(self, text): self.base.v4 = self.base.v4.difference(rpki.resource_set.resource_set_ipv4(text)) - def apply_sub_v6(self, text): self.base.v6 = self.base.v6.difference(rpki.resource_set.resource_set_ipv6(text)) - - def apply_valid_until(self, stamp): self.base.valid_until = rpki.sundial.datetime.fromdatetime(stamp) - def apply_valid_for(self, text): self.base.valid_until = rpki.sundial.now() + rpki.sundial.timedelta.parse(text) - def apply_valid_add(self, text): self.base.valid_until += rpki.sundial.timedelta.parse(text) - def apply_valid_sub(self, text): self.base.valid_until -= rpki.sundial.timedelta.parse(text) - - def apply_route_origin_add(self, yaml): + self.apply_delta_caller_cb = cb + assert not hasattr(self, "iterator") + self.iterator = async_iterator(yaml.items(), self.apply_one_delta, self.apply_delta_done) + self.iterator() + + def apply_one_delta(self, kv): + if kv[0] != "name": + getattr(self, "apply_" + kv[0])(kv[1], self.apply_one_delta_cb) + else: + self.apply_one_delta_cb() + + def apply_one_delta_cb(self, *ignored): + self.iterator() + + def apply_delta_done(self): + del self.iterator + self.apply_delta_caller_cb() + + def apply_add_as(self, text, cb): + self.base.asn = self.base.asn.union(rpki.resource_set.resource_set_as(text)) + cb() + + def apply_add_v4(self, text, cb): + self.base.v4 = self.base.v4.union(rpki.resource_set.resource_set_ipv4(text)) + cb() + + def apply_add_v6(self, text, cb): + self.base.v6 = self.base.v6.union(rpki.resource_set.resource_set_ipv6(text)) + cb() + + def apply_sub_as(self, text, cb): + self.base.asn = self.base.asn.difference(rpki.resource_set.resource_set_as(text)) + cb() + + def apply_sub_v4(self, text, cb): + self.base.v4 = self.base.v4.difference(rpki.resource_set.resource_set_ipv4(text)) + cb() + + def apply_sub_v6(self, text, cb): + self.base.v6 = self.base.v6.difference(rpki.resource_set.resource_set_ipv6(text)) + cb() + + def apply_valid_until(self, stamp, cb): + self.base.valid_until = rpki.sundial.datetime.fromdatetime(stamp) + cb() + + def apply_valid_for(self, text, cb): + self.base.valid_until = rpki.sundial.now() + rpki.sundial.timedelta.parse(text) + cb() + + def apply_valid_add(self, text, cb): + self.base.valid_until += rpki.sundial.timedelta.parse(text) + cb() + + def apply_valid_sub(self, text, cb): + self.base.valid_until -= rpki.sundial.timedelta.parse(text) + cb() + + def apply_route_origin_add(self, yaml, cb): for y in yaml: self.route_origins.add(route_origin.parse(y)) + cb() - def apply_route_origin_del(self, yaml): + def apply_route_origin_del(self, yaml, cb): for y in yaml: self.route_origins.remove(route_origin.parse(y)) + cb() - def apply_rekey(self, target): + def apply_rekey(self, target, cb): if self.is_leaf(): raise RuntimeError, "Can't rekey YAML leaf %s, sorry" % self.name elif target is None: rpki.log.info("Rekeying <self/> %s" % self.name) - self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, rekey = "yes")) + self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, rekey = "yes"), cb = cb) else: rpki.log.info("Rekeying <parent/> %s %s" % (self.name, target)) - self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, rekey = "yes")) + self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, rekey = "yes"), cb = cb) - def apply_revoke(self, target): + def apply_revoke(self, target, cb): if self.is_leaf(): rpki.log.info("Attempting to revoke YAML leaf %s" % self.name) subprocess.check_call((prog_python, prog_poke, "-y", self.name + ".yaml", "-r", "revoke")) + cb() elif target is None: rpki.log.info("Revoking <self/> %s" % self.name) - self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, revoke = "yes")) + self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, revoke = "yes"), cb = cb) else: rpki.log.info("Revoking <parent/> %s %s" % (self.name, target)) - self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, revoke = "yes")) + self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, revoke = "yes"), cb = cb) def __str__(self): s = self.name + "\n" @@ -549,35 +665,46 @@ class allocation(object): rpki.log.info("Killing daemons for %s" % self.name) for proc in (self.rpkid_process, self.irdbd_process): try: + rpki.log.info("Killing pid %d" % proc.pid) os.kill(proc.pid, signal.SIGTERM) except: pass proc.wait() - def call_rpkid(self, *pdu): + def call_rpkid(self, pdu, cb): """Send a left-right message to this entity's RPKI daemon and return the response. """ rpki.log.info("Calling rpkid for %s" % self.name) - msg = rpki.left_right.msg(pdu) + msg = rpki.left_right.msg([pdu]) msg.type = "query" cms, xml = rpki.left_right.cms_msg.wrap(msg, self.irbe_key, self.irbe_cert, pretty_print = True) rpki.log.debug(xml) url = "https://localhost:%d/left-right" % self.rpki_port - der = rpki.https.client( + + self.call_rpkid_caller_cb = cb + + rpki.https.client( client_key = self.irbe_key, client_cert = self.irbe_cert, server_ta = self.rpkid_ta, url = url, - msg = cms) - msg, xml = rpki.left_right.cms_msg.unwrap(der, (self.rpkid_ta, self.rpkid_cert), + msg = cms, + callback = self.call_rpkid_cb) + + rpki.log.info("Call to rpkid %s returned" % self.name) + + def call_rpkid_cb(self, val): + if isinstance(val, Exception): + raise val + msg, xml = rpki.left_right.cms_msg.unwrap(val, (self.rpkid_ta, self.rpkid_cert), pretty_print = True) rpki.log.debug(xml) assert msg.type == "reply" for pdu in msg: assert not isinstance(pdu, rpki.left_right.report_error_elt) - return msg[0] if len(msg) == 1 else msg + self.call_rpkid_caller_cb(msg[0] if len(msg) == 1 else msg) def cross_certify(self, certificant, reverse = False): """Cross-certify and return the resulting certificate.""" @@ -627,18 +754,24 @@ class allocation(object): self_ca = rpki.x509.X509(Auto_file = self.name + "-SELF-1.cer") rpki.log.info("Creating rpkid self object for %s" % self.name) - self.self_id = self.call_rpkid(rpki.left_right.self_elt.make_pdu( - action = "create", crl_interval = self.crl_interval, regen_margin = self.regen_margin, bpki_cert = self_ca)).self_id + self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "create", crl_interval = self.crl_interval, regen_margin = self.regen_margin, bpki_cert = self_ca), + cb = self.create_rpki_objects_1) + + def create_rpki_objects_1(self, val): + self.self_id = val.self_id rpki.log.info("Creating rpkid BSC object for %s" % self.name) - pdu = self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "create", self_id = self.self_id, generate_keypair = True)) - self.bsc_id = pdu.bsc_id + self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "create", self_id = self.self_id, generate_keypair = True), + cb = self.create_rpki_objects_2) + + def create_rpki_objects_2(self, val): + self.bsc_id = val.bsc_id rpki.log.info("Issuing BSC EE cert for %s" % self.name) cmd = (prog_openssl, "x509", "-req", "-sha256", "-extfile", self.name + "-RPKI.conf", "-extensions", "req_x509_ext", "-days", "30", "-CA", self.name + "-SELF-1.cer", "-CAkey", self.name + "-SELF-1.key", "-CAcreateserial", "-text") signer = subprocess.Popen(cmd, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) - signed = signer.communicate(input = pdu.pkcs10_request.get_PEM()) + signed = signer.communicate(input = val.pkcs10_request.get_PEM()) if not signed[0]: rpki.log.error(signed[1]) raise RuntimeError, "Couldn't issue BSC EE certificate" @@ -646,53 +779,99 @@ class allocation(object): bsc_crl = rpki.x509.CRL(PEM_file = self.name + "-SELF-1.crl") rpki.log.info("Installing BSC EE cert for %s" % self.name) - self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "set", self_id = self.self_id, bsc_id = self.bsc_id, signing_cert = bsc_ee, signing_cert_crl = bsc_crl)) + self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "set", self_id = self.self_id, bsc_id = self.bsc_id, signing_cert = bsc_ee, signing_cert_crl = bsc_crl), + cb = self.create_rpki_objects_3) + + def create_rpki_objects_3(self, val): rpki.log.info("Creating pubd client object for %s" % self.name) client_cert = self.cross_certify(pubd_name + "-TA", reverse = True) - client_id = call_pubd(rpki.publication.client_elt.make_pdu(action = "create", base_uri = self.sia_base, bpki_cert = client_cert)).client_id + call_pubd(rpki.publication.client_elt.make_pdu(action = "create", base_uri = self.sia_base, bpki_cert = client_cert), + cb = self.create_rpki_objects_4) + + def create_rpki_objects_4(self, val): + client_id = val.client_id rpki.log.info("Creating rpkid repository object for %s" % self.name) repository_cert = self.cross_certify(pubd_name + "-TA") - self.repository_id = self.call_rpkid(rpki.left_right.repository_elt.make_pdu( - action = "create", self_id = self.self_id, bsc_id = self.bsc_id, - bpki_cms_cert = repository_cert, bpki_https_cert = repository_cert, - peer_contact_uri = "https://localhost:%d/client/%d" % (pubd_port, client_id))).repository_id + self.call_rpkid(rpki.left_right.repository_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, + bpki_cms_cert = repository_cert, bpki_https_cert = repository_cert, + peer_contact_uri = "https://localhost:%d/client/%d" % (pubd_port, client_id)), + cb = self.create_rpki_objects_5) + + def create_rpki_objects_5(self, val): + self.repository_id = val.repository_id rpki.log.info("Creating rpkid parent object for %s" % self.name) if self.is_root(): rootd_cert = self.cross_certify(rootd_name + "-TA") - self.parent_id = self.call_rpkid(rpki.left_right.parent_elt.make_pdu( - action = "create", self_id = self.self_id, bsc_id = self.bsc_id, repository_id = self.repository_id, sia_base = self.sia_base, - bpki_cms_cert = rootd_cert, bpki_https_cert = rootd_cert, sender_name = self.name, recipient_name = "Walrus", - peer_contact_uri = "https://localhost:%s/" % rootd_port)).parent_id + self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, + repository_id = self.repository_id, sia_base = self.sia_base, + bpki_cms_cert = rootd_cert, bpki_https_cert = rootd_cert, sender_name = self.name, recipient_name = "Walrus", + peer_contact_uri = "https://localhost:%s/" % rootd_port), + cb = self.create_rpki_objects_6) else: parent_cms_cert = self.cross_certify(self.parent.name + "-SELF-1") parent_https_cert = self.cross_certify(self.parent.name + "-TA") - self.parent_id = self.call_rpkid(rpki.left_right.parent_elt.make_pdu( - action = "create", self_id = self.self_id, bsc_id = self.bsc_id, repository_id = self.repository_id, sia_base = self.sia_base, - bpki_cms_cert = parent_cms_cert, bpki_https_cert = parent_https_cert, sender_name = self.name, recipient_name = self.parent.name, - peer_contact_uri = "https://localhost:%s/up-down/%s" % (self.parent.rpki_port, self.child_id))).parent_id + self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, + repository_id = self.repository_id, sia_base = self.sia_base, + bpki_cms_cert = parent_cms_cert, bpki_https_cert = parent_https_cert, + sender_name = self.name, recipient_name = self.parent.name, + peer_contact_uri = "https://localhost:%s/up-down/%s" % (self.parent.rpki_port, self.child_id)), + cb = self.create_rpki_objects_6) + + def create_rpki_objects_6(self, val): + self.parent_id = val.parent_id rpki.log.info("Creating rpkid child objects for %s" % self.name) - db = MySQLdb.connect(user = "irdb", db = self.irdb_db_name, passwd = irdb_db_pass) - cur = db.cursor() - for kid in self.kids: - if kid.is_leaf(): - bpki_cert = self.cross_certify(kid.name + "-TA") - else: - bpki_cert = self.cross_certify(kid.name + "-SELF-1") - rpki.log.info("Creating rpkid child object for %s as child of %s" % (kid.name, self.name)) - kid.child_id = self.call_rpkid(rpki.left_right.child_elt.make_pdu( - action = "create", self_id = self.self_id, bsc_id = self.bsc_id, bpki_cert = bpki_cert)).child_id - cur.execute("UPDATE registrant SET rpki_self_id = %s, rpki_child_id = %s WHERE IRBE_mapped_id = %s", (self.self_id, kid.child_id, kid.name)) - db.close() + self.sql_db = MySQLdb.connect(user = "irdb", db = self.irdb_db_name, passwd = irdb_db_pass) + self.sql_cur = self.sql_db.cursor() + assert not hasattr(self, "iterator") + self.iterator = async_iterator(self.kids, self.create_rpki_objects_7, self.create_rpki_objects_8) + self.iterator() + + def create_rpki_objects_7(self, kid): + self.kid = kid + if kid.is_leaf(): + bpki_cert = self.cross_certify(kid.name + "-TA") + else: + bpki_cert = self.cross_certify(kid.name + "-SELF-1") + rpki.log.info("Creating rpkid child object for %s as child of %s" % (kid.name, self.name)) + self.call_rpkid(rpki.left_right.child_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, bpki_cert = bpki_cert), + cb = self.create_rpki_objects_7_cb) + + def create_rpki_objects_7_cb(self, val): + self.kid.child_id = val.child_id + self.sql_cur.execute("UPDATE registrant SET rpki_self_id = %s, rpki_child_id = %s WHERE IRBE_mapped_id = %s", (self.self_id, self.kid.child_id, self.kid.name)) + self.iterator() + + def create_rpki_objects_8(self): + self.sql_db.close() + del self.iterator + del self.sql_cur + del self.sql_db + if hasattr(self, "kid"): + del self.kid rpki.log.info("Creating rpkid route_origin objects for %s" % self.name) - for ro in self.route_origins: - ro.route_origin_id = self.call_rpkid(rpki.left_right.route_origin_elt.make_pdu( - action = "create", self_id = self.self_id, - as_number = ro.asn, ipv4 = ro.v4, ipv6 = ro.v6)).route_origin_id + assert not hasattr(self, "iterator") + self.iterator = async_iterator(self.route_origins, self.create_rpki_objects_9, self.create_rpki_objects_10) + self.iterator() + + def create_rpki_objects_9(self, ro): + self.ro = ro + self.call_rpkid(rpki.left_right.route_origin_elt.make_pdu(action = "create", self_id = self.self_id, + as_number = ro.asn, ipv4 = ro.v4, ipv6 = ro.v6), + cb = self.create_rpki_objects_9_cb) + + def create_rpki_objects_9_cb(self, val): + self.ro.route_origin_id = val.route_origin_id + self.iterator() + + def create_rpki_objects_10(self): + if hasattr(self, "ro"): + del self.ro + del self.iterator def setup_yaml_leaf(self): """Generate certificates and write YAML scripts for leaf nodes. @@ -723,7 +902,7 @@ class allocation(object): "ski" : ski }) f.close() - def run_cron(self): + def run_cron(self, cb): """Trigger cron run for this engine.""" rpki.log.info("Running cron for %s" % self.name) @@ -731,7 +910,8 @@ class allocation(object): client_cert = self.irbe_cert, server_ta = self.rpkid_ta, url = "https://localhost:%d/cronjob" % self.rpki_port, - msg = "Run cron now, please") + msg = "Run cron now, please", + callback = cb) def run_yaml(self): """Run YAML scripts for this leaf entity. Since we're not @@ -839,40 +1019,48 @@ def setup_publication(pubd_sql): pubd_irbe_cert = rpki.x509.X509(Auto_file = pubd_name + "-IRBE.cer") pubd_pubd_cert = rpki.x509.X509(Auto_file = pubd_name + "-PUBD.cer") -def call_pubd(*pdu): +def call_pubd(pdu, cb): """Send a publication message to publication daemon and return the response. """ rpki.log.info("Calling pubd") - msg = rpki.publication.msg(pdu) + msg = rpki.publication.msg([pdu]) msg.type = "query" cms, xml = rpki.publication.cms_msg.wrap(msg, pubd_irbe_key, pubd_irbe_cert, pretty_print = True) rpki.log.debug(xml) url = "https://localhost:%d/control" % pubd_port - der = rpki.https.client( + + global call_pubd_caller_cb + call_pubd_caller_cb = cb # Global variable, icky + + rpki.https.client( client_key = pubd_irbe_key, client_cert = pubd_irbe_cert, server_ta = pubd_ta, url = url, - msg = cms) - msg, xml = rpki.publication.cms_msg.unwrap(der, (pubd_ta, pubd_pubd_cert), + msg = cms, + callback = call_pubd_cb) + +def call_pubd_cb(val): + if isinstance(val, Exception): + raise val + msg, xml = rpki.publication.cms_msg.unwrap(val, (pubd_ta, pubd_pubd_cert), pretty_print = True) rpki.log.debug(xml) assert msg.type == "reply" for pdu in msg: assert not isinstance(pdu, rpki.publication.report_error_elt) - return msg[0] if len(msg) == 1 else msg + call_pubd_caller_cb(msg[0] if len(msg) == 1 else msg) -def set_pubd_crl(): +def set_pubd_crl(cb): """Whack publication daemon's bpki_crl. This must be configured before publication daemon starts talking to its clients, and must be updated whenever we update the CRL. """ rpki.log.info("Setting pubd's BPKI CRL") - call_pubd(rpki.publication.config_elt.make_pdu( - action = "set", - bpki_crl = rpki.x509.CRL(Auto_file = pubd_name + "-TA.crl"))) + call_pubd(rpki.publication.config_elt.make_pdu(action = "set", bpki_crl = rpki.x509.CRL(Auto_file = pubd_name + "-TA.crl")), + cb = cb) def run_rcynic(): """Run rcynic to see whether what was published makes sense.""" |