aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rpkid/rpki/async.py6
-rw-r--r--rpkid/rpki/rpki_engine.py40
-rw-r--r--rpkid/testbed.py158
3 files changed, 88 insertions, 116 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py
index 4dc56d41..251dc253 100644
--- a/rpkid/rpki/async.py
+++ b/rpkid/rpki/async.py
@@ -25,10 +25,14 @@ class iterator(object):
self.handler_cb = handler_cb
self.done_cb = done_cb
self.iterator = iter(iterable)
+ self.next()
def __call__(self, *ignored):
+ self.next()
+
+ def next(self):
try:
- self.handler_cb(self.iterator.next())
+ self.handler_cb(self, self.iterator.next())
except StopIteration:
if self.done_cb is not None:
self.done_cb()
diff --git a/rpkid/rpki/rpki_engine.py b/rpkid/rpki/rpki_engine.py
index 1a9a5026..5f89c338 100644
--- a/rpkid/rpki/rpki_engine.py
+++ b/rpkid/rpki/rpki_engine.py
@@ -115,33 +115,31 @@ class rpkid_context(object):
rpki.log.trace()
self.sql.ping()
- def cronjob_done():
- self.sql.sweep()
- cb(200, "OK")
+ def cronjob_do_one(iterator, s):
- self.cronjob_iterator = rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self),
- self.cronjob_do_one, cronjob_done)
- self.cronjob_iterator()
+ def client_poll():
+ rpki.log.debug("Self %s polling parents" % s.self_id)
+ s.client_poll(update_children)
- def cronjob_do_one(self, s):
- """Handle periodic tasks for one <self_elt/>."""
+ def update_children():
+ rpki.log.debug("Self %s updating children" % s.self_id)
+ s.update_children(update_roas_crls_and_manifests)
- def client_poll():
- rpki.log.debug("Self %s polling parents" % s.self_id)
- s.client_poll(update_children)
+ def update_roas_crls_and_manifests():
+ rpki.log.debug("Self %s updating ROAs" % s.self_id)
+ s.update_roas()
+ rpki.log.debug("Self %s regenerating CRLs and manifests" % s.self_id)
+ s.regenerate_crls_and_manifests()
+ iterator()
- def update_children():
- rpki.log.debug("Self %s updating children" % s.self_id)
- s.update_children(update_roas_crls_and_manifests)
+ client_poll()
- def update_roas_crls_and_manifests():
- rpki.log.debug("Self %s updating ROAs" % s.self_id)
- s.update_roas()
- rpki.log.debug("Self %s regenerating CRLs and manifests" % s.self_id)
- s.regenerate_crls_and_manifests()
- self.cronjob_iterator()
+ def cronjob_done():
+ self.sql.sweep()
+ cb(200, "OK")
- client_poll()
+ rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self),
+ cronjob_do_one, cronjob_done)
## @var https_ta_cache
# HTTPS trust anchor cache, to avoid regenerating it for every TLS connection.
diff --git a/rpkid/testbed.py b/rpkid/testbed.py
index 2b3957c7..395c8155 100644
--- a/rpkid/testbed.py
+++ b/rpkid/testbed.py
@@ -224,9 +224,10 @@ class main(object):
rpki.log.info("Sleeping %d seconds while daemons start up" % startup_delay)
time.sleep(startup_delay)
- assert not hasattr(self, "iterator")
- self.iterator = rpki.async.iterator(self.db.engines, self.create_rpki_objects, self.created_rpki_objects)
- self.iterator()
+ def create_rpki_objects(iterator, a):
+ a.create_rpki_objects(iterator)
+
+ rpki.async.iterator(self.db.engines, create_rpki_objects, self.created_rpki_objects)
# At this point we have gone into (pseudo) event-driven code.
# See comments above about cleanup of this try/finally code
@@ -249,12 +250,7 @@ class main(object):
rpki.log.warn("Couldn't clean up daemons (%s), continuing" % data)
- def create_rpki_objects(self, a):
- """Create objects in RPKI engines"""
- a.create_rpki_objects(self.iterator)
-
def created_rpki_objects(self):
- del self.iterator
# Setup keys and certs and write YAML files for leaves
for a in self.db.leaves:
@@ -269,15 +265,13 @@ class main(object):
# objects, particular CRLs
# Run cron in all RPKI instances
- assert not hasattr(self, "iterator")
- self.iterator = rpki.async.iterator(self.db.engines, self.run_cron, self.run_yaml)
- self.iterator()
- def run_cron(self, a):
- a.run_cron(self.iterator)
+ def run_cron(iterator, a):
+ a.run_cron(iterator)
+
+ rpki.async.iterator(self.db.engines, run_cron, self.run_yaml)
def run_yaml(self):
- del self.iterator
# Run all YAML clients
for a in self.db.leaves:
@@ -390,29 +384,23 @@ class allocation_db(list):
def apply_delta(self, delta, cb):
"""Apply a delta or run a command."""
+
+ def once(iterator, d):
+ if isinstance(d, str):
+ c = d.split()
+ cmds[c[0]](*c[1:])
+ iterator()
+ else:
+ self.map[d["name"]].apply_delta(d, iterator)
+
+ def done():
+ self.root.closure()
+ cb()
+
if delta is None:
cb()
else:
- self.cb = cb
- assert not hasattr(self, "iterator")
- self.iterator = rpki.async.iterator(delta, self.apply_one_delta, self.apply_delta_done)
- self.iterator()
-
- def apply_one_delta(self, d):
- if isinstance(d, str):
- c = d.split()
- cmds[c[0]](*c[1:])
- self.iterator()
- else:
- self.map[d["name"]].apply_delta(d, self.apply_one_delta_cb)
-
- def apply_one_delta_cb(self, *ignored):
- self.iterator()
-
- def apply_delta_done(self):
- del self.iterator
- self.root.closure()
- self.cb()
+ rpki.async.iterator(delta, once, done)
def dump(self):
"""Print content of the database."""
@@ -466,24 +454,16 @@ class allocation(object):
def apply_delta(self, yaml, cb):
"""Apply deltas to this entity."""
+
rpki.log.info("Applying delta: %s" % yaml)
- self.apply_delta_caller_cb = cb
- assert not hasattr(self, "iterator")
- self.iterator = rpki.async.iterator(yaml.items(), self.apply_one_delta, self.apply_delta_done)
- self.iterator()
-
- def apply_one_delta(self, kv):
- if kv[0] != "name":
- getattr(self, "apply_" + kv[0])(kv[1], self.apply_one_delta_cb)
- else:
- self.apply_one_delta_cb()
- def apply_one_delta_cb(self, *ignored):
- self.iterator()
+ def once(iterator, kv):
+ if kv[0] == "name":
+ iterator()
+ else:
+ getattr(self, "apply_" + kv[0])(kv[1], iterator)
- def apply_delta_done(self):
- del self.iterator
- self.apply_delta_caller_cb()
+ rpki.async.iterator(yaml.items(), once, cb)
def apply_add_as(self, text, cb):
self.base.asn = self.base.asn.union(rpki.resource_set.resource_set_as(text))
@@ -820,56 +800,46 @@ class allocation(object):
self.parent_id = val.parent_id
rpki.log.info("Creating rpkid child objects for %s" % self.name)
- self.sql_db = MySQLdb.connect(user = "irdb", db = self.irdb_db_name, passwd = irdb_db_pass)
- self.sql_cur = self.sql_db.cursor()
- assert not hasattr(self, "iterator")
- self.iterator = rpki.async.iterator(self.kids, do_one_kid, kids_done)
- self.iterator()
-
- def do_one_kid(kid):
- self.kid = kid
- if kid.is_leaf():
- bpki_cert = self.cross_certify(kid.name + "-TA")
- else:
- bpki_cert = self.cross_certify(kid.name + "-SELF-1")
- rpki.log.info("Creating rpkid child object for %s as child of %s" % (kid.name, self.name))
+ sql_db = MySQLdb.connect(user = "irdb", db = self.irdb_db_name, passwd = irdb_db_pass)
+ sql_cur = sql_db.cursor()
+
+ def once(iterator, kid):
+
+ if kid.is_leaf():
+ bpki_cert = self.cross_certify(kid.name + "-TA")
+ else:
+ bpki_cert = self.cross_certify(kid.name + "-SELF-1")
+
+ rpki.log.info("Creating rpkid child object for %s as child of %s" % (kid.name, self.name))
- def do_one_kid_cb(val):
- self.kid.child_id = val.child_id
- self.sql_cur.execute("UPDATE registrant SET rpki_self_id = %s, rpki_child_id = %s WHERE IRBE_mapped_id = %s", (self.self_id, self.kid.child_id, self.kid.name))
- self.iterator()
+ def save(val):
+ kid.child_id = val.child_id
+ sql_cur.execute("UPDATE registrant SET rpki_self_id = %s, rpki_child_id = %s WHERE IRBE_mapped_id = %s", (self.self_id, kid.child_id, kid.name))
+ iterator()
- self.call_rpkid(rpki.left_right.child_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, bpki_cert = bpki_cert),
- cb = do_one_kid_cb)
+ self.call_rpkid(rpki.left_right.child_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, bpki_cert = bpki_cert),
+ cb = save)
- def kids_done():
- self.sql_db.close()
- del self.iterator
- del self.sql_cur
- del self.sql_db
- if hasattr(self, "kid"):
- del self.kid
+ def done():
+ sql_db.close()
+ do_route_origins()
+ rpki.async.iterator(self.kids, once, done)
+
+ def do_route_origins():
rpki.log.info("Creating rpkid route_origin objects for %s" % self.name)
- assert not hasattr(self, "iterator")
- self.iterator = rpki.async.iterator(self.route_origins, do_one_ro, cleanup)
- self.iterator()
-
- def do_one_ro(ro):
- self.ro = ro
- self.call_rpkid(rpki.left_right.route_origin_elt.make_pdu(action = "create", self_id = self.self_id,
- as_number = ro.asn, ipv4 = ro.v4, ipv6 = ro.v6),
- cb = do_one_ro_cb)
-
- def do_one_ro_cb(val):
- self.ro.route_origin_id = val.route_origin_id
- self.iterator()
-
- def cleanup():
- if hasattr(self, "ro"):
- del self.ro
- del self.iterator
- cb()
+
+ def do_one_ro(iterator, ro):
+
+ def do_one_ro_cb(val):
+ ro.route_origin_id = val.route_origin_id
+ iterator()
+
+ self.call_rpkid(rpki.left_right.route_origin_elt.make_pdu(action = "create", self_id = self.self_id,
+ as_number = ro.asn, ipv4 = ro.v4, ipv6 = ro.v6),
+ cb = do_one_ro_cb)
+
+ rpki.async.iterator(self.route_origins, do_one_ro, cb)
start()