aboutsummaryrefslogtreecommitdiff
path: root/rpkid/testbed.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpkid/testbed.py')
-rw-r--r--rpkid/testbed.py548
1 files changed, 368 insertions, 180 deletions
diff --git a/rpkid/testbed.py b/rpkid/testbed.py
index f3afa2d5..16cf2b7c 100644
--- a/rpkid/testbed.py
+++ b/rpkid/testbed.py
@@ -34,7 +34,7 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
"""
-import os, yaml, MySQLdb, subprocess, signal, time, re, getopt, sys, lxml
+import os, yaml, MySQLdb, subprocess, signal, time, re, getopt, sys, lxml, traceback
import rpki.resource_set, rpki.sundial, rpki.x509, rpki.https
import rpki.log, rpki.left_right, rpki.config, rpki.publication
@@ -46,6 +46,9 @@ cfg_file = "testbed.conf"
yaml_script = None
profile = False
+# Debugging hack whiel converting to event-driven I/O
+rpki.https.trace_synchronous_calls = True
+
opts,argv = getopt.getopt(sys.argv[1:], "c:hpy:?", ["config=", "help", "profile", "yaml="])
for o,a in opts:
if o in ("-h", "--help", "-?"):
@@ -123,136 +126,190 @@ pub_sql_file = cfg.get("pub_sql_file", "pubd.sql")
startup_delay = int(cfg.get("startup_delay", "10"))
-def main():
- """Main program, up front to make control logic more obvious."""
+class async_iterator(object):
+ """Experimental iteration construct for event-driven code. This
+ belongs in the library eventually, but it's easier to debug the
+ initial version here.
+ """
+
+ def __init__(self, iterable, handler_cb, done_cb):
+ self.handler_cb = handler_cb
+ self.done_cb = done_cb
+ self.iterator = iter(iterable)
- rpki.log.init(testbed_name)
- rpki.log.info("Starting")
+ def __call__(self):
+ try:
+ self.handler_cb(self.iterator.next())
+ except StopIteration:
+ if self.done_cb is not None:
+ self.done_cb()
+
+class main(object):
+ """Main program, implemented as a class to handle asynchronous I/O
+ in underlying libraries.
+ """
- signal.signal(signal.SIGALRM, wakeup)
+ def __init__(self):
- pubd_process = None
- rootd_process = None
- rsyncd_process = None
+ rpki.log.init(testbed_name)
+ rpki.log.info("Starting")
- rpki_sql = mangle_sql(rpki_sql_file)
- irdb_sql = mangle_sql(irdb_sql_file)
- pubd_sql = mangle_sql(pub_sql_file)
+ signal.signal(signal.SIGALRM, wakeup)
- rpki.log.info("Initializing test directory")
+ self.pubd_process = None
+ self.rootd_process = None
+ self.rsyncd_process = None
+
+ rpki_sql = mangle_sql(rpki_sql_file)
+ irdb_sql = mangle_sql(irdb_sql_file)
+ pubd_sql = mangle_sql(pub_sql_file)
+
+ rpki.log.info("Initializing test directory")
+
+ # Connect to test directory, creating it if necessary
+ try:
+ os.chdir(testbed_dir)
+ except:
+ os.makedirs(testbed_dir)
+ os.chdir(testbed_dir)
- # Connect to test directory, creating it if necessary
- try:
- os.chdir(testbed_dir)
- except:
- os.makedirs(testbed_dir)
- os.chdir(testbed_dir)
+ # Discard everything but keys, which take a while to generate
+ for root, dirs, files in os.walk(".", topdown = False):
+ for file in files:
+ if not file.endswith(".key"):
+ os.remove(os.path.join(root, file))
+ for dir in dirs:
+ os.rmdir(os.path.join(root, dir))
- # Discard everything but keys, which take a while to generate
- for root, dirs, files in os.walk(".", topdown = False):
- for file in files:
- if not file.endswith(".key"):
- os.remove(os.path.join(root, file))
- for dir in dirs:
- os.rmdir(os.path.join(root, dir))
+ rpki.log.info("Reading master YAML configuration")
+ y = yaml_script.pop(0)
+
+ rpki.log.info("Constructing internal allocation database")
+ self.db = allocation_db(y)
+
+ rpki.log.info("Constructing BPKI keys and certs for rootd")
+ setup_bpki_cert_chain(rootd_name, ee = ("RPKI",))
+
+ rpki.log.info("Constructing BPKI keys and certs for pubd")
+ setup_bpki_cert_chain(pubd_name, ee = ("PUBD", "IRBE"))
+
+ for a in self.db:
+ a.setup_bpki_certs()
+
+ setup_publication(pubd_sql)
+ setup_rootd(self.db.root.name, "SELF-1", y.get("rootd", {}))
+ setup_rsyncd()
+ setup_rcynic()
+
+ for a in self.db.engines:
+ a.setup_conf_file()
+ a.setup_sql(rpki_sql, irdb_sql)
+ a.sync_sql()
+
+ try:
- rpki.log.info("Reading master YAML configuration")
- y = yaml_script.pop(0)
+ # The changes to make this code run event-driven almost
+ # certainly break the original intent of this try/finally logic.
+ # Will need clean up after I/O core change.
- rpki.log.info("Constructing internal allocation database")
- db = allocation_db(y)
+ rpki.log.info("Starting rootd")
+ self.rootd_process = subprocess.Popen((prog_python, prog_rootd, "-c", rootd_name + ".conf"))
- rpki.log.info("Constructing BPKI keys and certs for rootd")
- setup_bpki_cert_chain(rootd_name, ee = ("RPKI",))
+ rpki.log.info("Starting pubd")
+ self.pubd_process = subprocess.Popen((prog_python, prog_pubd, "-c", pubd_name + ".conf") + (("-p", pubd_name + ".prof") if profile else ()))
- rpki.log.info("Constructing BPKI keys and certs for pubd")
- setup_bpki_cert_chain(pubd_name, ee = ("PUBD", "IRBE"))
+ rpki.log.info("Starting rsyncd")
+ self.rsyncd_process = subprocess.Popen((prog_rsyncd, "--daemon", "--no-detach", "--config", rsyncd_name + ".conf"))
- for a in db:
- a.setup_bpki_certs()
+ # Start rpkid and irdbd instances
+ for a in self.db.engines:
+ a.run_daemons()
- setup_publication(pubd_sql)
- setup_rootd(db.root.name, "SELF-1", y.get("rootd", {}))
- setup_rsyncd()
- setup_rcynic()
+ rpki.log.info("Sleeping %d seconds while daemons start up" % startup_delay)
+ time.sleep(startup_delay)
- for a in db.engines:
- a.setup_conf_file()
- a.setup_sql(rpki_sql, irdb_sql)
- a.sync_sql()
+ assert not hasattr(self, "iterator")
+ self.iterator = async_iterator(self.db.engines, self.create_rpki_objects, self.created_rpki_objects)
+ self.iterator()
- try:
+ # At this point we have gone into (pseudo) event-driven code.
+ # See comments above about cleanup of this try/finally code
- rpki.log.info("Starting rootd")
- rootd_process = subprocess.Popen((prog_python, prog_rootd, "-c", rootd_name + ".conf"))
+ # Clean up
- rpki.log.info("Starting pubd")
- pubd_process = subprocess.Popen((prog_python, prog_pubd, "-c", pubd_name + ".conf") + (("-p", pubd_name + ".prof") if profile else ()))
+ finally:
- rpki.log.info("Starting rsyncd")
- rsyncd_process = subprocess.Popen((prog_rsyncd, "--daemon", "--no-detach", "--config", rsyncd_name + ".conf"))
+ try:
+ rpki.log.info("Shutting down")
+ for a in self.db.engines:
+ a.kill_daemons()
+ for p,n in ((self.rootd_process, "rootd"), (self.pubd_process, "pubd"), (self.rsyncd_process, "rsyncd")):
+ if p is not None:
+ rpki.log.info("Killing %s" % n)
+ os.kill(p.pid, signal.SIGTERM)
+ except Exception, data:
+ rpki.log.warn("Couldn't clean up daemons (%s), continuing" % data)
- # Start rpkid and irdbd instances
- for a in db.engines:
- a.run_daemons()
- rpki.log.info("Sleeping %d seconds while daemons start up" % startup_delay)
- time.sleep(startup_delay)
+ def create_rpki_objects(self, a):
+ """Create objects in RPKI engines"""
+ a.create_rpki_objects()
+ self.iterator()
- # Create objects in RPKI engines
- for a in db.engines:
- a.create_rpki_objects()
+ def created_rpki_objects(self):
+ del self.iterator
# Setup keys and certs and write YAML files for leaves
- for a in db.leaves:
+ for a in self.db.leaves:
a.setup_yaml_leaf()
# Set pubd's BPKI CRL
- set_pubd_crl()
+ set_pubd_crl(self.yaml_loop)
- # Loop until we run out of control YAML
- while True:
+ def yaml_loop(self, *ignored):
- # This is probably where we should be updating expired BPKI
- # objects, particular CRLs
+ # This is probably where we should be updating expired BPKI
+ # objects, particular CRLs
- # Run cron in all RPKI instances
- for a in db.engines:
- a.run_cron()
+ # Run cron in all RPKI instances
+ assert not hasattr(self, "iterator")
+ self.iterator = async_iterator(self.db.engines, self.run_cron, self.run_yaml)
+ self.iterator()
- # Run all YAML clients
- for a in db.leaves:
- a.run_yaml()
+ def run_cron(self, a):
+ a.run_cron(self.run_cron_cb)
- # Run rcynic to check results
- run_rcynic()
+ def run_cron_cb(self, *ignored):
+ self.iterator()
- # If we've run out of deltas to apply, we're done
- if not yaml_script:
- rpki.log.info("No more deltas to apply, done")
- break
+ def run_yaml(self):
+ del self.iterator
- rpki.log.info("Applying deltas")
- db.apply_delta(yaml_script.pop(0))
+ # Run all YAML clients
+ for a in self.db.leaves:
+ a.run_yaml()
- # Resync IRDBs
- for a in db.engines:
- a.sync_sql()
+ # Run rcynic to check results
+ run_rcynic()
- # Clean up
+ # If we've run out of deltas to apply, we're done
+ if not yaml_script:
- finally:
+ rpki.log.info("No more deltas to apply, done")
- try:
- rpki.log.info("Shutting down")
- for a in db.engines:
- a.kill_daemons()
- for p,n in ((rootd_process, "rootd"), (pubd_process, "pubd"), (rsyncd_process, "rsyncd")):
- if p is not None:
- rpki.log.info("Killing %s" % n)
- os.kill(p.pid, signal.SIGTERM)
- except Exception, data:
- rpki.log.warn("Couldn't clean up daemons (%s), continuing" % data)
+ else:
+
+ rpki.log.info("Applying deltas")
+ self.db.apply_delta(yaml_script.pop(0), self.apply_delta_done)
+
+ def apply_delta_done(self):
+
+ # Resync IRDBs
+ for a in self.db.engines:
+ a.sync_sql()
+
+ # Loop until we run out of control YAML
+ self.yaml_loop()
def wakeup(signum, frame):
"""Handler called when we receive a SIGALRM signal."""
@@ -340,17 +397,31 @@ class allocation_db(list):
for i, a in zip(range(len(self.engines)), self.engines):
a.set_engine_number(i)
- def apply_delta(self, delta):
+ def apply_delta(self, delta, cb):
"""Apply a delta or run a command."""
if delta is None:
- return
- for d in delta:
- if isinstance(d, str):
- c = d.split()
- cmds[c[0]](*c[1:])
- else:
- self.map[d["name"]].apply_delta(d)
+ cb()
+ else:
+ self.cb = cb
+ assert not hasattr(self, "iterator")
+ self.iterator = async_iterator(delta, self.apply_one_delta, self.apply_delta_done)
+ self.iterator()
+
+ def apply_one_delta(self, d):
+ if isinstance(d, str):
+ c = d.split()
+ cmds[c[0]](*c[1:])
+ self.iterator()
+ else:
+ self.map[d["name"]].apply_delta(d, self.apply_one_delta_cb)
+
+ def apply_one_delta_cb(self, *ignored):
+ self.iterator()
+
+ def apply_delta_done(self):
+ del self.iterator
self.root.closure()
+ self.cb()
def dump(self):
"""Print content of the database."""
@@ -402,53 +473,98 @@ class allocation(object):
self.resources = resources
return resources
- def apply_delta(self, yaml):
+ def apply_delta(self, yaml, cb):
"""Apply deltas to this entity."""
rpki.log.info("Applying delta: %s" % yaml)
- for k,v in yaml.items():
- if k != "name":
- getattr(self, "apply_" + k)(v)
-
- def apply_add_as(self, text): self.base.asn = self.base.asn.union(rpki.resource_set.resource_set_as(text))
- def apply_add_v4(self, text): self.base.v4 = self.base.v4.union(rpki.resource_set.resource_set_ipv4(text))
- def apply_add_v6(self, text): self.base.v6 = self.base.v6.union(rpki.resource_set.resource_set_ipv6(text))
- def apply_sub_as(self, text): self.base.asn = self.base.asn.difference(rpki.resource_set.resource_set_as(text))
- def apply_sub_v4(self, text): self.base.v4 = self.base.v4.difference(rpki.resource_set.resource_set_ipv4(text))
- def apply_sub_v6(self, text): self.base.v6 = self.base.v6.difference(rpki.resource_set.resource_set_ipv6(text))
-
- def apply_valid_until(self, stamp): self.base.valid_until = rpki.sundial.datetime.fromdatetime(stamp)
- def apply_valid_for(self, text): self.base.valid_until = rpki.sundial.now() + rpki.sundial.timedelta.parse(text)
- def apply_valid_add(self, text): self.base.valid_until += rpki.sundial.timedelta.parse(text)
- def apply_valid_sub(self, text): self.base.valid_until -= rpki.sundial.timedelta.parse(text)
-
- def apply_route_origin_add(self, yaml):
+ self.apply_delta_caller_cb = cb
+ assert not hasattr(self, "iterator")
+ self.iterator = async_iterator(yaml.items(), self.apply_one_delta, self.apply_delta_done)
+ self.iterator()
+
+ def apply_one_delta(self, kv):
+ if kv[0] != "name":
+ getattr(self, "apply_" + kv[0])(kv[1], self.apply_one_delta_cb)
+ else:
+ self.apply_one_delta_cb()
+
+ def apply_one_delta_cb(self, *ignored):
+ self.iterator()
+
+ def apply_delta_done(self):
+ del self.iterator
+ self.apply_delta_caller_cb()
+
+ def apply_add_as(self, text, cb):
+ self.base.asn = self.base.asn.union(rpki.resource_set.resource_set_as(text))
+ cb()
+
+ def apply_add_v4(self, text, cb):
+ self.base.v4 = self.base.v4.union(rpki.resource_set.resource_set_ipv4(text))
+ cb()
+
+ def apply_add_v6(self, text, cb):
+ self.base.v6 = self.base.v6.union(rpki.resource_set.resource_set_ipv6(text))
+ cb()
+
+ def apply_sub_as(self, text, cb):
+ self.base.asn = self.base.asn.difference(rpki.resource_set.resource_set_as(text))
+ cb()
+
+ def apply_sub_v4(self, text, cb):
+ self.base.v4 = self.base.v4.difference(rpki.resource_set.resource_set_ipv4(text))
+ cb()
+
+ def apply_sub_v6(self, text, cb):
+ self.base.v6 = self.base.v6.difference(rpki.resource_set.resource_set_ipv6(text))
+ cb()
+
+ def apply_valid_until(self, stamp, cb):
+ self.base.valid_until = rpki.sundial.datetime.fromdatetime(stamp)
+ cb()
+
+ def apply_valid_for(self, text, cb):
+ self.base.valid_until = rpki.sundial.now() + rpki.sundial.timedelta.parse(text)
+ cb()
+
+ def apply_valid_add(self, text, cb):
+ self.base.valid_until += rpki.sundial.timedelta.parse(text)
+ cb()
+
+ def apply_valid_sub(self, text, cb):
+ self.base.valid_until -= rpki.sundial.timedelta.parse(text)
+ cb()
+
+ def apply_route_origin_add(self, yaml, cb):
for y in yaml:
self.route_origins.add(route_origin.parse(y))
+ cb()
- def apply_route_origin_del(self, yaml):
+ def apply_route_origin_del(self, yaml, cb):
for y in yaml:
self.route_origins.remove(route_origin.parse(y))
+ cb()
- def apply_rekey(self, target):
+ def apply_rekey(self, target, cb):
if self.is_leaf():
raise RuntimeError, "Can't rekey YAML leaf %s, sorry" % self.name
elif target is None:
rpki.log.info("Rekeying <self/> %s" % self.name)
- self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, rekey = "yes"))
+ self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, rekey = "yes"), cb = cb)
else:
rpki.log.info("Rekeying <parent/> %s %s" % (self.name, target))
- self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, rekey = "yes"))
+ self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, rekey = "yes"), cb = cb)
- def apply_revoke(self, target):
+ def apply_revoke(self, target, cb):
if self.is_leaf():
rpki.log.info("Attempting to revoke YAML leaf %s" % self.name)
subprocess.check_call((prog_python, prog_poke, "-y", self.name + ".yaml", "-r", "revoke"))
+ cb()
elif target is None:
rpki.log.info("Revoking <self/> %s" % self.name)
- self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, revoke = "yes"))
+ self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, revoke = "yes"), cb = cb)
else:
rpki.log.info("Revoking <parent/> %s %s" % (self.name, target))
- self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, revoke = "yes"))
+ self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, revoke = "yes"), cb = cb)
def __str__(self):
s = self.name + "\n"
@@ -549,35 +665,46 @@ class allocation(object):
rpki.log.info("Killing daemons for %s" % self.name)
for proc in (self.rpkid_process, self.irdbd_process):
try:
+ rpki.log.info("Killing pid %d" % proc.pid)
os.kill(proc.pid, signal.SIGTERM)
except:
pass
proc.wait()
- def call_rpkid(self, *pdu):
+ def call_rpkid(self, pdu, cb):
"""Send a left-right message to this entity's RPKI daemon and
return the response.
"""
rpki.log.info("Calling rpkid for %s" % self.name)
- msg = rpki.left_right.msg(pdu)
+ msg = rpki.left_right.msg([pdu])
msg.type = "query"
cms, xml = rpki.left_right.cms_msg.wrap(msg, self.irbe_key, self.irbe_cert,
pretty_print = True)
rpki.log.debug(xml)
url = "https://localhost:%d/left-right" % self.rpki_port
- der = rpki.https.client(
+
+ self.call_rpkid_caller_cb = cb
+
+ rpki.https.client(
client_key = self.irbe_key,
client_cert = self.irbe_cert,
server_ta = self.rpkid_ta,
url = url,
- msg = cms)
- msg, xml = rpki.left_right.cms_msg.unwrap(der, (self.rpkid_ta, self.rpkid_cert),
+ msg = cms,
+ callback = self.call_rpkid_cb)
+
+ rpki.log.info("Call to rpkid %s returned" % self.name)
+
+ def call_rpkid_cb(self, val):
+ if isinstance(val, Exception):
+ raise val
+ msg, xml = rpki.left_right.cms_msg.unwrap(val, (self.rpkid_ta, self.rpkid_cert),
pretty_print = True)
rpki.log.debug(xml)
assert msg.type == "reply"
for pdu in msg:
assert not isinstance(pdu, rpki.left_right.report_error_elt)
- return msg[0] if len(msg) == 1 else msg
+ self.call_rpkid_caller_cb(msg[0] if len(msg) == 1 else msg)
def cross_certify(self, certificant, reverse = False):
"""Cross-certify and return the resulting certificate."""
@@ -627,18 +754,24 @@ class allocation(object):
self_ca = rpki.x509.X509(Auto_file = self.name + "-SELF-1.cer")
rpki.log.info("Creating rpkid self object for %s" % self.name)
- self.self_id = self.call_rpkid(rpki.left_right.self_elt.make_pdu(
- action = "create", crl_interval = self.crl_interval, regen_margin = self.regen_margin, bpki_cert = self_ca)).self_id
+ self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "create", crl_interval = self.crl_interval, regen_margin = self.regen_margin, bpki_cert = self_ca),
+ cb = self.create_rpki_objects_1)
+
+ def create_rpki_objects_1(self, val):
+ self.self_id = val.self_id
rpki.log.info("Creating rpkid BSC object for %s" % self.name)
- pdu = self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "create", self_id = self.self_id, generate_keypair = True))
- self.bsc_id = pdu.bsc_id
+ self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "create", self_id = self.self_id, generate_keypair = True),
+ cb = self.create_rpki_objects_2)
+
+ def create_rpki_objects_2(self, val):
+ self.bsc_id = val.bsc_id
rpki.log.info("Issuing BSC EE cert for %s" % self.name)
cmd = (prog_openssl, "x509", "-req", "-sha256", "-extfile", self.name + "-RPKI.conf", "-extensions", "req_x509_ext", "-days", "30",
"-CA", self.name + "-SELF-1.cer", "-CAkey", self.name + "-SELF-1.key", "-CAcreateserial", "-text")
signer = subprocess.Popen(cmd, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- signed = signer.communicate(input = pdu.pkcs10_request.get_PEM())
+ signed = signer.communicate(input = val.pkcs10_request.get_PEM())
if not signed[0]:
rpki.log.error(signed[1])
raise RuntimeError, "Couldn't issue BSC EE certificate"
@@ -646,53 +779,99 @@ class allocation(object):
bsc_crl = rpki.x509.CRL(PEM_file = self.name + "-SELF-1.crl")
rpki.log.info("Installing BSC EE cert for %s" % self.name)
- self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "set", self_id = self.self_id, bsc_id = self.bsc_id, signing_cert = bsc_ee, signing_cert_crl = bsc_crl))
+ self.call_rpkid(rpki.left_right.bsc_elt.make_pdu(action = "set", self_id = self.self_id, bsc_id = self.bsc_id, signing_cert = bsc_ee, signing_cert_crl = bsc_crl),
+ cb = self.create_rpki_objects_3)
+
+ def create_rpki_objects_3(self, val):
rpki.log.info("Creating pubd client object for %s" % self.name)
client_cert = self.cross_certify(pubd_name + "-TA", reverse = True)
- client_id = call_pubd(rpki.publication.client_elt.make_pdu(action = "create", base_uri = self.sia_base, bpki_cert = client_cert)).client_id
+ call_pubd(rpki.publication.client_elt.make_pdu(action = "create", base_uri = self.sia_base, bpki_cert = client_cert),
+ cb = self.create_rpki_objects_4)
+
+ def create_rpki_objects_4(self, val):
+ client_id = val.client_id
rpki.log.info("Creating rpkid repository object for %s" % self.name)
repository_cert = self.cross_certify(pubd_name + "-TA")
- self.repository_id = self.call_rpkid(rpki.left_right.repository_elt.make_pdu(
- action = "create", self_id = self.self_id, bsc_id = self.bsc_id,
- bpki_cms_cert = repository_cert, bpki_https_cert = repository_cert,
- peer_contact_uri = "https://localhost:%d/client/%d" % (pubd_port, client_id))).repository_id
+ self.call_rpkid(rpki.left_right.repository_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id,
+ bpki_cms_cert = repository_cert, bpki_https_cert = repository_cert,
+ peer_contact_uri = "https://localhost:%d/client/%d" % (pubd_port, client_id)),
+ cb = self.create_rpki_objects_5)
+
+ def create_rpki_objects_5(self, val):
+ self.repository_id = val.repository_id
rpki.log.info("Creating rpkid parent object for %s" % self.name)
if self.is_root():
rootd_cert = self.cross_certify(rootd_name + "-TA")
- self.parent_id = self.call_rpkid(rpki.left_right.parent_elt.make_pdu(
- action = "create", self_id = self.self_id, bsc_id = self.bsc_id, repository_id = self.repository_id, sia_base = self.sia_base,
- bpki_cms_cert = rootd_cert, bpki_https_cert = rootd_cert, sender_name = self.name, recipient_name = "Walrus",
- peer_contact_uri = "https://localhost:%s/" % rootd_port)).parent_id
+ self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id,
+ repository_id = self.repository_id, sia_base = self.sia_base,
+ bpki_cms_cert = rootd_cert, bpki_https_cert = rootd_cert, sender_name = self.name, recipient_name = "Walrus",
+ peer_contact_uri = "https://localhost:%s/" % rootd_port),
+ cb = self.create_rpki_objects_6)
else:
parent_cms_cert = self.cross_certify(self.parent.name + "-SELF-1")
parent_https_cert = self.cross_certify(self.parent.name + "-TA")
- self.parent_id = self.call_rpkid(rpki.left_right.parent_elt.make_pdu(
- action = "create", self_id = self.self_id, bsc_id = self.bsc_id, repository_id = self.repository_id, sia_base = self.sia_base,
- bpki_cms_cert = parent_cms_cert, bpki_https_cert = parent_https_cert, sender_name = self.name, recipient_name = self.parent.name,
- peer_contact_uri = "https://localhost:%s/up-down/%s" % (self.parent.rpki_port, self.child_id))).parent_id
+ self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id,
+ repository_id = self.repository_id, sia_base = self.sia_base,
+ bpki_cms_cert = parent_cms_cert, bpki_https_cert = parent_https_cert,
+ sender_name = self.name, recipient_name = self.parent.name,
+ peer_contact_uri = "https://localhost:%s/up-down/%s" % (self.parent.rpki_port, self.child_id)),
+ cb = self.create_rpki_objects_6)
+
+ def create_rpki_objects_6(self, val):
+ self.parent_id = val.parent_id
rpki.log.info("Creating rpkid child objects for %s" % self.name)
- db = MySQLdb.connect(user = "irdb", db = self.irdb_db_name, passwd = irdb_db_pass)
- cur = db.cursor()
- for kid in self.kids:
- if kid.is_leaf():
- bpki_cert = self.cross_certify(kid.name + "-TA")
- else:
- bpki_cert = self.cross_certify(kid.name + "-SELF-1")
- rpki.log.info("Creating rpkid child object for %s as child of %s" % (kid.name, self.name))
- kid.child_id = self.call_rpkid(rpki.left_right.child_elt.make_pdu(
- action = "create", self_id = self.self_id, bsc_id = self.bsc_id, bpki_cert = bpki_cert)).child_id
- cur.execute("UPDATE registrant SET rpki_self_id = %s, rpki_child_id = %s WHERE IRBE_mapped_id = %s", (self.self_id, kid.child_id, kid.name))
- db.close()
+ self.sql_db = MySQLdb.connect(user = "irdb", db = self.irdb_db_name, passwd = irdb_db_pass)
+ self.sql_cur = self.sql_db.cursor()
+ assert not hasattr(self, "iterator")
+ self.iterator = async_iterator(self.kids, self.create_rpki_objects_7, self.create_rpki_objects_8)
+ self.iterator()
+
+ def create_rpki_objects_7(self, kid):
+ self.kid = kid
+ if kid.is_leaf():
+ bpki_cert = self.cross_certify(kid.name + "-TA")
+ else:
+ bpki_cert = self.cross_certify(kid.name + "-SELF-1")
+ rpki.log.info("Creating rpkid child object for %s as child of %s" % (kid.name, self.name))
+ self.call_rpkid(rpki.left_right.child_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, bpki_cert = bpki_cert),
+ cb = self.create_rpki_objects_7_cb)
+
+ def create_rpki_objects_7_cb(self, val):
+ self.kid.child_id = val.child_id
+ self.sql_cur.execute("UPDATE registrant SET rpki_self_id = %s, rpki_child_id = %s WHERE IRBE_mapped_id = %s", (self.self_id, self.kid.child_id, self.kid.name))
+ self.iterator()
+
+ def create_rpki_objects_8(self):
+ self.sql_db.close()
+ del self.iterator
+ del self.sql_cur
+ del self.sql_db
+ if hasattr(self, "kid"):
+ del self.kid
rpki.log.info("Creating rpkid route_origin objects for %s" % self.name)
- for ro in self.route_origins:
- ro.route_origin_id = self.call_rpkid(rpki.left_right.route_origin_elt.make_pdu(
- action = "create", self_id = self.self_id,
- as_number = ro.asn, ipv4 = ro.v4, ipv6 = ro.v6)).route_origin_id
+ assert not hasattr(self, "iterator")
+ self.iterator = async_iterator(self.route_origins, self.create_rpki_objects_9, self.create_rpki_objects_10)
+ self.iterator()
+
+ def create_rpki_objects_9(self, ro):
+ self.ro = ro
+ self.call_rpkid(rpki.left_right.route_origin_elt.make_pdu(action = "create", self_id = self.self_id,
+ as_number = ro.asn, ipv4 = ro.v4, ipv6 = ro.v6),
+ cb = self.create_rpki_objects_9_cb)
+
+ def create_rpki_objects_9_cb(self, val):
+ self.ro.route_origin_id = val.route_origin_id
+ self.iterator()
+
+ def create_rpki_objects_10(self):
+ if hasattr(self, "ro"):
+ del self.ro
+ del self.iterator
def setup_yaml_leaf(self):
"""Generate certificates and write YAML scripts for leaf nodes.
@@ -723,7 +902,7 @@ class allocation(object):
"ski" : ski })
f.close()
- def run_cron(self):
+ def run_cron(self, cb):
"""Trigger cron run for this engine."""
rpki.log.info("Running cron for %s" % self.name)
@@ -731,7 +910,8 @@ class allocation(object):
client_cert = self.irbe_cert,
server_ta = self.rpkid_ta,
url = "https://localhost:%d/cronjob" % self.rpki_port,
- msg = "Run cron now, please")
+ msg = "Run cron now, please",
+ callback = cb)
def run_yaml(self):
"""Run YAML scripts for this leaf entity. Since we're not
@@ -839,40 +1019,48 @@ def setup_publication(pubd_sql):
pubd_irbe_cert = rpki.x509.X509(Auto_file = pubd_name + "-IRBE.cer")
pubd_pubd_cert = rpki.x509.X509(Auto_file = pubd_name + "-PUBD.cer")
-def call_pubd(*pdu):
+def call_pubd(pdu, cb):
"""Send a publication message to publication daemon and return the
response.
"""
rpki.log.info("Calling pubd")
- msg = rpki.publication.msg(pdu)
+ msg = rpki.publication.msg([pdu])
msg.type = "query"
cms, xml = rpki.publication.cms_msg.wrap(msg, pubd_irbe_key, pubd_irbe_cert,
pretty_print = True)
rpki.log.debug(xml)
url = "https://localhost:%d/control" % pubd_port
- der = rpki.https.client(
+
+ global call_pubd_caller_cb
+ call_pubd_caller_cb = cb # Global variable, icky
+
+ rpki.https.client(
client_key = pubd_irbe_key,
client_cert = pubd_irbe_cert,
server_ta = pubd_ta,
url = url,
- msg = cms)
- msg, xml = rpki.publication.cms_msg.unwrap(der, (pubd_ta, pubd_pubd_cert),
+ msg = cms,
+ callback = call_pubd_cb)
+
+def call_pubd_cb(val):
+ if isinstance(val, Exception):
+ raise val
+ msg, xml = rpki.publication.cms_msg.unwrap(val, (pubd_ta, pubd_pubd_cert),
pretty_print = True)
rpki.log.debug(xml)
assert msg.type == "reply"
for pdu in msg:
assert not isinstance(pdu, rpki.publication.report_error_elt)
- return msg[0] if len(msg) == 1 else msg
+ call_pubd_caller_cb(msg[0] if len(msg) == 1 else msg)
-def set_pubd_crl():
+def set_pubd_crl(cb):
"""Whack publication daemon's bpki_crl. This must be configured
before publication daemon starts talking to its clients, and must be
updated whenever we update the CRL.
"""
rpki.log.info("Setting pubd's BPKI CRL")
- call_pubd(rpki.publication.config_elt.make_pdu(
- action = "set",
- bpki_crl = rpki.x509.CRL(Auto_file = pubd_name + "-TA.crl")))
+ call_pubd(rpki.publication.config_elt.make_pdu(action = "set", bpki_crl = rpki.x509.CRL(Auto_file = pubd_name + "-TA.crl")),
+ cb = cb)
def run_rcynic():
"""Run rcynic to see whether what was published makes sense."""