diff options
author | Rob Austein <sra@hactrn.net> | 2009-04-18 02:04:34 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2009-04-18 02:04:34 +0000 |
commit | 6075022dacc8dba19eb51c6a95d0db428f629243 (patch) | |
tree | 3323735b79bc6e138e556303a75e6cf5800f7586 | |
parent | 34daddbfddac735566edf9c16beaf1f8d7ccbbeb (diff) |
Checkpoint. Beginning of refactoring into a callback-based
architecture, to support an event-driven I/O core.
WARNING: At this point in the conversion, some of the programs will
not run indefinitely, because the partial conversion keeps the call
stack from ever unwinding all the way. I'm willing to tolerate this
temporarily as this allows me to keep running regression tests during
the conversion process, but it does mean that as of this checkin the
code is not even remotely suitable for non-testing use until the I/O
core rewrite is finished.
svn path=/rpkid/rpki/https.py; revision=2345
-rw-r--r-- | rpkid/rpki/https.py | 38 | ||||
-rw-r--r-- | rpkid/testbed.py | 548 |
2 files changed, 399 insertions, 187 deletions
diff --git a/rpkid/rpki/https.py b/rpkid/rpki/https.py index a0443c01..6dcf58be 100644 --- a/rpkid/rpki/https.py +++ b/rpkid/rpki/https.py @@ -33,6 +33,9 @@ disable_tls_certificate_validation_exceptions = False # Chatter about TLS certificates debug_tls_certs = False +# Debugging hack while converting to event-driven I/O model +trace_synchronous_calls = False + rpki_content_type = "application/x-rpki" def tlslite_certChain(x509): @@ -149,7 +152,7 @@ class httpsClient(tlslite.api.HTTPTLSConnection): self.checker = Checker(trust_anchor = server_ta) -def client(msg, client_key, client_cert, server_ta, url, timeout = 300): +def client(msg, client_key, client_cert, server_ta, url, timeout = 300, callback = None): """Open client HTTPS connection, send a message, wait for response. This function wraps most of what one needs to do to send a message @@ -157,7 +160,11 @@ def client(msg, client_key, client_cert, server_ta, url, timeout = 300): up to snuff; it's better than with the other packages I've found, but doesn't appear to handle subjectAltName extensions (sigh). """ - + + # This is an easy way to find synchronous calls that need conversion + if trace_synchronous_calls and callback is None: + raise RuntimeError, "Syncronous call to rpki.http.client()" + u = urlparse.urlparse(url) assert u.scheme in ("", "https") and \ @@ -186,12 +193,20 @@ def client(msg, client_key, client_cert, server_ta, url, timeout = 300): httpc.sock.settimeout(timeout) httpc.request("POST", u.path, msg, {"Content-Type" : rpki_content_type}) response = httpc.getresponse() - if response.status == httplib.OK: - return response.read() + rpki.log.debug("HTTPS client returned") + r = response.read() + if response.status != httplib.OK: + rpki.log.debug("HTTPS client returned failure") + r = rpki.exceptions.HTTPRequestFailed("HTTP request failed with status %s, response %s" % (response.status, r)) + if callback is not None: + rpki.log.debug("HTTPS client callback supplied, using it") + callback(r) + elif response.status == httplib.OK: + rpki.log.debug("HTTPS no client callback, returning success") + return r else: - r = response.read() - raise rpki.exceptions.HTTPRequestFailed, \ - "HTTP request failed with status %s, response %s" % (response.status, r) + rpki.log.debug("HTTPS no client callback, raising exception") + raise r class requestHandler(BaseHTTPServer.BaseHTTPRequestHandler): """Derived type to supply POST handler and override logging.""" @@ -263,6 +278,15 @@ class httpsServer(tlslite.api.TLSSocketServerMixIn, BaseHTTPServer.HTTPServer): rpki.log.warn("TLS handshake failure: " + str(error)) return False + def handle_error(self, request, client_address): + """Override SOcketServer error handling. This may be wrong in the + long run, but at the moment I'm seeing the server hang while + trying to shut down, because the default handler is intercepting + ServerShuttingDown in certain states, for reasons unknown. + """ + + raise + def server(handlers, server_key, server_cert, port = 4433, host ="", client_ta = None, dynamic_https_trust_anchor = None, catch_signals = (signal.SIGINT, signal.SIGTERM)): """Run an HTTPS server and wait (forever) for connections.""" diff --git a/rpkid/testbed.py b/rpkid/testbed.py index f3afa2d5..16cf2b7c 100644 --- a/rpkid/testbed.py +++ b/rpkid/testbed.py @@ -34,7 +34,7 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ -import os, yaml, MySQLdb, subprocess, signal, time, re, getopt, sys, lxml +import os, yaml, MySQLdb, subprocess, signal, time, re, getopt, sys, lxml, traceback import rpki.resource_set, rpki.sundial, rpki.x509, rpki.https import rpki.log, rpki.left_right, rpki.config, rpki.publication @@ -46,6 +46,9 @@ cfg_file = "testbed.conf" yaml_script = None profile = False +# Debugging hack whiel converting to event-driven I/O +rpki.https.trace_synchronous_calls = True + opts,argv = getopt.getopt(sys.argv[1:], "c:hpy:?", ["config=", "help", "profile", "yaml="]) for o,a in opts: if o in ("-h", "--help", "-?"): @@ -123,136 +126,190 @@ pub_sql_file = cfg.get("pub_sql_file", "pubd.sql") startup_delay = int(cfg.get("startup_delay", "10")) -def main(): - """Main program, up front to make control logic more obvious.""" +class async_iterator(object): + """Experimental iteration construct for event-driven code. This + belongs in the library eventually, but it's easier to debug the + initial version here. + """ + + def __init__(self, iterable, handler_cb, done_cb): + self.handler_cb = handler_cb + self.done_cb = done_cb + self.iterator = iter(iterable) - rpki.log.init(testbed_name) - rpki.log.info("Starting") + def __call__(self): + try: + self.handler_cb(self.iterator.next()) + except StopIteration: + if self.done_cb is not None: + self.done_cb() + +class main(object): + """Main program, implemented as a class to handle asynchronous I/O + in underlying libraries. + """ - signal.signal(signal.SIGALRM, wakeup) + def __init__(self): - pubd_process = None - rootd_process = None - rsyncd_process = None + rpki.log.init(testbed_name) + rpki.log.info("Starting") - rpki_sql = mangle_sql(rpki_sql_file) - irdb_sql = mangle_sql(irdb_sql_file) - pubd_sql = mangle_sql(pub_sql_file) + signal.signal(signal.SIGALRM, wakeup) - rpki.log.info("Initializing test directory") + self.pubd_process = None + self.rootd_process = None + self.rsyncd_process = None + + rpki_sql = mangle_sql(rpki_sql_file) + irdb_sql = mangle_sql(irdb_sql_file) + pubd_sql = mangle_sql(pub_sql_file) + + rpki.log.info("Initializing test directory") + + # Connect to test directory, creating it if necessary + try: + os.chdir(testbed_dir) + except: + os.makedirs(testbed_dir) + os.chdir(testbed_dir) - # Connect to test directory, creating it if necessary - try: - os.chdir(testbed_dir) - except: - os.makedirs(testbed_dir) - os.chdir(testbed_dir) + # Discard everything but keys, which take a while to generate + for root, dirs, files in os.walk(".", topdown = False): + for file in files: + if not file.endswith(".key"): + os.remove(os.path.join(root, file)) + for dir in dirs: + os.rmdir(os.path.join(root, dir)) - # Discard everything but keys, which take a while to generate - for root, dirs, files in os.walk(".", topdown = False): - for file in files: - if not file.endswith(".key"): - os.remove(os.path.join(root, file)) - for dir in dirs: - os.rmdir(os.path.join(root, dir)) + rpki.log.info("Reading master YAML configuration") + y = yaml_script.pop(0) + + rpki.log.info("Constructing internal allocation database") + self.db = allocation_db(y) + + rpki.log.info("Constructing BPKI keys and certs for rootd") + setup_bpki_cert_chain(rootd_name, ee = ("RPKI",)) + + rpki.log.info("Constructing BPKI keys and certs for pubd") + setup_bpki_cert_chain(pubd_name, ee = ("PUBD", "IRBE")) + + for a in self.db: + a.setup_bpki_certs() + + setup_publication(pubd_sql) + setup_rootd(self.db.root.name, "SELF-1", y.get("rootd", {})) + setup_rsyncd() + setup_rcynic() + + for a in self.db.engines: + a.setup_conf_file() + a.setup_sql(rpki_sql, irdb_sql) + a.sync_sql() + + try: - rpki.log.info("Reading master YAML configuration") - y = yaml_script.pop(0) + # The changes to make this code run event-driven almost + # certainly break the original intent of this try/finally logic. + # Will need clean up after I/O core change. - rpki.log.info("Constructing internal allocation database") - db = allocation_db(y) + rpki.log.info("Starting rootd") + self.rootd_process = subprocess.Popen((prog_python, prog_rootd, "-c", rootd_name + ".conf")) - rpki.log.info("Constructing BPKI keys and certs for rootd") - setup_bpki_cert_chain(rootd_name, ee = ("RPKI",)) + rpki.log.info("Starting pubd") + self.pubd_process = subprocess.Popen((prog_python, prog_pubd, "-c", pubd_name + ".conf") + (("-p", pubd_name + ".prof") if profile else ())) - rpki.log.info("Constructing BPKI keys and certs for pubd") - setup_bpki_cert_chain(pubd_name, ee = ("PUBD", "IRBE")) + rpki.log.info("Starting rsyncd") + self.rsyncd_process = subprocess.Popen((prog_rsyncd, "--daemon", "--no-detach", "--config", rsyncd_name + ".conf")) - for a in db: - a.setup_bpki_certs() + # Start rpkid and irdbd instances + for a in self.db.engines: + a.run_daemons() - setup_publication(pubd_sql) - setup_rootd(db.root.name, "SELF-1", y.get("rootd", {})) - setup_rsyncd() - setup_rcynic() + rpki.log.info("Sleeping %d seconds while daemons start up" % startup_delay) + time.sleep(startup_delay) - for a in db.engines: - a.setup_conf_file() - a.setup_sql(rpki_sql, irdb_sql) - a.sync_sql() + assert not hasattr(self, "iterator") + self.iterator = async_iterator(self.db.engines, self.create_rpki_objects, self.created_rpki_objects) + self.iterator() - try: + # At this point we have gone into (pseudo) event-driven code. + # See comments above about cleanup of this try/finally code - rpki.log.info("Starting rootd") - rootd_process = subprocess.Popen((prog_python, prog_rootd, "-c", rootd_name + ".conf")) + # Clean up - rpki.log.info("Starting pubd") - pubd_process = subprocess.Popen((prog_python, prog_pubd, "-c", pubd_name + ".conf") + (("-p", pubd_name + ".prof") if profile else ())) + finally: - rpki.log.info("Starting rsyncd") - rsyncd_process = subprocess.Popen((prog_rsyncd, "--daemon", "--no-detach", "--config", rsyncd_name + ".conf")) + try: + rpki.log.info("Shutting down") + for a in self.db.engines: + a.kill_daemons() + for p,n in ((self.rootd_process, "rootd"), (self.pubd_process, "pubd"), (self.rsyncd_process, "rsyncd")): + if p is not None: + rpki.log.info("Killing %s" % n) + os.kill(p.pid, signal.SIGTERM) + except Exception, data: + rpki.log.warn("Couldn't clean up daemons (%s), continuing" % data) - # Start rpkid and irdbd instances - for a in db.engines: - a.run_daemons() - rpki.log.info("Sleeping %d seconds while daemons start up" % startup_delay) - time.sleep(startup_delay) + def create_rpki_objects(self, a): + """Create objects in RPKI engines""" + a.create_rpki_objects() + self.iterator() - # Create objects in RPKI engines - for a in db.engines: - a.create_rpki_objects() + def created_rpki_objects(self): + del self.iterator # Setup keys and certs and write YAML files for leaves - for a in db.leaves: + for a in self.db.leaves: a.setup_yaml_leaf() # Set pubd's BPKI CRL - set_pubd_crl() + set_pubd_crl(self.yaml_loop) - # Loop until we run out of control YAML - while True: + def yaml_loop(self, *ignored): - # This is probably where we should be updating expired BPKI - # objects, particular CRLs + # This is probably where we should be updating expired BPKI + # objects, particular CRLs - # Run cron in all RPKI instances - for a in db.engines: - a.run_cron() + # Run cron in all RPKI instances + assert not hasattr(self, "iterator") + self.iterator = async_iterator(self.db.engines, self.run_cron, self.run_yaml) + self.iterator() - # Run all YAML clients - for a in db.leaves: - a.run_yaml() + def run_cron(self, a): + a.run_cron(self.run_cron_cb) - # Run rcynic to check results - run_rcynic() + def run_cron_cb(self, *ignored): + self.iterator() - # If we've run out of deltas to apply, we're done - if not yaml_script: - rpki.log.info("No more deltas to apply, done") - break + def run_yaml(self): + del self.iterator - rpki.log.info("Applying deltas") - db.apply_delta(yaml_script.pop(0)) + # Run all YAML clients + for a in self.db.leaves: + a.run_yaml() - # Resync IRDBs - for a in db.engines: - a.sync_sql() + # Run rcynic to check results + run_rcynic() - # Clean up + # If we've run out of deltas to apply, we're done + if not yaml_script: - finally: + rpki.log.info("No more deltas to apply, done") - try: - rpki.log.info("Shutting down") - for a in db.engines: - a.kill_daemons() - for p,n in ((rootd_process, "rootd"), (pubd_process, "pubd"), (rsyncd_process, "rsyncd")): - if p is not None: - rpki.log.info("Killing %s" % n) - os.kill(p.pid, signal.SIGTERM) - except Exception, data: - rpki.log.warn("Couldn't clean up daemons (%s), continuing" % data) + else: + + rpki.log.info("Applying deltas") + self.db.apply_delta(yaml_script.pop(0), self.apply_delta_done) + + def apply_delta_done(self): + + # Resync IRDBs + for a in self.db.engines: + a.sync_sql() + + # Loop until we run out of control YAML + self.yaml_loop() def wakeup(signum, frame): """Handler called when we receive a SIGALRM signal.""" @@ -340,17 +397,31 @@ class allocation_db(list): for i, a in zip(range(len(self.engines)), self.engines): a.set_engine_number(i) - def apply_delta(self, delta): + def apply_delta(self, delta, cb): """Apply a delta or run a command.""" if delta is None: - return - for d in delta: - if isinstance(d, str): - c = d.split() - cmds[c[0]](*c[1:]) - else: - self.map[d["name"]].apply_delta(d) + cb() + else: + self.cb = cb + assert not hasattr(self, "iterator") + self.iterator = async_iterator(delta, self.apply_one_delta, self.apply_delta_done) + self.iterator() + + def apply_one_delta(self, d): + if isinstance(d, str): + c = d.split() + cmds[c[0]](*c[1:]) + self.iterator() + else: + self.map[d["name"]].apply_delta(d, self.apply_one_delta_cb) + + def apply_one_delta_cb(self, *ignored): + self.iterator() + + def apply_delta_done(self): + del self.iterator self.root.closure() + self.cb() def dump(self): """Print content of the database.""" @@ -402,53 +473,98 @@ class allocation(object): self.resources = resources return resources - def apply_delta(self, yaml): + def apply_delta(self, yaml, cb): """Apply deltas to this entity.""" rpki.log.info("Applying delta: %s" % yaml) - for k,v in yaml.items(): - if k != "name": - getattr(self, "apply_" + k)(v) - - def apply_add_as(self, text): self.base.asn = self.base.asn.union(rpki.resource_set.resource_set_as(text)) - def apply_add_v4(self, text): self.base.v4 = self.base.v4.union(rpki.resource_set.resource_set_ipv4(text)) - def apply_add_v6(self, text): self.base.v6 = self.base.v6.union(rpki.resource_set.resource_set_ipv6(text)) - def apply_sub_as(self, text): self.base.asn = self.base.asn.difference(rpki.resource_set.resource_set_as(text)) - def apply_sub_v4(self, text): self.base.v4 = self.base.v4.difference(rpki.resource_set.resource_set_ipv4(text)) - def apply_sub_v6(self, text): self.base.v6 = self.base.v6.difference(rpki.resource_set.resource_set_ipv6(text)) - - def apply_valid_until(self, stamp): self.base.valid_until = rpki.sundial.datetime.fromdatetime(stamp) - def apply_valid_for(self, text): self.base.valid_until = rpki.sundial.now() + rpki.sundial.timedelta.parse(text) - def apply_valid_add(self, text): self.base.valid_until += rpki.sundial.timedelta.parse(text) - def apply_valid_sub(self, text): self.base.valid_until -= rpki.sundial.timedelta.parse(text) - - def apply_route_origin_add(self, yaml): + self.apply_delta_caller_cb = cb + assert not hasattr(self, "iterator") + self.iterator = async_iterator(yaml.items(), self.apply_one_delta, self.apply_delta_done) + self.iterator() + + def apply_one_delta(self, kv): + if kv[0] != "name": + getattr(self, "apply_" + kv[0])(kv[1], self.apply_one_delta_cb) + else: + self.apply_one_delta_cb() + + def apply_one_delta_cb(self, *ignored): + self.iterator() + + def apply_delta_done(self): + del self.iterator + self.apply_delta_caller_cb() + + def apply_add_as(self, text, cb): + self.base.asn = self.base.asn.union(rpki.resource_set.resource_set_as(text)) + cb() + + def apply_add_v4(self, text, cb): + self.base.v4 = self.base.v4.union(rpki.resource_set.resource_set_ipv4(text)) + cb() + + def apply_add_v6(self, text, cb): + self.base.v6 = self.base.v6.union(rpki.resource_set.resource_set_ipv6(text)) + cb() + + def apply_sub_as(self, text, cb): + self.base.asn = self.base.asn.difference(rpki.resource_set.resource_set_as(text)) + cb() + + def apply_sub_v4(self, text, cb): + self.base.v4 = self.base.v4.difference(rpki.resource_set.resource_set_ipv4(text)) + cb() + + def apply_sub_v6(self, text, cb): + self.base.v6 = self.base.v6.difference(rpki.resource_set.resource_set_ipv6(text)) + cb() + + def apply_valid_until(self, stamp, cb): + self.base.valid_until = rpki.sundial.datetime.fromdatetime(stamp) + cb() + + def apply_valid_for(self, text, cb): + self.base.valid_until = rpki.sundial.now() + rpki.sundial.timedelta.parse(text) + cb() + + def apply_valid_add(self, text, cb): + self.base.valid_until += rpki.sundial.timedelta.parse(text) + cb() + + def apply_valid_sub(self, text, cb): + self.base.valid_until -= rpki.sundial.timedelta.parse(text) + cb() + + def apply_route_origin_add(self, yaml, cb): for y in yaml: self.route_origins.add(route_origin.parse(y)) + cb() - def apply_route_origin_del(self, yaml): + def apply_route_origin_del(self, yaml, cb): for y in yaml: self.route_origins.remove(route_origin.parse(y)) + cb() - def apply_rekey(self, target): + def apply_rekey(self, target, cb): if self.is_leaf(): raise RuntimeError, "Can't rekey YAML leaf %s, sorry" % self.name elif target is None: rpki.log.info("Rekeying <self/> %s" % self.name) - self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, rekey = "yes")) + self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, rekey = "yes"), cb = cb) else: rpki.log.info("Rekeying <parent/> %s %s" % (self.name, target)) - self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, rekey = "yes")) + self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, rekey = "yes"), cb = cb) - def apply_revoke(self, target): + def apply_revoke(self, target, cb): if self.is_leaf(): rpki.log.info("Attempting to revoke YAML leaf %s" % self.name) subprocess.check_call((prog_python, prog_poke, "-y", self.name + ".yaml", "-r", "revoke")) + cb() elif target is None: rpki.log.info("Revoking <self/> %s" % self.name) - self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, revoke = "yes")) + self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, revoke = "yes"), cb = cb) else: rpki.log.info("Revoking <parent/> %s %s" % (self.name, target)) - self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, revoke = "yes")) + self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, revoke = "yes"), cb = cb) def __str__(self): s = self.name + "\n" @@ -549,35 +665,46 @@ class allocation(object): rpki.log.info("Killing daemons for %s" % self.name) for proc in (self.rpkid_process, self.irdbd_process): try: + rpki.log.info("Killing pid %d" % proc.pid) os.kill(proc.pid, signal.SIGTERM) except: pass proc.wait() - def call_rpkid(self, *pdu): + def call_rpkid(self, pdu, cb): """Send a left-right message to this entity's RPKI daemon and return the response. """ rpki.log.info("Calling rpkid for %s" % self.name) - msg = rpki.left_right.msg(pdu) + msg = rpki.left_right.msg([pdu]) msg.type = "query" cms, xml = rpki.left_right.cms_msg.wrap(msg, self.irbe_key, self.irbe_cert, pretty_print = True) rpki.log.debug(xml) url = "https://localhost:%d/left-right" % self.rpki_port - der = rpki.https.client( + + self.call_rpkid_caller_cb = cb + + rpki.https.client( client_key = self.irbe_key, client_cert = self.irbe_cert, server_ta = self.rpkid_ta, url = url, - msg = cms) - msg, xml = rpki.left_right.cms_msg.unwrap(der, (self.rpkid_ta, self.rpkid_cert), + msg = cms, + callback = self.call_rpkid_cb) + + rpki.log.info("Call to rpkid %s returned" % self.name) + + def call_rpkid_cb(self, val): + if isinstance(val, Exception): + raise val + msg, xml = rpki.left_right.cms_msg.unwrap(val, (self.rpkid_ta, self.rpkid_cert), pretty_print = True) rpki.log.debug(xml) assert msg.type == "reply" for pdu in msg: assert not isinstance(pdu, rpki.left_right.report_error_elt) - return msg[0] if len(msg) == 1 else msg + self.call_rpkid_caller_cb(msg[0] if len(msg) == 1 else msg) def cross_certify(self, certificant, reverse = False): """Cross-certify and return the resulting certificate.""" @@ -627,18 +754,24 @@ class allocation(object): self_ca = rpki.x509.X509(Auto_file = self.name + "-SELF-1.cer") rpki.log.info("Creating rpkid self object for %s" % self.name) - self.self_id = self.call_rpkid(rpki.left_right.self_elt.make_pdu( - action = "create", crl_interval = self.crl_interval, regen_margin = self.regen_margin, bpki_cert = self_ca)).self_id + self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "create", crl_interval = self.crl_interval, regen_margin = self.regen_margin, bpki_cert = self_ca), + cb = self.create_rpki_objects_1) + + def create_rpki_objects_1(self, val): + self.self_id = val.self_id rpki.log.info("Creating rpkid BSC object for %s" % self.name) - pdu = self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "create", self_id = self.self_id, generate_keypair = True)) - self.bsc_id = pdu.bsc_id + self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "create", self_id = self.self_id, generate_keypair = True), + cb = self.create_rpki_objects_2) + + def create_rpki_objects_2(self, val): + self.bsc_id = val.bsc_id rpki.log.info("Issuing BSC EE cert for %s" % self.name) cmd = (prog_openssl, "x509", "-req", "-sha256", "-extfile", self.name + "-RPKI.conf", "-extensions", "req_x509_ext", "-days", "30", "-CA", self.name + "-SELF-1.cer", "-CAkey", self.name + "-SELF-1.key", "-CAcreateserial", "-text") signer = subprocess.Popen(cmd, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) - signed = signer.communicate(input = pdu.pkcs10_request.get_PEM()) + signed = signer.communicate(input = val.pkcs10_request.get_PEM()) if not signed[0]: rpki.log.error(signed[1]) raise RuntimeError, "Couldn't issue BSC EE certificate" @@ -646,53 +779,99 @@ class allocation(object): bsc_crl = rpki.x509.CRL(PEM_file = self.name + "-SELF-1.crl") rpki.log.info("Installing BSC EE cert for %s" % self.name) - self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "set", self_id = self.self_id, bsc_id = self.bsc_id, signing_cert = bsc_ee, signing_cert_crl = bsc_crl)) + self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "set", self_id = self.self_id, bsc_id = self.bsc_id, signing_cert = bsc_ee, signing_cert_crl = bsc_crl), + cb = self.create_rpki_objects_3) + + def create_rpki_objects_3(self, val): rpki.log.info("Creating pubd client object for %s" % self.name) client_cert = self.cross_certify(pubd_name + "-TA", reverse = True) - client_id = call_pubd(rpki.publication.client_elt.make_pdu(action = "create", base_uri = self.sia_base, bpki_cert = client_cert)).client_id + call_pubd(rpki.publication.client_elt.make_pdu(action = "create", base_uri = self.sia_base, bpki_cert = client_cert), + cb = self.create_rpki_objects_4) + + def create_rpki_objects_4(self, val): + client_id = val.client_id rpki.log.info("Creating rpkid repository object for %s" % self.name) repository_cert = self.cross_certify(pubd_name + "-TA") - self.repository_id = self.call_rpkid(rpki.left_right.repository_elt.make_pdu( - action = "create", self_id = self.self_id, bsc_id = self.bsc_id, - bpki_cms_cert = repository_cert, bpki_https_cert = repository_cert, - peer_contact_uri = "https://localhost:%d/client/%d" % (pubd_port, client_id))).repository_id + self.call_rpkid(rpki.left_right.repository_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, + bpki_cms_cert = repository_cert, bpki_https_cert = repository_cert, + peer_contact_uri = "https://localhost:%d/client/%d" % (pubd_port, client_id)), + cb = self.create_rpki_objects_5) + + def create_rpki_objects_5(self, val): + self.repository_id = val.repository_id rpki.log.info("Creating rpkid parent object for %s" % self.name) if self.is_root(): rootd_cert = self.cross_certify(rootd_name + "-TA") - self.parent_id = self.call_rpkid(rpki.left_right.parent_elt.make_pdu( - action = "create", self_id = self.self_id, bsc_id = self.bsc_id, repository_id = self.repository_id, sia_base = self.sia_base, - bpki_cms_cert = rootd_cert, bpki_https_cert = rootd_cert, sender_name = self.name, recipient_name = "Walrus", - peer_contact_uri = "https://localhost:%s/" % rootd_port)).parent_id + self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, + repository_id = self.repository_id, sia_base = self.sia_base, + bpki_cms_cert = rootd_cert, bpki_https_cert = rootd_cert, sender_name = self.name, recipient_name = "Walrus", + peer_contact_uri = "https://localhost:%s/" % rootd_port), + cb = self.create_rpki_objects_6) else: parent_cms_cert = self.cross_certify(self.parent.name + "-SELF-1") parent_https_cert = self.cross_certify(self.parent.name + "-TA") - self.parent_id = self.call_rpkid(rpki.left_right.parent_elt.make_pdu( - action = "create", self_id = self.self_id, bsc_id = self.bsc_id, repository_id = self.repository_id, sia_base = self.sia_base, - bpki_cms_cert = parent_cms_cert, bpki_https_cert = parent_https_cert, sender_name = self.name, recipient_name = self.parent.name, - peer_contact_uri = "https://localhost:%s/up-down/%s" % (self.parent.rpki_port, self.child_id))).parent_id + self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, + repository_id = self.repository_id, sia_base = self.sia_base, + bpki_cms_cert = parent_cms_cert, bpki_https_cert = parent_https_cert, + sender_name = self.name, recipient_name = self.parent.name, + peer_contact_uri = "https://localhost:%s/up-down/%s" % (self.parent.rpki_port, self.child_id)), + cb = self.create_rpki_objects_6) + + def create_rpki_objects_6(self, val): + self.parent_id = val.parent_id rpki.log.info("Creating rpkid child objects for %s" % self.name) - db = MySQLdb.connect(user = "irdb", db = self.irdb_db_name, passwd = irdb_db_pass) - cur = db.cursor() - for kid in self.kids: - if kid.is_leaf(): - bpki_cert = self.cross_certify(kid.name + "-TA") - else: - bpki_cert = self.cross_certify(kid.name + "-SELF-1") - rpki.log.info("Creating rpkid child object for %s as child of %s" % (kid.name, self.name)) - kid.child_id = self.call_rpkid(rpki.left_right.child_elt.make_pdu( - action = "create", self_id = self.self_id, bsc_id = self.bsc_id, bpki_cert = bpki_cert)).child_id - cur.execute("UPDATE registrant SET rpki_self_id = %s, rpki_child_id = %s WHERE IRBE_mapped_id = %s", (self.self_id, kid.child_id, kid.name)) - db.close() + self.sql_db = MySQLdb.connect(user = "irdb", db = self.irdb_db_name, passwd = irdb_db_pass) + self.sql_cur = self.sql_db.cursor() + assert not hasattr(self, "iterator") + self.iterator = async_iterator(self.kids, self.create_rpki_objects_7, self.create_rpki_objects_8) + self.iterator() + + def create_rpki_objects_7(self, kid): + self.kid = kid + if kid.is_leaf(): + bpki_cert = self.cross_certify(kid.name + "-TA") + else: + bpki_cert = self.cross_certify(kid.name + "-SELF-1") + rpki.log.info("Creating rpkid child object for %s as child of %s" % (kid.name, self.name)) + self.call_rpkid(rpki.left_right.child_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, bpki_cert = bpki_cert), + cb = self.create_rpki_objects_7_cb) + + def create_rpki_objects_7_cb(self, val): + self.kid.child_id = val.child_id + self.sql_cur.execute("UPDATE registrant SET rpki_self_id = %s, rpki_child_id = %s WHERE IRBE_mapped_id = %s", (self.self_id, self.kid.child_id, self.kid.name)) + self.iterator() + + def create_rpki_objects_8(self): + self.sql_db.close() + del self.iterator + del self.sql_cur + del self.sql_db + if hasattr(self, "kid"): + del self.kid rpki.log.info("Creating rpkid route_origin objects for %s" % self.name) - for ro in self.route_origins: - ro.route_origin_id = self.call_rpkid(rpki.left_right.route_origin_elt.make_pdu( - action = "create", self_id = self.self_id, - as_number = ro.asn, ipv4 = ro.v4, ipv6 = ro.v6)).route_origin_id + assert not hasattr(self, "iterator") + self.iterator = async_iterator(self.route_origins, self.create_rpki_objects_9, self.create_rpki_objects_10) + self.iterator() + + def create_rpki_objects_9(self, ro): + self.ro = ro + self.call_rpkid(rpki.left_right.route_origin_elt.make_pdu(action = "create", self_id = self.self_id, + as_number = ro.asn, ipv4 = ro.v4, ipv6 = ro.v6), + cb = self.create_rpki_objects_9_cb) + + def create_rpki_objects_9_cb(self, val): + self.ro.route_origin_id = val.route_origin_id + self.iterator() + + def create_rpki_objects_10(self): + if hasattr(self, "ro"): + del self.ro + del self.iterator def setup_yaml_leaf(self): """Generate certificates and write YAML scripts for leaf nodes. @@ -723,7 +902,7 @@ class allocation(object): "ski" : ski }) f.close() - def run_cron(self): + def run_cron(self, cb): """Trigger cron run for this engine.""" rpki.log.info("Running cron for %s" % self.name) @@ -731,7 +910,8 @@ class allocation(object): client_cert = self.irbe_cert, server_ta = self.rpkid_ta, url = "https://localhost:%d/cronjob" % self.rpki_port, - msg = "Run cron now, please") + msg = "Run cron now, please", + callback = cb) def run_yaml(self): """Run YAML scripts for this leaf entity. Since we're not @@ -839,40 +1019,48 @@ def setup_publication(pubd_sql): pubd_irbe_cert = rpki.x509.X509(Auto_file = pubd_name + "-IRBE.cer") pubd_pubd_cert = rpki.x509.X509(Auto_file = pubd_name + "-PUBD.cer") -def call_pubd(*pdu): +def call_pubd(pdu, cb): """Send a publication message to publication daemon and return the response. """ rpki.log.info("Calling pubd") - msg = rpki.publication.msg(pdu) + msg = rpki.publication.msg([pdu]) msg.type = "query" cms, xml = rpki.publication.cms_msg.wrap(msg, pubd_irbe_key, pubd_irbe_cert, pretty_print = True) rpki.log.debug(xml) url = "https://localhost:%d/control" % pubd_port - der = rpki.https.client( + + global call_pubd_caller_cb + call_pubd_caller_cb = cb # Global variable, icky + + rpki.https.client( client_key = pubd_irbe_key, client_cert = pubd_irbe_cert, server_ta = pubd_ta, url = url, - msg = cms) - msg, xml = rpki.publication.cms_msg.unwrap(der, (pubd_ta, pubd_pubd_cert), + msg = cms, + callback = call_pubd_cb) + +def call_pubd_cb(val): + if isinstance(val, Exception): + raise val + msg, xml = rpki.publication.cms_msg.unwrap(val, (pubd_ta, pubd_pubd_cert), pretty_print = True) rpki.log.debug(xml) assert msg.type == "reply" for pdu in msg: assert not isinstance(pdu, rpki.publication.report_error_elt) - return msg[0] if len(msg) == 1 else msg + call_pubd_caller_cb(msg[0] if len(msg) == 1 else msg) -def set_pubd_crl(): +def set_pubd_crl(cb): """Whack publication daemon's bpki_crl. This must be configured before publication daemon starts talking to its clients, and must be updated whenever we update the CRL. """ rpki.log.info("Setting pubd's BPKI CRL") - call_pubd(rpki.publication.config_elt.make_pdu( - action = "set", - bpki_crl = rpki.x509.CRL(Auto_file = pubd_name + "-TA.crl"))) + call_pubd(rpki.publication.config_elt.make_pdu(action = "set", bpki_crl = rpki.x509.CRL(Auto_file = pubd_name + "-TA.crl")), + cb = cb) def run_rcynic(): """Run rcynic to see whether what was published makes sense.""" |