aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2009-05-12 03:39:37 +0000
committerRob Austein <sra@hactrn.net>2009-05-12 03:39:37 +0000
commit48244b53cef3acef2d5e726e903ea2ec71bd2136 (patch)
treef68dcf1fba76b02fe4e9b85e11ee0a1350d9af5d
parentc450547374d179e4740c869f9645ed9d1e4aeb48 (diff)
Cleanup some of the litter left behind during conversion to callbacks.
Add rpki.async.iterator.__repr__() so we can figure out where the silly things were created when debugging. Fix sloppy child process shutdown in testbed.main(); I don't know why this only started complaining now, but it did, so I fixed it. svn path=/rpkid/pubd.py; revision=2426
-rwxr-xr-xrpkid/pubd.py13
-rw-r--r--rpkid/rpki/async.py14
-rw-r--r--rpkid/rpki/left_right.py14
-rw-r--r--rpkid/rpki/rpki_engine.py26
-rw-r--r--rpkid/testbed.py115
5 files changed, 111 insertions, 71 deletions
diff --git a/rpkid/pubd.py b/rpkid/pubd.py
index 5fb04dcb..1ea6f00e 100755
--- a/rpkid/pubd.py
+++ b/rpkid/pubd.py
@@ -65,10 +65,14 @@ class pubd_context(object):
"""
Process one PDU from the IRBE.
"""
+
+ def done(x):
+ cb(200, x)
+
rpki.log.trace()
try:
self.sql.ping()
- self.handler_common(query, None, lambda x: cb(200, x), (self.bpki_ta, self.irbe_cert))
+ self.handler_common(query, None, done, (self.bpki_ta, self.irbe_cert))
except rpki.async.ExitNow:
raise
except Exception, data:
@@ -79,6 +83,11 @@ class pubd_context(object):
"""
Process one PDU from a client.
"""
+
+ def done(x):
+ cb(200, x)
+
+
rpki.log.trace()
try:
self.sql.ping()
@@ -91,7 +100,7 @@ class pubd_context(object):
config = rpki.publication.config_elt.fetch(self)
if config is None or config.bpki_crl is None:
raise rpki.exceptions.CMSCRLNotSet
- self.handler_common(query, client, lambda x: cb(200, x), (self.bpki_ta, client.bpki_cert, client.bpki_glue), config.bpki_crl)
+ self.handler_common(query, client, done, (self.bpki_ta, client.bpki_cert, client.bpki_glue), config.bpki_crl)
except rpki.async.ExitNow:
raise
except Exception, data:
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py
index c581dbb0..1c295f76 100644
--- a/rpkid/rpki/async.py
+++ b/rpkid/rpki/async.py
@@ -45,6 +45,8 @@ class iterator(object):
def __init__(self, iterable, item_callback, done_callback):
self.item_callback = item_callback
self.done_callback = done_callback
+ self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]
+ #rpki.log.debug("Created iterator id %s file %s line %s function %s" % (id(self), self.caller_file, self.caller_line, self.caller_function))
try:
self.iterator = iter(iterable)
except ExitNow:
@@ -54,13 +56,23 @@ class iterator(object):
raise
self()
- def __call__(self, *ignored):
+ def __repr__(self):
+ return "<asynciterator created at %s:%d %s at 0x%x>" % (self.caller_file, self.caller_line, self.caller_function, id(self))
+
+ def __call__(self, *args):
+ if args != ():
+ rpki.log.warn("Arguments passed to %r: %r" % (self, args))
+ for x in traceback.format_stack():
+ rpki.log.warn(x.strip())
+ assert args == ()
try:
self.item_callback(self, self.iterator.next())
except StopIteration:
if self.done_callback is not None:
self.done_callback()
+ def ignore(self, ignored):
+ self()
class timer(object):
"""
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py
index c8afa71a..64852a06 100644
--- a/rpkid/rpki/left_right.py
+++ b/rpkid/rpki/left_right.py
@@ -253,7 +253,7 @@ class self_elt(data_elt):
child_cert.reissue(
ca_detail = ca_detail,
resources = new_resources,
- callback = iterator2,
+ callback = iterator2.ignore,
errback = reissue_failed)
return
@@ -265,7 +265,7 @@ class self_elt(data_elt):
repository = parent.repository()
child_cert.sql_delete()
- def withdraw(*ignored):
+ def withdraw():
repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator2, withdraw_failed)
def manifest_failed(e):
@@ -331,7 +331,7 @@ class self_elt(data_elt):
def do_crl():
ca_detail.generate_crl(do_manifest, fail2)
- def do_manifest(*ignored):
+ def do_manifest():
ca_detail.generate_manifest(iterator2, fail2)
if ca_detail is not None and now > ca_detail.latest_crl.getNextUpdate():
@@ -861,7 +861,7 @@ class route_origin_elt(data_elt):
def one():
repository.publish(self.cert, self.ee_uri(ca), two, errback)
- def two(*ignored):
+ def two():
ca_detail.generate_manifest(callback, errback)
repository.publish(self.roa, self.roa_uri(ca),
@@ -888,7 +888,7 @@ class route_origin_elt(data_elt):
if ca_detail.state != 'active':
self.ca_detail_id = None
- def one(*ignored):
+ def one():
rpki.log.debug("Withdrawing ROA and revoking its EE cert")
rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail)
repository.withdraw(roa, roa_uri,
@@ -898,11 +898,11 @@ class route_origin_elt(data_elt):
def two():
repository.withdraw(cert, ee_uri, three, errback)
- def three(*ignored):
+ def three():
self.gctx.sql.sweep()
ca_detail.generate_crl(four, errback)
- def four(*ignored):
+ def four():
ca_detail.generate_manifest(callback, errback)
if regenerate:
diff --git a/rpkid/rpki/rpki_engine.py b/rpkid/rpki/rpki_engine.py
index a876c4a8..6e0e96eb 100644
--- a/rpkid/rpki/rpki_engine.py
+++ b/rpkid/rpki/rpki_engine.py
@@ -490,10 +490,10 @@ class ca_detail_obj(rpki.sql.sql_persistent):
self.ca_cert_uri = uri.rsync()
self.generate_manifest_cert(ca)
- def did_crl(*ignored):
+ def did_crl():
self.generate_manifest(callback = did_manifest, errback = errback)
- def did_manifest(*ignored):
+ def did_manifest():
self.state = "active"
self.sql_mark_dirty()
if predecessor is None:
@@ -504,7 +504,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
rpki.async.iterator(predecessor.child_certs(), do_one_child_cert, done_child_certs)
def do_one_child_cert(iterator, child_cert):
- child_cert.reissue(self, iterator, errback)
+ child_cert.reissue(self, iterator.ignore, errback)
def done_child_certs():
rpki.async.iterator(predecessor.route_origins(), do_one_route_origin, callback)
@@ -528,13 +528,13 @@ class ca_detail_obj(rpki.sql.sql_persistent):
def withdraw_one_roa(iterator, route_origin):
route_origin.withdraw_roa(iterator)
- def withdraw_manifest(*ignored):
+ def withdraw_manifest():
repository.withdraw(self.latest_manifest, self.manifest_uri(ca), withdraw_crl, eb)
- def withdraw_crl(*ignored):
+ def withdraw_crl():
repository.withdraw(self.latest_crl, self.crl_uri(ca), done, eb)
- def done(*ignored):
+ def done():
for cert in self.child_certs() + self.revoked_certs():
cert.sql_delete()
self.sql_delete()
@@ -590,10 +590,10 @@ class ca_detail_obj(rpki.sql.sql_persistent):
self.nextUpdate += crl_interval
self.generate_crl(callback = final_manifest, errback = eb, nextUpdate = self.nextUpdate)
- def final_manifest(*ignored):
+ def final_manifest():
self.generate_manifest(callback = done, errback = eb, nextUpdate = self.nextUpdate)
- def done(*ignored):
+ def done():
self.private_key_id = None
self.manifest_private_key_id = None
self.manifest_public_key = None
@@ -622,7 +622,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
child_cert.reissue(
ca_detail = self,
resources = child_resources.intersection(new_resources),
- callback = iterator,
+ callback = iterator.ignore,
errback = errback)
else:
iterator()
@@ -718,10 +718,10 @@ class ca_detail_obj(rpki.sql.sql_persistent):
child_cert.sql_store()
- def published(*ignored):
+ def published():
self.generate_manifest(done, errback)
- def done(*ignored):
+ def done():
callback(child_cert)
ca.parent().repository().publish(child_cert.cert, child_cert.uri(ca), published, errback)
@@ -845,7 +845,7 @@ class child_cert_obj(rpki.sql.sql_persistent):
revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail)
repository = ca.parent().repository()
- def done(*ignored):
+ def done():
self.gctx.sql.sweep()
self.sql_delete()
callback()
@@ -905,7 +905,7 @@ class child_cert_obj(rpki.sql.sql_persistent):
rpki.async.iterator([x for x in child.child_certs(ca_detail = ca_detail, ski = self.ski) if x is not child_cert], do_one_cert, done)
- child_cert = ca_detail.issue(
+ ca_detail.issue(
ca = ca,
child = child,
subject_key = self.cert.getPublicKey(),
diff --git a/rpkid/testbed.py b/rpkid/testbed.py
index 786017ea..097260f3 100644
--- a/rpkid/testbed.py
+++ b/rpkid/testbed.py
@@ -142,6 +142,12 @@ pub_sql_file = cfg.get("pub_sql_file", "pubd.sql")
startup_delay = int(cfg.get("startup_delay", "10"))
+rsyncd_dir = None
+pubd_ta = None
+pubd_irbe_key = None
+pubd_irbe_cert = None
+pubd_pubd_cert = None
+
class main(object):
"""
Main program, implemented as a class to handle asynchronous I/O in
@@ -243,18 +249,19 @@ class main(object):
finally:
- try:
- rpki.log.info("Shutting down")
- for a in self.db.engines:
- a.kill_daemons()
- for p, n in ((self.rootd_process, "rootd"), (self.pubd_process, "pubd"), (self.rsyncd_process, "rsyncd")):
- if p is not None:
- rpki.log.info("Killing %s" % n)
- os.kill(p.pid, signal.SIGTERM)
- except rpki.async.ExitNow:
- raise
- except Exception, data:
- rpki.log.warn("Couldn't clean up daemons (%s), continuing" % data)
+ rpki.log.info("Shutting down")
+ for a in self.db.engines:
+ a.kill_daemons()
+ for proc, name in ((self.rootd_process, "rootd"),
+ (self.pubd_process, "pubd"),
+ (self.rsyncd_process, "rsyncd")):
+ if proc is not None:
+ rpki.log.info("Killing %s" % name)
+ try:
+ os.kill(proc.pid, signal.SIGTERM)
+ except OSError:
+ pass
+ proc.wait()
def created_rpki_objects(self):
@@ -264,9 +271,9 @@ class main(object):
a.setup_yaml_leaf()
# Set pubd's BPKI CRL
- set_pubd_crl(self.yaml_loop)
+ set_pubd_crl(lambda crl: self.yaml_loop())
- def yaml_loop(self, *ignored):
+ def yaml_loop(self):
# This is probably where we should be updating expired BPKI
# objects, particular CRLs
@@ -542,26 +549,38 @@ class allocation(object):
cb()
def apply_rekey(self, target, cb):
+
+ def done(e):
+ if isinstance(e, Exception):
+ raise e
+ cb()
+
if self.is_leaf():
raise RuntimeError, "Can't rekey YAML leaf %s, sorry" % self.name
elif target is None:
rpki.log.info("Rekeying <self/> %s" % self.name)
- self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, rekey = "yes"), cb = cb)
+ self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, rekey = "yes"), cb = done)
else:
rpki.log.info("Rekeying <parent/> %s %s" % (self.name, target))
- self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, rekey = "yes"), cb = cb)
+ self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, rekey = "yes"), cb = done)
def apply_revoke(self, target, cb):
+
+ def done(e):
+ if isinstance(e, Exception):
+ raise e
+ cb()
+
if self.is_leaf():
rpki.log.info("Attempting to revoke YAML leaf %s" % self.name)
subprocess.check_call((prog_python, prog_poke, "-y", self.name + ".yaml", "-r", "revoke"))
cb()
elif target is None:
rpki.log.info("Revoking <self/> %s" % self.name)
- self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, revoke = "yes"), cb = cb)
+ self.call_rpkid(rpki.left_right.self_elt.make_pdu(action = "set", self_id = self.self_id, revoke = "yes"), cb = done)
else:
rpki.log.info("Revoking <parent/> %s %s" % (self.name, target))
- self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, revoke = "yes"), cb = cb)
+ self.call_rpkid(rpki.left_right.parent_elt.make_pdu(action = "set", self_id = self.self_id, parent_id = target, revoke = "yes"), cb = done)
def __str__(self):
s = self.name + "\n"
@@ -695,7 +714,17 @@ class allocation(object):
rpki.log.debug(xml)
url = "https://localhost:%d/left-right" % self.rpki_port
- self.call_rpkid_caller_cb = cb
+ def done(val):
+ rpki.log.info("Callback to rpkid %s" % self.name)
+ if isinstance(val, Exception):
+ raise val
+ msg, xml = rpki.left_right.cms_msg.unwrap(val, (self.rpkid_ta, self.rpkid_cert),
+ pretty_print = True)
+ rpki.log.debug(xml)
+ assert msg.type == "reply"
+ for pdu in msg:
+ assert not isinstance(pdu, rpki.left_right.report_error_elt)
+ cb(msg[0] if len(msg) == 1 else msg)
rpki.https.client(
client_key = self.irbe_key,
@@ -703,23 +732,11 @@ class allocation(object):
server_ta = self.rpkid_ta,
url = url,
msg = cms,
- callback = self.call_rpkid_cb,
- errback = self.call_rpkid_cb)
+ callback = done,
+ errback = done)
rpki.log.info("Call to rpkid %s returned" % self.name)
- def call_rpkid_cb(self, val):
- rpki.log.info("Callback to rpkid %s" % self.name)
- if isinstance(val, Exception):
- raise val
- msg, xml = rpki.left_right.cms_msg.unwrap(val, (self.rpkid_ta, self.rpkid_cert),
- pretty_print = True)
- rpki.log.debug(xml)
- assert msg.type == "reply"
- for pdu in msg:
- assert not isinstance(pdu, rpki.left_right.report_error_elt)
- self.call_rpkid_caller_cb(msg[0] if len(msg) == 1 else msg)
-
def cross_certify(self, certificant, reverse = False):
"""
Cross-certify and return the resulting certificate.
@@ -925,13 +942,18 @@ class allocation(object):
"""
rpki.log.info("Running cron for %s" % self.name)
+
+ def done(result):
+ assert result == "OK"
+ cb()
+
rpki.https.client(client_key = self.irbe_key,
client_cert = self.irbe_cert,
server_ta = self.rpkid_ta,
url = "https://localhost:%d/cronjob" % self.rpki_port,
msg = "Run cron now, please",
- callback = cb,
- errback = cb)
+ callback = done,
+ errback = done)
def run_yaml(self):
"""
@@ -1063,8 +1085,16 @@ def call_pubd(pdu, cb):
rpki.log.debug(xml)
url = "https://localhost:%d/control" % pubd_port
- global call_pubd_caller_cb
- call_pubd_caller_cb = cb # Global variable, icky
+ def call_pubd_cb(val):
+ if isinstance(val, Exception):
+ raise val
+ msg, xml = rpki.publication.cms_msg.unwrap(val, (pubd_ta, pubd_pubd_cert),
+ pretty_print = True)
+ rpki.log.debug(xml)
+ assert msg.type == "reply"
+ for pdu in msg:
+ assert not isinstance(pdu, rpki.publication.report_error_elt)
+ cb(msg[0] if len(msg) == 1 else msg)
rpki.https.client(
client_key = pubd_irbe_key,
@@ -1075,17 +1105,6 @@ def call_pubd(pdu, cb):
callback = call_pubd_cb,
errback = call_pubd_cb)
-def call_pubd_cb(val):
- if isinstance(val, Exception):
- raise val
- msg, xml = rpki.publication.cms_msg.unwrap(val, (pubd_ta, pubd_pubd_cert),
- pretty_print = True)
- rpki.log.debug(xml)
- assert msg.type == "reply"
- for pdu in msg:
- assert not isinstance(pdu, rpki.publication.report_error_elt)
- call_pubd_caller_cb(msg[0] if len(msg) == 1 else msg)
-
def set_pubd_crl(cb):
"""
Whack publication daemon's bpki_crl. This must be configured before