diff options
author | Rob Austein <sra@hactrn.net> | 2009-05-12 03:39:37 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2009-05-12 03:39:37 +0000 |
commit | 48244b53cef3acef2d5e726e903ea2ec71bd2136 (patch) | |
tree | f68dcf1fba76b02fe4e9b85e11ee0a1350d9af5d | |
parent | c450547374d179e4740c869f9645ed9d1e4aeb48 (diff) |
Cleanup some of the litter left behind during conversion to callbacks.
Add rpki.async.iterator.__repr__() so we can figure out where the
silly things were created when debugging. Fix sloppy child process
shutdown in testbed.main(); I don't know why this only started
complaining now, but it did, so I fixed it.
svn path=/rpkid/pubd.py; revision=2426
-rwxr-xr-x | rpkid/pubd.py | 13 | ||||
-rw-r--r-- | rpkid/rpki/async.py | 14 | ||||
-rw-r--r-- | rpkid/rpki/left_right.py | 14 | ||||
-rw-r--r-- | rpkid/rpki/rpki_engine.py | 26 | ||||
-rw-r--r-- | rpkid/testbed.py | 115 |
5 files changed, 111 insertions, 71 deletions
diff --git a/rpkid/pubd.py b/rpkid/pubd.py index 5fb04dcb..1ea6f00e 100755 --- a/rpkid/pubd.py +++ b/rpkid/pubd.py @@ -65,10 +65,14 @@ class pubd_context(object): """ Process one PDU from the IRBE. """ + + def done(x): + cb(200, x) + rpki.log.trace() try: self.sql.ping() - self.handler_common(query, None, lambda x: cb(200, x), (self.bpki_ta, self.irbe_cert)) + self.handler_common(query, None, done, (self.bpki_ta, self.irbe_cert)) except rpki.async.ExitNow: raise except Exception, data: @@ -79,6 +83,11 @@ class pubd_context(object): """ Process one PDU from a client. """ + + def done(x): + cb(200, x) + + rpki.log.trace() try: self.sql.ping() @@ -91,7 +100,7 @@ class pubd_context(object): config = rpki.publication.config_elt.fetch(self) if config is None or config.bpki_crl is None: raise rpki.exceptions.CMSCRLNotSet - self.handler_common(query, client, lambda x: cb(200, x), (self.bpki_ta, client.bpki_cert, client.bpki_glue), config.bpki_crl) + self.handler_common(query, client, done, (self.bpki_ta, client.bpki_cert, client.bpki_glue), config.bpki_crl) except rpki.async.ExitNow: raise except Exception, data: diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py index c581dbb0..1c295f76 100644 --- a/rpkid/rpki/async.py +++ b/rpkid/rpki/async.py @@ -45,6 +45,8 @@ class iterator(object): def __init__(self, iterable, item_callback, done_callback): self.item_callback = item_callback self.done_callback = done_callback + self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3] + #rpki.log.debug("Created iterator id %s file %s line %s function %s" % (id(self), self.caller_file, self.caller_line, self.caller_function)) try: self.iterator = iter(iterable) except ExitNow: @@ -54,13 +56,23 @@ class iterator(object): raise self() - def __call__(self, *ignored): + def __repr__(self): + return "<asynciterator created at %s:%d %s at 0x%x>" % (self.caller_file, self.caller_line, self.caller_function, id(self)) + + def __call__(self, *args): + if args != (): + rpki.log.warn("Arguments passed to %r: %r" % (self, args)) + for x in traceback.format_stack(): + rpki.log.warn(x.strip()) + assert args == () try: self.item_callback(self, self.iterator.next()) except StopIteration: if self.done_callback is not None: self.done_callback() + def ignore(self, ignored): + self() class timer(object): """ diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index c8afa71a..64852a06 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -253,7 +253,7 @@ class self_elt(data_elt): child_cert.reissue( ca_detail = ca_detail, resources = new_resources, - callback = iterator2, + callback = iterator2.ignore, errback = reissue_failed) return @@ -265,7 +265,7 @@ class self_elt(data_elt): repository = parent.repository() child_cert.sql_delete() - def withdraw(*ignored): + def withdraw(): repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator2, withdraw_failed) def manifest_failed(e): @@ -331,7 +331,7 @@ class self_elt(data_elt): def do_crl(): ca_detail.generate_crl(do_manifest, fail2) - def do_manifest(*ignored): + def do_manifest(): ca_detail.generate_manifest(iterator2, fail2) if ca_detail is not None and now > ca_detail.latest_crl.getNextUpdate(): @@ -861,7 +861,7 @@ class route_origin_elt(data_elt): def one(): repository.publish(self.cert, self.ee_uri(ca), two, errback) - def two(*ignored): + def two(): ca_detail.generate_manifest(callback, errback) repository.publish(self.roa, self.roa_uri(ca), @@ -888,7 +888,7 @@ class route_origin_elt(data_elt): if ca_detail.state != 'active': self.ca_detail_id = None - def one(*ignored): + def one(): rpki.log.debug("Withdrawing ROA and revoking its EE cert") rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail) repository.withdraw(roa, roa_uri, @@ -898,11 +898,11 @@ class route_origin_elt(data_elt): def two(): repository.withdraw(cert, ee_uri, three, errback) - def three(*ignored): + def three(): self.gctx.sql.sweep() ca_detail.generate_crl(four, errback) - def four(*ignored): + def four(): ca_detail.generate_manifest(callback, errback) if regenerate: diff --git a/rpkid/rpki/rpki_engine.py b/rpkid/rpki/rpki_engine.py index a876c4a8..6e0e96eb 100644 --- a/rpkid/rpki/rpki_engine.py +++ b/rpkid/rpki/rpki_engine.py @@ -490,10 +490,10 @@ class ca_detail_obj(rpki.sql.sql_persistent): self.ca_cert_uri = uri.rsync() self.generate_manifest_cert(ca) - def did_crl(*ignored): + def did_crl(): self.generate_manifest(callback = did_manifest, errback = errback) - def did_manifest(*ignored): + def did_manifest(): self.state = "active" self.sql_mark_dirty() if predecessor is None: @@ -504,7 +504,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): rpki.async.iterator(predecessor.child_certs(), do_one_child_cert, done_child_certs) def do_one_child_cert(iterator, child_cert): - child_cert.reissue(self, iterator, errback) + child_cert.reissue(self, iterator.ignore, errback) def done_child_certs(): rpki.async.iterator(predecessor.route_origins(), do_one_route_origin, callback) @@ -528,13 +528,13 @@ class ca_detail_obj(rpki.sql.sql_persistent): def withdraw_one_roa(iterator, route_origin): route_origin.withdraw_roa(iterator) - def withdraw_manifest(*ignored): + def withdraw_manifest(): repository.withdraw(self.latest_manifest, self.manifest_uri(ca), withdraw_crl, eb) - def withdraw_crl(*ignored): + def withdraw_crl(): repository.withdraw(self.latest_crl, self.crl_uri(ca), done, eb) - def done(*ignored): + def done(): for cert in self.child_certs() + self.revoked_certs(): cert.sql_delete() self.sql_delete() @@ -590,10 +590,10 @@ class ca_detail_obj(rpki.sql.sql_persistent): self.nextUpdate += crl_interval self.generate_crl(callback = final_manifest, errback = eb, nextUpdate = self.nextUpdate) - def final_manifest(*ignored): + def final_manifest(): self.generate_manifest(callback = done, errback = eb, nextUpdate = self.nextUpdate) - def done(*ignored): + def done(): self.private_key_id = None self.manifest_private_key_id = None self.manifest_public_key = None @@ -622,7 +622,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): child_cert.reissue( ca_detail = self, resources = child_resources.intersection(new_resources), - callback = iterator, + callback = iterator.ignore, errback = errback) else: iterator() @@ -718,10 +718,10 @@ class ca_detail_obj(rpki.sql.sql_persistent): child_cert.sql_store() - def published(*ignored): + def published(): self.generate_manifest(done, errback) - def done(*ignored): + def done(): callback(child_cert) ca.parent().repository().publish(child_cert.cert, child_cert.uri(ca), published, errback) @@ -845,7 +845,7 @@ class child_cert_obj(rpki.sql.sql_persistent): revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail) repository = ca.parent().repository() - def done(*ignored): + def done(): self.gctx.sql.sweep() self.sql_delete() callback() @@ -905,7 +905,7 @@ class child_cert_obj(rpki.sql.sql_persistent): rpki.async.iterator([x for x in child.child_certs(ca_detail = ca_detail, ski = self.ski) if x is not child_cert], do_one_cert, done) - child_cert = ca_detail.issue( + ca_detail.issue( ca = ca, child = child, subject_key = self.cert.getPublicKey(), diff --git a/rpkid/testbed.py b/rpkid/testbed.py index 786017ea..097260f3 100644 --- a/rpkid/testbed.py +++ b/rpkid/testbed.py @@ -142,6 +142,12 @@ pub_sql_file = cfg.get("pub_sql_file", "pubd.sql") startup_delay = int(cfg.get("startup_delay", "10")) +rsyncd_dir = None +pubd_ta = None +pubd_irbe_key = None +pubd_irbe_cert = None +pubd_pubd_cert = None + class main(object): """ Main program, implemented as a class to handle asynchronous I/O in @@ -243,18 +249,19 @@ class main(object): finally: - try: - rpki.log.info("Shutting down") - for a in self.db.engines: - a.kill_daemons() - for p, n in ((self.rootd_process, "rootd"), (self.pubd_process, "pubd"), (self.rsyncd_process, "rsyncd")): - if p is not None: - rpki.log.info("Killing %s" % n) - os.kill(p.pid, signal.SIGTERM) - except rpki.async.ExitNow: - raise - except Exception, data: - rpki.log.warn("Couldn't clean up daemons (%s), continuing" % data) + rpki.log.info("Shutting down") + for a in self.db.engines: + a.kill_daemons() + for proc, name in ((self.rootd_process, "rootd"), + (self.pubd_process, "pubd"), + (self.rsyncd_process, "rsyncd")): + if proc is not None: + rpki.log.info("Killing %s" % name) + try: + os.kill(proc.pid, signal.SIGTERM) + except OSError: + pass + proc.wait() def created_rpki_objects(self): @@ -264,9 +271,9 @@ class main(object): a.setup_yaml_leaf() # Set pubd's BPKI CRL - set_pubd_crl(self.yaml_loop) + set_pubd_crl(lambda crl: self.yaml_loop()) - def yaml_loop(self, *ignored): + def yaml_loop(self): # This is probably where we should be updating expired BPKI # objects, particular CRLs @@ -542,26 +549,38 @@ class allocation(object): cb() def apply_rekey(self, target, cb): + + def done(e): + if isinstance(e, Exception): + raise e + cb() + if self.is_leaf(): raise RuntimeError, "Can't rekey YAML leaf %s, sorry" % self.name elif target is None: rpki.log.info("Rekeying <self/> %s" % self.name) - self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, rekey = "yes"), cb = cb) + self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, rekey = "yes"), cb = done) else: rpki.log.info("Rekeying <parent/> %s %s" % (self.name, target)) - self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, rekey = "yes"), cb = cb) + self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, rekey = "yes"), cb = done) def apply_revoke(self, target, cb): + + def done(e): + if isinstance(e, Exception): + raise e + cb() + if self.is_leaf(): rpki.log.info("Attempting to revoke YAML leaf %s" % self.name) subprocess.check_call((prog_python, prog_poke, "-y", self.name + ".yaml", "-r", "revoke")) cb() elif target is None: rpki.log.info("Revoking <self/> %s" % self.name) - self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, revoke = "yes"), cb = cb) + self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, revoke = "yes"), cb = done) else: rpki.log.info("Revoking <parent/> %s %s" % (self.name, target)) - self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, revoke = "yes"), cb = cb) + self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, revoke = "yes"), cb = done) def __str__(self): s = self.name + "\n" @@ -695,7 +714,17 @@ class allocation(object): rpki.log.debug(xml) url = "https://localhost:%d/left-right" % self.rpki_port - self.call_rpkid_caller_cb = cb + def done(val): + rpki.log.info("Callback to rpkid %s" % self.name) + if isinstance(val, Exception): + raise val + msg, xml = rpki.left_right.cms_msg.unwrap(val, (self.rpkid_ta, self.rpkid_cert), + pretty_print = True) + rpki.log.debug(xml) + assert msg.type == "reply" + for pdu in msg: + assert not isinstance(pdu, rpki.left_right.report_error_elt) + cb(msg[0] if len(msg) == 1 else msg) rpki.https.client( client_key = self.irbe_key, @@ -703,23 +732,11 @@ class allocation(object): server_ta = self.rpkid_ta, url = url, msg = cms, - callback = self.call_rpkid_cb, - errback = self.call_rpkid_cb) + callback = done, + errback = done) rpki.log.info("Call to rpkid %s returned" % self.name) - def call_rpkid_cb(self, val): - rpki.log.info("Callback to rpkid %s" % self.name) - if isinstance(val, Exception): - raise val - msg, xml = rpki.left_right.cms_msg.unwrap(val, (self.rpkid_ta, self.rpkid_cert), - pretty_print = True) - rpki.log.debug(xml) - assert msg.type == "reply" - for pdu in msg: - assert not isinstance(pdu, rpki.left_right.report_error_elt) - self.call_rpkid_caller_cb(msg[0] if len(msg) == 1 else msg) - def cross_certify(self, certificant, reverse = False): """ Cross-certify and return the resulting certificate. @@ -925,13 +942,18 @@ class allocation(object): """ rpki.log.info("Running cron for %s" % self.name) + + def done(result): + assert result == "OK" + cb() + rpki.https.client(client_key = self.irbe_key, client_cert = self.irbe_cert, server_ta = self.rpkid_ta, url = "https://localhost:%d/cronjob" % self.rpki_port, msg = "Run cron now, please", - callback = cb, - errback = cb) + callback = done, + errback = done) def run_yaml(self): """ @@ -1063,8 +1085,16 @@ def call_pubd(pdu, cb): rpki.log.debug(xml) url = "https://localhost:%d/control" % pubd_port - global call_pubd_caller_cb - call_pubd_caller_cb = cb # Global variable, icky + def call_pubd_cb(val): + if isinstance(val, Exception): + raise val + msg, xml = rpki.publication.cms_msg.unwrap(val, (pubd_ta, pubd_pubd_cert), + pretty_print = True) + rpki.log.debug(xml) + assert msg.type == "reply" + for pdu in msg: + assert not isinstance(pdu, rpki.publication.report_error_elt) + cb(msg[0] if len(msg) == 1 else msg) rpki.https.client( client_key = pubd_irbe_key, @@ -1075,17 +1105,6 @@ def call_pubd(pdu, cb): callback = call_pubd_cb, errback = call_pubd_cb) -def call_pubd_cb(val): - if isinstance(val, Exception): - raise val - msg, xml = rpki.publication.cms_msg.unwrap(val, (pubd_ta, pubd_pubd_cert), - pretty_print = True) - rpki.log.debug(xml) - assert msg.type == "reply" - for pdu in msg: - assert not isinstance(pdu, rpki.publication.report_error_elt) - call_pubd_caller_cb(msg[0] if len(msg) == 1 else msg) - def set_pubd_crl(cb): """ Whack publication daemon's bpki_crl. This must be configured before |