diff options
-rw-r--r-- | rpkid/rpki/async.py | 6 | ||||
-rw-r--r-- | rpkid/rpki/rpki_engine.py | 40 | ||||
-rw-r--r-- | rpkid/testbed.py | 158 |
3 files changed, 88 insertions, 116 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py index 4dc56d41..251dc253 100644 --- a/rpkid/rpki/async.py +++ b/rpkid/rpki/async.py @@ -25,10 +25,14 @@ class iterator(object): self.handler_cb = handler_cb self.done_cb = done_cb self.iterator = iter(iterable) + self.next() def __call__(self, *ignored): + self.next() + + def next(self): try: - self.handler_cb(self.iterator.next()) + self.handler_cb(self, self.iterator.next()) except StopIteration: if self.done_cb is not None: self.done_cb() diff --git a/rpkid/rpki/rpki_engine.py b/rpkid/rpki/rpki_engine.py index 1a9a5026..5f89c338 100644 --- a/rpkid/rpki/rpki_engine.py +++ b/rpkid/rpki/rpki_engine.py @@ -115,33 +115,31 @@ class rpkid_context(object): rpki.log.trace() self.sql.ping() - def cronjob_done(): - self.sql.sweep() - cb(200, "OK") + def cronjob_do_one(iterator, s): - self.cronjob_iterator = rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self), - self.cronjob_do_one, cronjob_done) - self.cronjob_iterator() + def client_poll(): + rpki.log.debug("Self %s polling parents" % s.self_id) + s.client_poll(update_children) - def cronjob_do_one(self, s): - """Handle periodic tasks for one <self_elt/>.""" + def update_children(): + rpki.log.debug("Self %s updating children" % s.self_id) + s.update_children(update_roas_crls_and_manifests) - def client_poll(): - rpki.log.debug("Self %s polling parents" % s.self_id) - s.client_poll(update_children) + def update_roas_crls_and_manifests(): + rpki.log.debug("Self %s updating ROAs" % s.self_id) + s.update_roas() + rpki.log.debug("Self %s regenerating CRLs and manifests" % s.self_id) + s.regenerate_crls_and_manifests() + iterator() - def update_children(): - rpki.log.debug("Self %s updating children" % s.self_id) - s.update_children(update_roas_crls_and_manifests) + client_poll() - def update_roas_crls_and_manifests(): - rpki.log.debug("Self %s updating ROAs" % s.self_id) - s.update_roas() - rpki.log.debug("Self %s regenerating CRLs and manifests" % s.self_id) - s.regenerate_crls_and_manifests() - self.cronjob_iterator() + def cronjob_done(): + self.sql.sweep() + cb(200, "OK") - client_poll() + rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self), + cronjob_do_one, cronjob_done) ## @var https_ta_cache # HTTPS trust anchor cache, to avoid regenerating it for every TLS connection. diff --git a/rpkid/testbed.py b/rpkid/testbed.py index 2b3957c7..395c8155 100644 --- a/rpkid/testbed.py +++ b/rpkid/testbed.py @@ -224,9 +224,10 @@ class main(object): rpki.log.info("Sleeping %d seconds while daemons start up" % startup_delay) time.sleep(startup_delay) - assert not hasattr(self, "iterator") - self.iterator = rpki.async.iterator(self.db.engines, self.create_rpki_objects, self.created_rpki_objects) - self.iterator() + def create_rpki_objects(iterator, a): + a.create_rpki_objects(iterator) + + rpki.async.iterator(self.db.engines, create_rpki_objects, self.created_rpki_objects) # At this point we have gone into (pseudo) event-driven code. # See comments above about cleanup of this try/finally code @@ -249,12 +250,7 @@ class main(object): rpki.log.warn("Couldn't clean up daemons (%s), continuing" % data) - def create_rpki_objects(self, a): - """Create objects in RPKI engines""" - a.create_rpki_objects(self.iterator) - def created_rpki_objects(self): - del self.iterator # Setup keys and certs and write YAML files for leaves for a in self.db.leaves: @@ -269,15 +265,13 @@ class main(object): # objects, particular CRLs # Run cron in all RPKI instances - assert not hasattr(self, "iterator") - self.iterator = rpki.async.iterator(self.db.engines, self.run_cron, self.run_yaml) - self.iterator() - def run_cron(self, a): - a.run_cron(self.iterator) + def run_cron(iterator, a): + a.run_cron(iterator) + + rpki.async.iterator(self.db.engines, run_cron, self.run_yaml) def run_yaml(self): - del self.iterator # Run all YAML clients for a in self.db.leaves: @@ -390,29 +384,23 @@ class allocation_db(list): def apply_delta(self, delta, cb): """Apply a delta or run a command.""" + + def once(iterator, d): + if isinstance(d, str): + c = d.split() + cmds[c[0]](*c[1:]) + iterator() + else: + self.map[d["name"]].apply_delta(d, iterator) + + def done(): + self.root.closure() + cb() + if delta is None: cb() else: - self.cb = cb - assert not hasattr(self, "iterator") - self.iterator = rpki.async.iterator(delta, self.apply_one_delta, self.apply_delta_done) - self.iterator() - - def apply_one_delta(self, d): - if isinstance(d, str): - c = d.split() - cmds[c[0]](*c[1:]) - self.iterator() - else: - self.map[d["name"]].apply_delta(d, self.apply_one_delta_cb) - - def apply_one_delta_cb(self, *ignored): - self.iterator() - - def apply_delta_done(self): - del self.iterator - self.root.closure() - self.cb() + rpki.async.iterator(delta, once, done) def dump(self): """Print content of the database.""" @@ -466,24 +454,16 @@ class allocation(object): def apply_delta(self, yaml, cb): """Apply deltas to this entity.""" + rpki.log.info("Applying delta: %s" % yaml) - self.apply_delta_caller_cb = cb - assert not hasattr(self, "iterator") - self.iterator = rpki.async.iterator(yaml.items(), self.apply_one_delta, self.apply_delta_done) - self.iterator() - - def apply_one_delta(self, kv): - if kv[0] != "name": - getattr(self, "apply_" + kv[0])(kv[1], self.apply_one_delta_cb) - else: - self.apply_one_delta_cb() - def apply_one_delta_cb(self, *ignored): - self.iterator() + def once(iterator, kv): + if kv[0] == "name": + iterator() + else: + getattr(self, "apply_" + kv[0])(kv[1], iterator) - def apply_delta_done(self): - del self.iterator - self.apply_delta_caller_cb() + rpki.async.iterator(yaml.items(), once, cb) def apply_add_as(self, text, cb): self.base.asn = self.base.asn.union(rpki.resource_set.resource_set_as(text)) @@ -820,56 +800,46 @@ class allocation(object): self.parent_id = val.parent_id rpki.log.info("Creating rpkid child objects for %s" % self.name) - self.sql_db = MySQLdb.connect(user = "irdb", db = self.irdb_db_name, passwd = irdb_db_pass) - self.sql_cur = self.sql_db.cursor() - assert not hasattr(self, "iterator") - self.iterator = rpki.async.iterator(self.kids, do_one_kid, kids_done) - self.iterator() - - def do_one_kid(kid): - self.kid = kid - if kid.is_leaf(): - bpki_cert = self.cross_certify(kid.name + "-TA") - else: - bpki_cert = self.cross_certify(kid.name + "-SELF-1") - rpki.log.info("Creating rpkid child object for %s as child of %s" % (kid.name, self.name)) + sql_db = MySQLdb.connect(user = "irdb", db = self.irdb_db_name, passwd = irdb_db_pass) + sql_cur = sql_db.cursor() + + def once(iterator, kid): + + if kid.is_leaf(): + bpki_cert = self.cross_certify(kid.name + "-TA") + else: + bpki_cert = self.cross_certify(kid.name + "-SELF-1") + + rpki.log.info("Creating rpkid child object for %s as child of %s" % (kid.name, self.name)) - def do_one_kid_cb(val): - self.kid.child_id = val.child_id - self.sql_cur.execute("UPDATE registrant SET rpki_self_id = %s, rpki_child_id = %s WHERE IRBE_mapped_id = %s", (self.self_id, self.kid.child_id, self.kid.name)) - self.iterator() + def save(val): + kid.child_id = val.child_id + sql_cur.execute("UPDATE registrant SET rpki_self_id = %s, rpki_child_id = %s WHERE IRBE_mapped_id = %s", (self.self_id, kid.child_id, kid.name)) + iterator() - self.call_rpkid(rpki.left_right.child_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, bpki_cert = bpki_cert), - cb = do_one_kid_cb) + self.call_rpkid(rpki.left_right.child_elt.make_pdu(action = "create", self_id = self.self_id, bsc_id = self.bsc_id, bpki_cert = bpki_cert), + cb = save) - def kids_done(): - self.sql_db.close() - del self.iterator - del self.sql_cur - del self.sql_db - if hasattr(self, "kid"): - del self.kid + def done(): + sql_db.close() + do_route_origins() + rpki.async.iterator(self.kids, once, done) + + def do_route_origins(): rpki.log.info("Creating rpkid route_origin objects for %s" % self.name) - assert not hasattr(self, "iterator") - self.iterator = rpki.async.iterator(self.route_origins, do_one_ro, cleanup) - self.iterator() - - def do_one_ro(ro): - self.ro = ro - self.call_rpkid(rpki.left_right.route_origin_elt.make_pdu(action = "create", self_id = self.self_id, - as_number = ro.asn, ipv4 = ro.v4, ipv6 = ro.v6), - cb = do_one_ro_cb) - - def do_one_ro_cb(val): - self.ro.route_origin_id = val.route_origin_id - self.iterator() - - def cleanup(): - if hasattr(self, "ro"): - del self.ro - del self.iterator - cb() + + def do_one_ro(iterator, ro): + + def do_one_ro_cb(val): + ro.route_origin_id = val.route_origin_id + iterator() + + self.call_rpkid(rpki.left_right.route_origin_elt.make_pdu(action = "create", self_id = self.self_id, + as_number = ro.asn, ipv4 = ro.v4, ipv6 = ro.v6), + cb = do_one_ro_cb) + + rpki.async.iterator(self.route_origins, do_one_ro, cb) start() |