aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xrpkid/pubd.py21
-rwxr-xr-xrpkid/rootd.py26
-rw-r--r--rpkid/rpki/async.py8
-rw-r--r--rpkid/rpki/left_right.py443
-rw-r--r--rpkid/rpki/log.py22
-rw-r--r--rpkid/rpki/publication.py31
-rw-r--r--rpkid/rpki/rpki_engine.py381
-rw-r--r--rpkid/rpki/up_down.py184
-rw-r--r--rpkid/rpki/xml_utils.py55
-rw-r--r--rpkid/testbed.py12
-rw-r--r--rpkid/testpoke.py3
11 files changed, 686 insertions, 500 deletions
diff --git a/rpkid/pubd.py b/rpkid/pubd.py
index 7882095e..0392db87 100755
--- a/rpkid/pubd.py
+++ b/rpkid/pubd.py
@@ -46,23 +46,26 @@ class pubd_context(object):
self.publication_base = cfg.get("publication-base", "publication/")
- def handler_common(self, query, client, certs, crl = None):
+ def handler_common(self, query, client, cb, certs, crl = None):
"""Common PDU handler code."""
+
+ def done(r_msg):
+ reply = rpki.publication.cms_msg.wrap(r_msg, self.pubd_key, self.pubd_cert, crl)
+ self.sql.sweep()
+ cb(reply)
+
q_msg = rpki.publication.cms_msg.unwrap(query, certs)
- r_msg = q_msg.serve_top_level(self, client)
- reply = rpki.publication.cms_msg.wrap(r_msg, self.pubd_key, self.pubd_cert, crl)
- self.sql.sweep()
- return reply
+ q_msg.serve_top_level(self, client, done)
def control_handler(self, query, path, cb):
"""Process one PDU from the IRBE."""
rpki.log.trace()
try:
self.sql.ping()
- return 200, self.handler_common(query, None, (self.bpki_ta, self.irbe_cert))
+ self.handler_common(query, None, lambda x: cb(200, x), (self.bpki_ta, self.irbe_cert))
except Exception, data:
rpki.log.error(traceback.format_exc())
- return 500, "Unhandled exception %s" % data
+ cb(500, "Unhandled exception %s" % data)
def client_handler(self, query, path, cb):
"""Process one PDU from a client."""
@@ -78,10 +81,10 @@ class pubd_context(object):
config = rpki.publication.config_elt.fetch(self)
if config is None or config.bpki_crl is None:
raise rpki.exceptions.CMSCRLNotSet
- return 200, self.handler_common(query, client, (self.bpki_ta, client.bpki_cert, client.bpki_glue), config.bpki_crl)
+ self.handler_common(query, client, lambda x: cb(200, x), (self.bpki_ta, client.bpki_cert, client.bpki_glue), config.bpki_crl)
except Exception, data:
rpki.log.error(traceback.format_exc())
- return 500, "Could not process PDU: %s" % data
+ cb(500, "Could not process PDU: %s" % data)
## @var https_ta_cache
# HTTPS trust anchor cache, to avoid regenerating it for every TLS connection.
diff --git a/rpkid/rootd.py b/rpkid/rootd.py
index c04a827c..21dc996a 100755
--- a/rpkid/rootd.py
+++ b/rpkid/rootd.py
@@ -144,19 +144,21 @@ def compose_response(r_msg):
rc.certs[0].cert = subject_cert
class list_pdu(rpki.up_down.list_pdu):
- def serve_pdu(self, q_msg, r_msg, ignored):
+ def serve_pdu(self, q_msg, r_msg, ignored, callback):
r_msg.payload = rpki.up_down.list_response_pdu()
compose_response(r_msg)
+ callback()
class issue_pdu(rpki.up_down.issue_pdu):
- def serve_pdu(self, q_msg, r_msg, ignored):
+ def serve_pdu(self, q_msg, r_msg, ignored, callback):
self.pkcs10.check_valid_rpki()
set_subject_pkcs10(self.pkcs10)
r_msg.payload = rpki.up_down.issue_response_pdu()
compose_response(r_msg)
+ callback()
class revoke_pdu(rpki.up_down.revoke_pdu):
- def serve_pdu(self, q_msg, r_msg, ignored):
+ def serve_pdu(self, q_msg, r_msg, ignored, callback):
subject_cert = get_subject_cert()
if subject_cert is None or subject_cert.gSKI() != self.ski:
raise rpki.exceptions.NotInDatabase
@@ -164,6 +166,7 @@ class revoke_pdu(rpki.up_down.revoke_pdu):
r_msg.payload = rpki.up_down.revoke_response_pdu()
r_msg.payload.class_name = self.class_name
r_msg.payload.ski = self.ski
+ callback()
class message_pdu(rpki.up_down.message_pdu):
name2type = {
@@ -187,20 +190,21 @@ def up_down_handler(query, path, cb):
q_msg = cms_msg.unwrap(query, (bpki_ta, child_bpki_cert))
except Exception, data:
rpki.log.error(traceback.format_exc())
- return 400, "Could not process PDU: %s" % data
- try:
- r_msg = q_msg.serve_top_level(None)
+ return cb(400, "Could not process PDU: %s" % data)
+
+ def done(r_msg):
r_cms = cms_msg.wrap(r_msg, rootd_bpki_key, rootd_bpki_cert, rootd_bpki_crl)
- return 200, r_cms
+ cb(200, r_cms)
+
+ try:
+ q_msg.serve_top_level(None, done)
except Exception, data:
rpki.log.error(traceback.format_exc())
try:
- r_msg = q_msg.serve_error(data)
- r_cms = cms_msg.wrap(r_msg, rootd_bpki_key, rootd_bpki_cert, rootd_bpki_crl)
- return 200, r_cms
+ done(q_msg.serve_error(data))
except Exception, data:
rpki.log.error(traceback.format_exc())
- return 500, "Could not process PDU: %s" % data
+ cb(500, "Could not process PDU: %s" % data)
os.environ["TZ"] = "UTC"
time.tzset()
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py
index 1c0b5a02..74a23385 100644
--- a/rpkid/rpki/async.py
+++ b/rpkid/rpki/async.py
@@ -18,6 +18,8 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
"""
+import rpki.log
+
class iterator(object):
"""Iteration construct for event-driven code. Takes three
arguments:
@@ -39,7 +41,11 @@ class iterator(object):
def __init__(self, iterable, item_callback, done_callback):
self.item_callback = item_callback
self.done_callback = done_callback
- self.iterator = iter(iterable)
+ try:
+ self.iterator = iter(iterable)
+ except:
+ rpki.log.debug("Problem constructing iterator for %s" % repr(iterable))
+ raise
self()
def __call__(self, *ignored):
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py
index 4769cf0e..363feddb 100644
--- a/rpkid/rpki/left_right.py
+++ b/rpkid/rpki/left_right.py
@@ -20,7 +20,7 @@ PERFORMANCE OF THIS SOFTWARE.
import base64, lxml.etree, time, traceback, os
import rpki.resource_set, rpki.x509, rpki.sql, rpki.exceptions, rpki.xml_utils
import rpki.https, rpki.up_down, rpki.relaxng, rpki.sundial, rpki.log, rpki.roa
-import rpki.publication
+import rpki.publication, rpki.async
# Enforce strict checking of XML "sender" field in up-down protocol
enforce_strict_up_down_xml_sender = False
@@ -105,26 +105,34 @@ class self_elt(data_elt):
"""Fetch all route_origin objects that link to this self object."""
return route_origin_elt.sql_fetch_where(self.gctx, "self_id = %s", (self.self_id,))
- def serve_post_save_hook(self, q_pdu, r_pdu):
+ def serve_post_save_hook(self, q_pdu, r_pdu, cb):
"""Extra server actions for self_elt."""
rpki.log.trace()
if q_pdu.rekey:
- self.serve_rekey()
- if q_pdu.revoke:
- self.serve_revoke()
- self.unimplemented_control("reissue", "run_now", "publish_world_now")
+ self.serve_rekey(cb)
+ elif q_pdu.revoke:
+ self.serve_revoke(cb)
+ else:
+ self.unimplemented_control("reissue", "run_now", "publish_world_now")
+ cb()
- def serve_rekey(self):
+ def serve_rekey(self, cb):
"""Handle a left-right rekey action for this self."""
rpki.log.trace()
- for parent in self.parents():
- parent.serve_rekey()
- def serve_revoke(self):
+ def loop(iterator, parent):
+ parent.serve_rekey(iterator)
+
+ rpki.async.iterator(self.parents(), loop, cb)
+
+ def serve_revoke(self, cb):
"""Handle a left-right revoke action for this self."""
rpki.log.trace()
- for parent in self.parents():
- parent.serve_revoke()
+
+ def loop(iterator, parent):
+ parent.serve_revoke(iterator)
+
+ rpki.async.iterator(self.parents(), loop, cb)
def serve_fetch_one(self):
"""Find the self object upon which a get, set, or destroy action
@@ -142,29 +150,35 @@ class self_elt(data_elt):
"""
return self.sql_fetch_all(self.gctx)
- def client_poll(self, cb):
+ def client_poll(self, callback):
"""Run the regular client poll cycle with each of this self's parents in turn."""
rpki.log.trace()
- for parent in self.parents():
-
- # This will need a callback when we go event-driven
- r_msg = rpki.up_down.list_pdu.query(parent)
-
- ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas())
- for rc in r_msg.payload.classes:
- if rc.class_name in ca_map:
- ca = ca_map[rc.class_name]
- del ca_map[rc.class_name]
- ca.check_for_updates(parent, rc)
- else:
- rpki.rpki_engine.ca_obj.create(parent, rc)
- for ca in ca_map.values():
- ca.delete(parent) # CA not listed by parent
- self.gctx.sql.sweep()
+ def each(iterator, parent):
- cb()
+ def handle(r_msg):
+ ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas())
+
+ def loop(iterator, rc):
+ if rc.class_name in ca_map:
+ ca = ca_map[rc.class_name]
+ del ca_map[rc.class_name]
+ ca.check_for_updates(parent, rc, iterator)
+ else:
+ rpki.rpki_engine.ca_obj.create(parent, rc, iterator)
+
+ def done():
+ for ca in ca_map.values():
+ ca.delete(parent) # CA not listed by parent
+ self.gctx.sql.sweep()
+ iterator()
+
+ rpki.async.iterator(r_msg.payload.classes, loop, done)
+
+ rpki.up_down.list_pdu.query(parent, handle)
+
+ rpki.async.iterator(self.parents(), each, callback)
def update_children(self, cb):
"""Check for updated IRDB data for all of this self's children and
@@ -178,38 +192,48 @@ class self_elt(data_elt):
rsn = now + rpki.sundial.timedelta(seconds = self.regen_margin)
- for child in self.children():
+ def loop1(iterator1, child):
+
+ def got_resources(irdb_resources):
+
+ def loop2(iterator2, child_cert):
+
+ ca_detail = child_cert.ca_detail()
+
+ if ca_detail.state == "active":
+ old_resources = child_cert.cert.get_3779resources()
+ new_resources = irdb_resources.intersection(old_resources)
+
+ if old_resources != new_resources or (old_resources.valid_until < rsn and irdb_resources.valid_until > now):
+ rpki.log.debug("Need to reissue child certificate SKI %s" % child_cert.cert.gSKI())
+ return child_cert.reissue(
+ ca_detail = ca_detail,
+ resources = new_resources,
+ callback = iterator2)
+
+ if old_resources.valid_until < now:
+ rpki.log.debug("Child certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s"
+ % (child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until))
+ ca = ca_detail.ca()
+ parent = ca.parent()
+ repository = parent.repository()
+ child_cert.sql_delete()
+ return ca_detail.generate_manifest(lambda *ignored: repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator2))
+
+ iterator2()
+
+ rpki.async.iterator(child_certs, loop2, iterator1)
+
child_certs = child.child_certs()
- if not child_certs:
- continue
-
- # This will require a callback when we go event-driven
- irdb_resources = self.gctx.irdb_query(child.self_id, child.child_id)
-
- for child_cert in child_certs:
- ca_detail = child_cert.ca_detail()
- if ca_detail.state != "active":
- continue
- old_resources = child_cert.cert.get_3779resources()
- new_resources = irdb_resources.intersection(old_resources)
- if old_resources != new_resources or (old_resources.valid_until < rsn and irdb_resources.valid_until > now):
- rpki.log.debug("Need to reissue child certificate SKI %s" % child_cert.cert.gSKI())
- child_cert.reissue(
- ca_detail = ca_detail,
- resources = new_resources)
- elif old_resources.valid_until < now:
- rpki.log.debug("Child certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s"
- % (child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until))
- ca = ca_detail.ca()
- parent = ca.parent()
- repository = parent.repository()
- child_cert.sql_delete()
- ca_detail.generate_manifest()
- repository.withdraw(child_cert.cert, child_cert.uri(ca))
+ if child_certs:
+ self.gctx.irdb_query(child.self_id, child.child_id, got_resources)
+ else:
+ iterator1()
+
+ rpki.async.iterator(self.children(), loop1, cb)
- cb()
- def regenerate_crls_and_manifests(self):
+ def regenerate_crls_and_manifests(self, cb):
"""Generate new CRLs and manifests as necessary for all of this
self's CAs. Extracting nextUpdate from a manifest is hard at the
moment due to implementation silliness, so for now we generate a
@@ -223,22 +247,44 @@ class self_elt(data_elt):
rpki.log.trace()
now = rpki.sundial.now()
- for parent in self.parents():
+
+ def loop1(iterator1, parent):
repository = parent.repository()
- for ca in parent.cas():
- for ca_detail in ca.fetch_revoked():
- if now > ca_detail.latest_crl.getNextUpdate():
- ca_detail.delete(ca, repository)
- ca_detail = ca.fetch_active()
- if ca_detail is not None and now > ca_detail.latest_crl.getNextUpdate():
- ca_detail.generate_crl()
- ca_detail.generate_manifest()
-
- def update_roas(self):
+
+ def loop2(iterator2, ca):
+
+ def loop3(iterator3, ca_detail):
+ ca_detail.delete(ca, repository, iterator3)
+
+ def done3():
+
+ ca_detail = ca.fetch_active()
+
+ def do_crl():
+ ca_detail.generate_crl(do_manifest)
+
+ def do_manifest(*ignored):
+ ca_detail.generate_manifest(iterator2)
+
+ if ca_detail is not None and now > ca_detail.latest_crl.getNextUpdate():
+ do_crl()
+ else:
+ iterator2()
+
+ rpki.async.iterator([x for x in ca.fetch_revoked() if now > x.latest_crl.getNextUpdate()], loop3, done3)
+
+ rpki.async.iterator(parent.cas(), loop2, iterator1)
+
+ rpki.async.iterator(self.parents(), loop1, cb)
+
+
+ def update_roas(self, cb):
"""Generate or update ROAs for this self's route_origin objects."""
- for route_origin in self.route_origins():
- route_origin.update_roa()
+ def loop(iterator, route_origin):
+ route_origin.update_roa(iterator)
+
+ rpki.async.iterator(self.route_origins(), loop, cb)
class bsc_elt(data_elt):
@@ -272,7 +318,7 @@ class bsc_elt(data_elt):
"""Fetch all child objects that link to this BSC object."""
return child_elt.sql_fetch_where(self.gctx, "bsc_id = %s", (self.bsc_id,))
- def serve_pre_save_hook(self, q_pdu, r_pdu):
+ def serve_pre_save_hook(self, q_pdu, r_pdu, cb):
"""Extra server actions for bsc_elt -- handle key generation.
For now this only allows RSA with SHA-256.
"""
@@ -281,6 +327,7 @@ class bsc_elt(data_elt):
self.private_key_id = rpki.x509.RSA.generate(keylength = q_pdu.key_length or 2048)
self.pkcs10_request = rpki.x509.PKCS10.create(self.private_key_id)
r_pdu.pkcs10_request = self.pkcs10_request
+ cb()
class parent_elt(data_elt):
"""<parent/> element."""
@@ -309,37 +356,34 @@ class parent_elt(data_elt):
"""Fetch all CA objects that link to this parent object."""
return rpki.rpki_engine.ca_obj.sql_fetch_where(self.gctx, "parent_id = %s", (self.parent_id,))
- def serve_post_save_hook(self, q_pdu, r_pdu):
+ def serve_post_save_hook(self, q_pdu, r_pdu, cb):
"""Extra server actions for parent_elt."""
if q_pdu.rekey:
- self.serve_rekey()
- if q_pdu.revoke:
- self.serve_revoke()
- self.unimplemented_control("reissue")
+ self.serve_rekey(cb)
+ elif q_pdu.revoke:
+ self.serve_revoke(cb)
+ else:
+ self.unimplemented_control("reissue")
+ cb()
- def serve_rekey(self):
+ def serve_rekey(self, cb):
"""Handle a left-right rekey action for this parent."""
- for ca in self.cas():
- ca.rekey()
- def serve_revoke(self):
+ def loop(iterator, ca):
+ ca.rekey(iterator)
+
+ rpki.async.iterator(self.cas(), loop, cb)
+
+ def serve_revoke(self, cb):
"""Handle a left-right revoke action for this parent."""
- for ca in self.cas():
- ca.revoke()
- def query_up_down(self, q_pdu):
- """Client code for sending one up-down query PDU to this parent.
+ def loop(iterator, ca):
+ ca.revoke(iterator)
- I haven't figured out yet whether this method should do something
- clever like dispatching via a method in the response PDU payload,
- or just hand back the whole response to the caller. In the long
- run this will have to become event driven with a context object
- that has methods of its own, but as this method is common code for
- several different queries and I don't yet know what the response
- processing looks like, it's too soon to tell what will make sense.
+ rpki.async.iterator(self.cas(), loop, cb)
- For now, keep this dead simple lock step, rewrite it later.
- """
+ def query_up_down(self, q_pdu, cb):
+ """Client code for sending one up-down query PDU to this parent."""
rpki.log.trace()
@@ -356,21 +400,21 @@ class parent_elt(data_elt):
bsc.signing_cert,
bsc.signing_cert_crl)
- der = rpki.https.client(server_ta = (self.gctx.bpki_ta,
- self.self().bpki_cert, self.self().bpki_glue,
- self.bpki_https_cert, self.bpki_https_glue),
- client_key = bsc.private_key_id,
- client_cert = bsc.signing_cert,
- msg = q_cms,
- url = self.peer_contact_uri)
-
- r_msg = rpki.up_down.cms_msg.unwrap(der, (self.gctx.bpki_ta,
- self.self().bpki_cert, self.self().bpki_glue,
- self.bpki_cms_cert, self.bpki_cms_glue))
-
- r_msg.payload.check_response()
- return r_msg
-
+ def unwrap(der):
+ r_msg = rpki.up_down.cms_msg.unwrap(der, (self.gctx.bpki_ta,
+ self.self().bpki_cert, self.self().bpki_glue,
+ self.bpki_cms_cert, self.bpki_cms_glue))
+ r_msg.payload.check_response()
+ cb(r_msg)
+
+ rpki.https.client(server_ta = (self.gctx.bpki_ta,
+ self.self().bpki_cert, self.self().bpki_glue,
+ self.bpki_https_cert, self.bpki_https_glue),
+ client_key = bsc.private_key_id,
+ client_cert = bsc.signing_cert,
+ msg = q_cms,
+ url = self.peer_contact_uri,
+ callback = unwrap)
class child_elt(data_elt):
"""<child/> element."""
@@ -408,12 +452,13 @@ class child_elt(data_elt):
raise rpki.exceptions.ClassNameMismatch, "Class name mismatch: child.self_id = %d, parent.self_id = %d" % (self.self_id, parent.self_id)
return ca
- def serve_post_save_hook(self, q_pdu, r_pdu):
+ def serve_post_save_hook(self, q_pdu, r_pdu, cb):
"""Extra server actions for child_elt."""
self.unimplemented_control("reissue")
if self.clear_https_ta_cache:
self.gctx.clear_https_ta_cache()
self.clear_https_ta_cache = False
+ cb()
def endElement(self, stack, name, text):
"""Handle subelements of <child/> element. These require special
@@ -424,7 +469,7 @@ class child_elt(data_elt):
if name in self.elements:
self.clear_https_ta_cache = True
- def serve_up_down(self, query):
+ def serve_up_down(self, query, callback):
"""Outer layer of server handling for one up-down PDU from this child."""
rpki.log.trace()
@@ -438,19 +483,24 @@ class child_elt(data_elt):
q_msg.payload.gctx = self.gctx
if enforce_strict_up_down_xml_sender and q_msg.sender != str(self.child_id):
raise rpki.exceptions.BadSender, "Unexpected XML sender %s" % q_msg.sender
+
+ def done(r_msg):
+ #
+ # Exceptions from this point on are problematic, as we have no
+ # sane way of reporting errors in the error reporting mechanism.
+ # May require refactoring, ignore the issue for now.
+ #
+ r_cms = rpki.up_down.cms_msg.wrap(r_msg, bsc.private_key_id,
+ bsc.signing_cert, bsc.signing_cert_crl)
+ callback(r_cms)
+
try:
- r_msg = q_msg.serve_top_level(self)
+ q_msg.serve_top_level(self, done)
+ except rpki.exceptions.NoActiveCA, data:
+ done(q_msg.serve_error(data))
except Exception, data:
rpki.log.error(traceback.format_exc())
- r_msg = q_msg.serve_error(data)
- #
- # Exceptions from this point on are problematic, as we have no
- # sane way of reporting errors in the error reporting mechanism.
- # May require refactoring, ignore the issue for now.
- #
- r_cms = rpki.up_down.cms_msg.wrap(r_msg, bsc.private_key_id,
- bsc.signing_cert, bsc.signing_cert_crl)
- return r_cms
+ done(q_msg.serve_error(data))
class repository_elt(data_elt):
"""<repository/> element."""
@@ -468,41 +518,11 @@ class repository_elt(data_elt):
bpki_https_cert = None
bpki_https_glue = None
- use_pubd = True
-
def parents(self):
"""Fetch all parent objects that link to this repository object."""
return parent_elt.sql_fetch_where(self.gctx, "repository_id = %s", (self.repository_id,))
- @staticmethod
- def uri_to_filename(base, uri):
- """Convert a URI to a filename. [TEMPORARY]"""
- if not uri.startswith("rsync://"):
- raise rpki.exceptions.BadURISyntax
- filename = base + uri[len("rsync://"):]
- if filename.find("//") >= 0 or filename.find("/../") >= 0 or filename.endswith("/.."):
- raise rpki.exceptions.BadURISyntax
- return filename
-
- @classmethod
- def object_write(cls, base, uri, obj):
- """Write an object to disk. [TEMPORARY]"""
- rpki.log.trace()
- filename = cls.uri_to_filename(base, uri)
- dirname = os.path.dirname(filename)
- if not os.path.isdir(dirname):
- os.makedirs(dirname)
- f = open(filename, "wb")
- f.write(obj.get_DER())
- f.close()
-
- @classmethod
- def object_delete(cls, base, uri):
- """Delete an object from disk. [TEMPORARY]"""
- rpki.log.trace()
- os.remove(cls.uri_to_filename(base, uri))
-
- def call_pubd(self, *pdus):
+ def call_pubd(self, callback, *pdus):
"""Send a message to publication daemon and return the response."""
rpki.log.trace()
bsc = self.bsc()
@@ -510,33 +530,31 @@ class repository_elt(data_elt):
q_msg.type = "query"
q_cms = rpki.publication.cms_msg.wrap(q_msg, bsc.private_key_id, bsc.signing_cert, bsc.signing_cert_crl)
bpki_ta_path = (self.gctx.bpki_ta, self.self().bpki_cert, self.self().bpki_glue, self.bpki_https_cert, self.bpki_https_glue)
- r_cms = rpki.https.client(
+
+ def done(r_cms):
+ r_msg = rpki.publication.cms_msg.unwrap(r_cms, bpki_ta_path)
+ assert len(r_msg) == 1
+ callback(r_msg[0])
+
+ rpki.https.client(
client_key = bsc.private_key_id,
client_cert = bsc.signing_cert,
server_ta = bpki_ta_path,
url = self.peer_contact_uri,
- msg = q_cms)
- r_msg = rpki.publication.cms_msg.unwrap(r_cms, bpki_ta_path)
- assert len(r_msg) == 1
- return r_msg[0]
+ msg = q_cms,
+ callback = done)
- def publish(self, obj, uri):
- """Placeholder for publication operation. [TEMPORARY]"""
+ def publish(self, obj, uri, callback):
+ """Publish one object in the repository."""
rpki.log.trace()
rpki.log.info("Publishing %s as %s" % (repr(obj), repr(uri)))
- if self.use_pubd:
- self.call_pubd(rpki.publication.obj2elt[type(obj)].make_pdu(action = "publish", uri = uri, payload = obj))
- else:
- self.object_write(self.gctx.publication_kludge_base, uri, obj)
+ self.call_pubd(callback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "publish", uri = uri, payload = obj))
- def withdraw(self, obj, uri):
- """Placeholder for publication withdrawal operation. [TEMPORARY]"""
+ def withdraw(self, obj, uri, callback):
+ """Withdraw one object from the repository."""
rpki.log.trace()
rpki.log.info("Withdrawing %s from at %s" % (repr(obj), repr(uri)))
- if self.use_pubd:
- self.call_pubd(rpki.publication.obj2elt[type(obj)].make_pdu(action = "withdraw", uri = uri))
- else:
- self.object_delete(self.gctx.publication_kludge_base, uri)
+ self.call_pubd(callback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "withdraw", uri = uri))
class route_origin_elt(data_elt):
"""<route_origin/> element."""
@@ -590,9 +608,10 @@ class route_origin_elt(data_elt):
"""Fetch all ca_detail objects that link to this route_origin object."""
return rpki.rpki_engine.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id)
- def serve_post_save_hook(self, q_pdu, r_pdu):
+ def serve_post_save_hook(self, q_pdu, r_pdu, cb):
"""Extra server actions for route_origin_elt."""
self.unimplemented_control("suppress_publication")
+ cb()
def startElement(self, stack, name, attrs):
"""Handle <route_origin/> element. This requires special
@@ -607,35 +626,37 @@ class route_origin_elt(data_elt):
if self.ipv6 is not None:
self.ipv6 = rpki.resource_set.roa_prefix_set_ipv6(self.ipv6)
- def update_roa(self):
+ def update_roa(self, callback):
"""Bring this route_origin's ROA up to date if necesssary."""
if self.roa is None:
- return self.generate_roa()
+ return self.generate_roa(callback)
ca_detail = self.ca_detail()
if ca_detail is None or ca_detail.state != "active":
- return self.regenerate_roa()
+ return self.regenerate_roa(callback)
regen_margin = rpki.sundial.timedelta(seconds = self.self().regen_margin)
if rpki.sundial.now() + regen_margin > self.cert.getNotAfter():
- return self.regenerate_roa()
+ return self.regenerate_roa(callback)
ca_resources = ca_detail.latest_ca_cert.get_3779resources()
ee_resources = self.cert.get_3779resources()
if ee_resources.oversized(ca_resources):
- return self.regenerate_roa()
+ return self.regenerate_roa(callback)
v4 = self.ipv4.to_resource_set() if self.ipv4 is not None else rpki.resource_set.resource_set_ipv4()
v6 = self.ipv6.to_resource_set() if self.ipv6 is not None else rpki.resource_set.resource_set_ipv6()
if ee_resources.v4 != v4 or ee_resources.v6 != v6:
- return self.regenerate_roa()
+ return self.regenerate_roa(callback)
+
+ callback()
- def generate_roa(self):
+ def generate_roa(self, callback):
"""Generate a ROA based on this <route_origin/> object.
At present this does not support ROAs with multiple signatures
@@ -698,12 +719,17 @@ class route_origin_elt(data_elt):
self.sql_store()
repository = ca.parent().repository()
- repository.publish(self.roa, self.roa_uri(ca))
- if self.publish_ee_separately:
- repository.publish(self.cert, self.ee_uri(ca))
- ca_detail.generate_manifest()
- def withdraw_roa(self, regenerate = False):
+ def one():
+ repository.publish(self.cert, self.ee_uri(ca), two)
+
+ def two(*ignored):
+ ca_detail.generate_manifest(callback)
+
+ repository.publish(self.roa, self.roa_uri(ca),
+ one if self.publish_ee_separately else two)
+
+ def withdraw_roa(self, callback, regenerate = False):
"""Withdraw ROA associated with this route_origin.
In order to preserve make-before-break properties without
@@ -721,24 +747,34 @@ class route_origin_elt(data_elt):
if ca_detail.state != 'active':
self.ca_detail_id = None
+
+ def one(*ignored):
+ rpki.log.debug("Withdrawing ROA and revoking its EE cert")
+ rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail)
+ repository.withdraw(roa, roa_uri,
+ two if self.publish_ee_separately else three)
+
+ def two():
+ repository.withdraw(cert, ee_uri, three)
+
+ def three(*ignored):
+ self.gctx.sql.sweep()
+ ca_detail.generate_crl(four)
+
+ def four(*ignored):
+ ca_detail.generate_manifest(callback)
+
if regenerate:
- self.generate_roa()
-
- rpki.log.debug("Withdrawing ROA and revoking its EE cert")
- rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail)
- repository.withdraw(roa, roa_uri)
- if self.publish_ee_separately:
- repository.withdraw(cert, ee_uri)
- self.gctx.sql.sweep()
- ca_detail.generate_crl()
- ca_detail.generate_manifest()
-
- def regenerate_roa(self):
+ self.generate_roa(one)
+ else:
+ one()
+
+ def regenerate_roa(self, callback):
"""Reissue ROA associated with this route_origin."""
if self.ca_detail() is None:
- self.generate_roa()
+ self.generate_roa(callback)
else:
- self.withdraw_roa(regenerate = True)
+ self.withdraw_roa(callback, regenerate = True)
def roa_uri(self, ca, key = None):
"""Return the publication URI for this route_origin's ROA."""
@@ -814,14 +850,19 @@ class msg(rpki.xml_utils.msg, left_right_namespace):
for x in (self_elt, child_elt, parent_elt, bsc_elt, repository_elt,
route_origin_elt, list_resources_elt, report_error_elt))
- def serve_top_level(self, gctx):
+ def serve_top_level(self, gctx, cb):
"""Serve one msg PDU."""
r_msg = self.__class__()
r_msg.type = "reply"
- for q_pdu in self:
+
+ def loop(iterator, q_pdu):
q_pdu.gctx = gctx
- q_pdu.serve_dispatch(r_msg)
- return r_msg
+ q_pdu.serve_dispatch(r_msg, iterator)
+
+ def done():
+ cb(r_msg)
+
+ rpki.async.iterator(self, loop, done)
class sax_handler(rpki.xml_utils.sax_handler):
"""SAX handler for Left-Right protocol."""
diff --git a/rpkid/rpki/log.py b/rpkid/rpki/log.py
index efaba10c..7eaee040 100644
--- a/rpkid/rpki/log.py
+++ b/rpkid/rpki/log.py
@@ -17,17 +17,30 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
"""
-import syslog, traceback
+import syslog, traceback, sys, os
## @var enable_trace
# Whether call tracing is enabled.
enable_trace = False
+## @var use_syslog
+# Whether to use syslog
+
+use_syslog = False
+
+tag = ""
+pid = 0
+
def init(ident = "rpki", flags = syslog.LOG_PID | syslog.LOG_PERROR, facility = syslog.LOG_DAEMON):
"""Initialize logging system."""
- return syslog.openlog(ident, flags, facility)
+ if use_syslog:
+ return syslog.openlog(ident, flags, facility)
+ else:
+ global tag, pid
+ tag = ident
+ pid = os.getpid()
def set_trace(trace):
"""Enable or disable call tracing."""
@@ -42,7 +55,10 @@ class logger(object):
self.priority = priority
def __call__(self, message):
- return syslog.syslog(self.priority, message)
+ if use_syslog:
+ return syslog.syslog(self.priority, message)
+ else:
+ sys.stderr.write("%s[%d]: %s\n" % (tag, pid, message))
error = logger(syslog.LOG_ERR)
warn = logger(syslog.LOG_WARNING)
diff --git a/rpkid/rpki/publication.py b/rpkid/rpki/publication.py
index fe52b631..8d5ff267 100644
--- a/rpkid/rpki/publication.py
+++ b/rpkid/rpki/publication.py
@@ -30,13 +30,13 @@ class publication_namespace(object):
class control_elt(rpki.xml_utils.data_elt, rpki.sql.sql_persistant, publication_namespace):
"""Virtual class for control channel objects."""
- def serve_dispatch(self, r_msg, client):
+ def serve_dispatch(self, r_msg, client, cb):
"""Action dispatch handler. This needs special handling because
we need to make sure that this PDU arrived via the control channel.
"""
if client is not None:
raise rpki.exceptions.BadQuery, "Control query received on client channel"
- rpki.xml_utils.data_elt.serve_dispatch(self, r_msg)
+ rpki.xml_utils.data_elt.serve_dispatch(self, r_msg, cb)
class config_elt(control_elt):
"""<config/> element. This is a little weird because there should
@@ -70,14 +70,14 @@ class config_elt(control_elt):
"""
return cls.sql_fetch(gctx, cls.wired_in_config_id)
- def serve_set(self, r_msg):
+ def serve_set(self, r_msg, cb):
"""Handle a set action. This requires special handling because
- config we don't support the create method.
+ config doesn't support the create method.
"""
if self.sql_fetch(self.gctx, self.config_id) is None:
- control_elt.serve_create(self, r_msg)
+ control_elt.serve_create(self, r_msg, cb)
else:
- control_elt.serve_set(self, r_msg)
+ control_elt.serve_set(self, r_msg, cb)
def serve_fetch_one(self):
"""Find the config object on which a get or set method should
@@ -112,11 +112,12 @@ class client_elt(control_elt):
if name in self.elements:
self.clear_https_ta_cache = True
- def serve_post_save_hook(self, q_pdu, r_pdu):
+ def serve_post_save_hook(self, q_pdu, r_pdu, cb):
"""Extra server actions for client_elt."""
if self.clear_https_ta_cache:
self.gctx.clear_https_ta_cache()
self.clear_https_ta_cache = False
+ cb()
def serve_fetch_one(self):
"""Find the client object on which a get, set, or destroy method
@@ -161,7 +162,7 @@ class publication_object_elt(rpki.xml_utils.base_elt, publication_namespace):
elt.text = base64.b64encode(self.payload.get_DER())
return elt
- def serve_dispatch(self, r_msg, client):
+ def serve_dispatch(self, r_msg, client, cb):
"""Action dispatch handler."""
if client is None:
raise rpki.exceptions.BadQuery, "Client query received on control channel"
@@ -176,6 +177,7 @@ class publication_object_elt(rpki.xml_utils.base_elt, publication_namespace):
r_pdu.tag = self.tag
r_pdu.uri = self.uri
r_msg.append(r_pdu)
+ cb()
def serve_publish(self):
"""Publish an object."""
@@ -256,16 +258,21 @@ class msg(rpki.xml_utils.msg, publication_namespace):
pdus = dict((x.element_name, x)
for x in (config_elt, client_elt, certificate_elt, crl_elt, manifest_elt, roa_elt, report_error_elt))
- def serve_top_level(self, gctx, client):
+ def serve_top_level(self, gctx, client, cb):
"""Serve one msg PDU."""
if self.type != "query":
raise rpki.exceptions.BadQuery, "Message type is not query"
r_msg = self.__class__()
r_msg.type = "reply"
- for q_pdu in self:
+
+ def loop(iterator, q_pdu):
q_pdu.gctx = gctx
- q_pdu.serve_dispatch(r_msg, client)
- return r_msg
+ q_pdu.serve_dispatch(r_msg, client, iterator)
+
+ def done():
+ cb(r_msg)
+
+ rpki.async.iterator(self, loop, done)
class sax_handler(rpki.xml_utils.sax_handler):
"""SAX handler for publication protocol."""
diff --git a/rpkid/rpki/rpki_engine.py b/rpkid/rpki/rpki_engine.py
index 5f89c338..c373e856 100644
--- a/rpkid/rpki/rpki_engine.py
+++ b/rpkid/rpki/rpki_engine.py
@@ -41,16 +41,8 @@ class rpkid_context(object):
self.publication_kludge_base = cfg.get("publication-kludge-base", "publication/")
- def irdb_query(self, self_id, child_id = None):
- """Perform an IRDB callback query. In the long run this should not
- be a blocking routine, it should instead issue a query and set up a
- handler to receive the response. For the moment, though, we are
- doing simple lock step and damn the torpedos. Not yet doing
- anything useful with subject name. Most likely this function should
- really be wrapped up in a class that carries both the query result
- and also the intermediate state needed for the event-driven code
- that this function will need to become.
- """
+ def irdb_query(self, self_id, child_id, callback):
+ """Perform an IRDB callback query."""
rpki.log.trace()
@@ -60,40 +52,52 @@ class rpkid_context(object):
q_msg[0].self_id = self_id
q_msg[0].child_id = child_id
q_cms = rpki.left_right.cms_msg.wrap(q_msg, self.rpkid_key, self.rpkid_cert)
- der = rpki.https.client(
+
+ def unwrap(der):
+ r_msg = rpki.left_right.cms_msg.unwrap(der, (self.bpki_ta, self.irdb_cert))
+ if len(r_msg) == 0 or not isinstance(r_msg[0], rpki.left_right.list_resources_elt) or r_msg.type != "reply":
+ raise rpki.exceptions.BadIRDBReply, "Unexpected response to IRDB query: %s" % lxml.etree.tostring(r_msg.toXML(), pretty_print = True, encoding = "us-ascii")
+ callback(rpki.resource_set.resource_bag(
+ asn = r_msg[0].asn,
+ v4 = r_msg[0].ipv4,
+ v6 = r_msg[0].ipv6,
+ valid_until = r_msg[0].valid_until))
+
+ rpki.https.client(
server_ta = (self.bpki_ta, self.irdb_cert),
client_key = self.rpkid_key,
client_cert = self.rpkid_cert,
url = self.irdb_url,
- msg = q_cms)
- r_msg = rpki.left_right.cms_msg.unwrap(der, (self.bpki_ta, self.irdb_cert))
- if len(r_msg) == 0 or not isinstance(r_msg[0], rpki.left_right.list_resources_elt) or r_msg.type != "reply":
- raise rpki.exceptions.BadIRDBReply, "Unexpected response to IRDB query: %s" % lxml.etree.tostring(r_msg.toXML(), pretty_print = True, encoding = "us-ascii")
- return rpki.resource_set.resource_bag(
- asn = r_msg[0].asn,
- v4 = r_msg[0].ipv4,
- v6 = r_msg[0].ipv6,
- valid_until = r_msg[0].valid_until)
+ msg = q_cms,
+ callback = unwrap)
def left_right_handler(self, query, path, cb):
"""Process one left-right PDU."""
rpki.log.trace()
+
+ def done(r_msg):
+ reply = rpki.left_right.cms_msg.wrap(r_msg, self.rpkid_key, self.rpkid_cert)
+ self.sql.sweep()
+ cb(200, reply)
+
try:
self.sql.ping()
q_msg = rpki.left_right.cms_msg.unwrap(query, (self.bpki_ta, self.irbe_cert))
if q_msg.type != "query":
raise rpki.exceptions.BadQuery, "Message type is not query"
- r_msg = q_msg.serve_top_level(self)
- reply = rpki.left_right.cms_msg.wrap(r_msg, self.rpkid_key, self.rpkid_cert)
- self.sql.sweep()
- return 200, reply
+ q_msg.serve_top_level(self, done)
except Exception, data:
rpki.log.error(traceback.format_exc())
- return 500, "Unhandled exception %s" % data
+ cb(500, "Unhandled exception %s" % data)
def up_down_handler(self, query, path, cb):
"""Process one up-down PDU."""
rpki.log.trace()
+
+ def done(reply):
+ self.sql.sweep()
+ cb(200, reply)
+
try:
self.sql.ping()
child_id = path.partition("/up-down/")[2]
@@ -102,12 +106,10 @@ class rpkid_context(object):
child = rpki.left_right.child_elt.sql_fetch(self, long(child_id))
if child is None:
raise rpki.exceptions.ChildNotFound, "Could not find child %s" % child_id
- reply = child.serve_up_down(query)
- self.sql.sweep()
- return 200, reply
+ child.serve_up_down(query, done)
except Exception, data:
rpki.log.error(traceback.format_exc())
- return 400, "Could not process PDU: %s" % data
+ cb(400, "Could not process PDU: %s" % data)
def cronjob_handler(self, query, path, cb):
"""Periodic tasks. This will need another rewrite once we have internal timers."""
@@ -115,31 +117,31 @@ class rpkid_context(object):
rpki.log.trace()
self.sql.ping()
- def cronjob_do_one(iterator, s):
+ def each(iterator, s):
- def client_poll():
+ def one():
rpki.log.debug("Self %s polling parents" % s.self_id)
- s.client_poll(update_children)
+ s.client_poll(two)
- def update_children():
+ def two():
rpki.log.debug("Self %s updating children" % s.self_id)
- s.update_children(update_roas_crls_and_manifests)
+ s.update_children(three)
- def update_roas_crls_and_manifests():
+ def three():
rpki.log.debug("Self %s updating ROAs" % s.self_id)
- s.update_roas()
+ s.update_roas(four)
+
+ def four():
rpki.log.debug("Self %s regenerating CRLs and manifests" % s.self_id)
- s.regenerate_crls_and_manifests()
- iterator()
+ s.regenerate_crls_and_manifests(iterator)
- client_poll()
+ one()
- def cronjob_done():
+ def done():
self.sql.sweep()
cb(200, "OK")
- rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self),
- cronjob_do_one, cronjob_done)
+ rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self), each, done)
## @var https_ta_cache
# HTTPS trust anchor cache, to avoid regenerating it for every TLS connection.
@@ -223,7 +225,7 @@ class ca_obj(rpki.sql.sql_persistant):
raise rpki.exceptions.BadURISyntax, "SIA URI must end with a slash: %s" % sia_uri
return sia_uri + str(self.ca_id) + "/"
- def check_for_updates(self, parent, rc):
+ def check_for_updates(self, parent, rc, cb):
"""Parent has signaled continued existance of a resource class we
already knew about, so we need to check for an updated
certificate, changes in resource coverage, revocation and reissue
@@ -239,37 +241,45 @@ class ca_obj(rpki.sql.sql_persistant):
rc_resources = rc.to_resource_bag()
cert_map = dict((c.cert.get_SKI(), c) for c in rc.certs)
- for ca_detail in ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND latest_ca_cert IS NOT NULL AND state != 'revoked'", (self.ca_id,)):
+ def loop(iterator, ca_detail):
ski = ca_detail.latest_ca_cert.get_SKI()
if ski not in cert_map:
rpki.log.warn("Certificate in database missing from list_response, class %s, SKI %s, maybe parent certificate went away?"
% (repr(rc.class_name), ca_detail.latest_ca_cert.gSKI()))
- ca_detail.delete(self, parent.repository())
- continue
+ return ca_detail.delete(self, parent.repository(), iterator)
+
+ def cleanup():
+ del cert_map[ski]
+ iterator()
if ca_detail.state in ("pending", "active"):
current_resources = ca_detail.latest_ca_cert.get_3779resources()
- if sia_uri_changed or \
- ca_detail.latest_ca_cert != cert_map[ski].cert or \
- current_resources.undersized(rc_resources) or \
- current_resources.oversized(rc_resources):
- ca_detail.update(
+ if (sia_uri_changed or
+ ca_detail.latest_ca_cert != cert_map[ski].cert or
+ current_resources.undersized(rc_resources) or
+ current_resources.oversized(rc_resources)):
+ return ca_detail.update(
parent = parent,
ca = self,
rc = rc,
sia_uri_changed = sia_uri_changed,
- old_resources = current_resources)
+ old_resources = current_resources,
+ callback = cleanup)
+
+ cleanup()
- del cert_map[ski]
+ def done():
+ if cert_map:
+ rpki.log.warn("Certificates in list_response missing from our database, class %s, SKIs %s"
+ % (repr(rc.class_name), ", ".join(c.cert.gSKI() for c in cert_map.values())))
+ cb()
- if cert_map:
- rpki.log.warn("Certificates in list_response missing from our database, class %s, SKIs %s"
- % (repr(rc.class_name), ", ".join(c.cert.gSKI() for c in cert_map.values())))
+ rpki.async.iterator(ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND latest_ca_cert IS NOT NULL AND state != 'revoked'", (self.ca_id,)), loop, done)
@classmethod
- def create(cls, parent, rc):
+ def create(cls, parent, rc, cb):
"""Parent has signaled existance of a new resource class, so we
need to create and set up a corresponding CA object.
"""
@@ -282,13 +292,14 @@ class ca_obj(rpki.sql.sql_persistant):
self.sia_uri = self.construct_sia_uri(parent, rc)
ca_detail = ca_detail_obj.create(self)
- # This will need a callback when we go event-driven
- issue_response = rpki.up_down.issue_pdu.query(parent, self, ca_detail)
+ def done(issue_response):
+ ca_detail.activate(
+ ca = self,
+ cert = issue_response.payload.classes[0].certs[0].cert,
+ uri = issue_response.payload.classes[0].certs[0].cert_url,
+ callback = cb)
- ca_detail.activate(
- ca = self,
- cert = issue_response.payload.classes[0].certs[0].cert,
- uri = issue_response.payload.classes[0].certs[0].cert_url)
+ rpki.up_down.issue_pdu.query(parent, self, ca_detail, done)
def delete(self, parent):
"""The list of current resource classes received from parent does
@@ -324,7 +335,7 @@ class ca_obj(rpki.sql.sql_persistant):
self.sql_mark_dirty()
return self.last_crl_sn
- def rekey(self):
+ def rekey(self, cb):
"""Initiate a rekey operation for this ca. Generate a new
keypair. Request cert from parent using new keypair. Mark result
as our active ca_detail. Reissue all child certs issued by this
@@ -337,22 +348,25 @@ class ca_obj(rpki.sql.sql_persistant):
old_detail = self.fetch_active()
new_detail = ca_detail_obj.create(self)
- # This will need a callback when we go event-driven
- issue_response = rpki.up_down.issue_pdu.query(parent, self, new_detail)
+ def done(issue_response):
+ new_detail.activate(
+ ca = self,
+ cert = issue_response.payload.classes[0].certs[0].cert,
+ uri = issue_response.payload.classes[0].certs[0].cert_url,
+ predecessor = old_detail,
+ callback = cb)
- new_detail.activate(
- ca = self,
- cert = issue_response.payload.classes[0].certs[0].cert,
- uri = issue_response.payload.classes[0].certs[0].cert_url,
- predecessor = old_detail)
+ rpki.up_down.issue_pdu.query(parent, self, new_detail, done)
- def revoke(self):
+ def revoke(self, cb):
"""Revoke deprecated ca_detail objects associated with this ca."""
rpki.log.trace()
- for ca_detail in self.fetch_deprecated():
- ca_detail.revoke()
+ def loop(iterator, ca_detail):
+ ca_detail.revoke(iterator)
+
+ rpki.async.iterator(self.fetch_deprecated(), loop, cb)
class ca_detail_obj(rpki.sql.sql_persistant):
"""Internal CA detail object."""
@@ -408,40 +422,66 @@ class ca_detail_obj(rpki.sql.sql_persistant):
"""Return publication URI for this ca_detail's manifest."""
return ca.sia_uri + self.public_key.gSKI() + ".mnf"
- def activate(self, ca, cert, uri, predecessor = None):
+ def activate(self, ca, cert, uri, predecessor = None, callback = None):
"""Activate this ca_detail."""
+ assert callback is not None # hack to catch positional arguments
+
self.latest_ca_cert = cert
self.ca_cert_uri = uri.rsync()
self.generate_manifest_cert(ca)
- self.generate_crl()
- self.generate_manifest()
- self.state = "active"
- self.sql_mark_dirty()
- if predecessor is not None:
- predecessor.state = "deprecated"
- predecessor.sql_mark_dirty()
- for child_cert in predecessor.child_certs():
- child_cert.reissue(self)
- for route_origin in predecessor.route_origins():
- route_origin.regenerate_roa()
+ def did_crl(*ignored):
+ self.generate_manifest(callback = did_manifest)
+
+ def did_manifest(*ignored):
+ self.state = "active"
+ self.sql_mark_dirty()
+ if predecessor is None:
+ callback()
+ else:
+ predecessor.state = "deprecated"
+ predecessor.sql_mark_dirty()
+ rpki.async.iterator(predecessor.child_certs(), do_one_child_cert, done_child_certs)
+
+ def do_one_child_cert(iterator, child_cert):
+ child_cert.reissue(self, iterator)
+
+ def done_child_certs():
+ rpki.async.iterator(predecessor.route_origins(), do_one_route_origin, callback)
+
+ def do_one_route_origin(iterator, route_origin):
+ route_origin.regenerate_roa(iterator)
- def delete(self, ca, repository):
+ self.generate_crl(callback = did_crl)
+
+ def delete(self, ca, repository, cb):
"""Delete this ca_detail and all of the certs it issued."""
- for child_cert in self.child_certs():
- repository.withdraw(child_cert.cert, child_cert.uri(ca))
- child_cert.sql_delete()
- for revoked_cert in self.revoked_certs():
- revoked_cert.sql_delete()
- for route_origin in self.route_origins():
- route_origin.withdraw_roa()
- repository.withdraw(self.latest_manifest, self.manifest_uri(ca))
- repository.withdraw(self.latest_crl, self.crl_uri(ca))
- self.sql_delete()
+ def withdraw_one_child(iterator, child_cert):
+ repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator)
- def revoke(self):
+ def child_certs_done():
+ rpki.async.iterator(self.route_origins(), withdraw_one_roa, withdraw_manifest)
+
+ def withdraw_one_roa(iterator, route_origin):
+ route_origin.withdraw_roa(iterator)
+
+ def withdraw_manifest(*ignored):
+ repository.withdraw(self.latest_manifest, self.manifest_uri(ca), withdraw_crl)
+
+ def withdraw_crl(*ignored):
+ repository.withdraw(self.latest_crl, self.crl_uri(ca), done)
+
+ def done(*ignored):
+ for cert in self.child_certs() + self.revoked_certs():
+ cert.sql_delete()
+ self.sql_delete()
+ cb()
+
+ rpki.async.iterator(self.child_certs(), withdraw_one_child, child_certs_done)
+
+ def revoke(self, cb):
"""Request revocation of all certificates whose SKI matches the key for this ca_detail.
Tasks:
@@ -454,63 +494,79 @@ class ca_detail_obj(rpki.sql.sql_persistant):
the revoked certs, with a next CRL time after the last cert or
CRL signed by the old keypair will have expired.
- - Destroy old keypair (and manifest keypair).
+ - Generate a corresponding final manifest.
+
+ - Destroy old keypairs.
- - Leave final CRL in place until its next CRL time has passed.
+ - Leave final CRL and manifest in place until their nextupdate time has passed.
"""
- # This will need a callback when we go event-driven
- r_msg = rpki.up_down.revoke_pdu.query(self)
+ def parent_revoked(r_msg):
- if r_msg.payload.ski != self.latest_ca_cert.gSKI():
- raise rpki.exceptions.SKIMismatch
+ if r_msg.payload.ski != self.latest_ca_cert.gSKI():
+ raise rpki.exceptions.SKIMismatch
- ca = self.ca()
- parent = ca.parent()
- crl_interval = rpki.sundial.timedelta(seconds = parent.self().crl_interval)
+ ca = self.ca()
+ parent = ca.parent()
+ crl_interval = rpki.sundial.timedelta(seconds = parent.self().crl_interval)
- nextUpdate = rpki.sundial.now()
+ self.nextUpdate = rpki.sundial.now()
- if self.latest_manifest is not None:
- nextUpdate = nextUpdate.later(self.latest_manifest.getNextUpdate())
+ if self.latest_manifest is not None:
+ self.nextUpdate = self.nextUpdate.later(self.latest_manifest.getNextUpdate())
- if self.latest_crl is not None:
- nextUpdate = nextUpdate.later(self.latest_crl.getNextUpdate())
+ if self.latest_crl is not None:
+ self.nextUpdate = self.nextUpdate.later(self.latest_crl.getNextUpdate())
- for child_cert in self.child_certs():
- nextUpdate = nextUpdate.later(child_cert.cert.getNotAfter())
- child_cert.revoke()
+ def revoke_one_child(iterator, child_cert):
+ self.nextUpdate = self.nextUpdate.later(child_cert.cert.getNotAfter())
+ child_cert.revoke(iterator)
- nextUpdate += crl_interval
+ def final_crl():
+ self.nextUpdate += crl_interval
+ self.generate_crl(callback = final_manifest, nextUpdate = self.nextUpdate)
- self.generate_crl(nextUpdate)
- self.generate_manifest(nextUpdate)
+ def final_manifest(*ignored):
+ self.generate_manifest(callback = done, nextUpdate = self.nextUpdate)
- self.private_key_id = None
- self.manifest_private_key_id = None
- self.manifest_public_key = None
- self.latest_manifest_cert = None
- self.state = "revoked"
- self.sql_mark_dirty()
+ def done(*ignored):
+ self.private_key_id = None
+ self.manifest_private_key_id = None
+ self.manifest_public_key = None
+ self.latest_manifest_cert = None
+ self.state = "revoked"
+ self.sql_mark_dirty()
+ cb()
- def update(self, parent, ca, rc, sia_uri_changed, old_resources):
+ rpki.async.iterator(self.child_certs(), revoke_one_child, final_crl)
+
+ rpki.up_down.revoke_pdu.query(self, parent_revoked)
+
+ def update(self, parent, ca, rc, sia_uri_changed, old_resources, callback):
"""Need to get a new certificate for this ca_detail and perhaps
frob children of this ca_detail.
"""
- # This will need a callback when we go event-driven
- issue_response = rpki.up_down.issue_pdu.query(parent, ca, self)
-
- self.latest_ca_cert = issue_response.payload.classes[0].certs[0].cert
- new_resources = self.latest_ca_cert.get_3779resources()
+ def issued(issue_response):
+ self.latest_ca_cert = issue_response.payload.classes[0].certs[0].cert
+ new_resources = self.latest_ca_cert.get_3779resources()
- if sia_uri_changed or old_resources.oversized(new_resources):
- for child_cert in self.child_certs():
+ def loop(iterator, child_cert):
child_resources = child_cert.cert.get_3779resources()
if sia_uri_changed or child_resources.oversized(new_resources):
child_cert.reissue(
ca_detail = self,
- resources = child_resources.intersection(new_resources))
+ resources = child_resources.intersection(new_resources),
+ callback = iterator)
+ else:
+ iterator()
+
+ if sia_uri_changed or old_resources.oversized(new_resources):
+ rpki.async.iterator(self.child_certs(), loop, callback)
+ else:
+ callback()
+
+ rpki.up_down.issue_pdu.query(parent, ca, self, issued)
@classmethod
def create(cls, ca):
@@ -554,7 +610,7 @@ class ca_detail_obj(rpki.sql.sql_persistant):
self.latest_manifest_cert = self.issue_ee(ca, resources, self.manifest_public_key)
- def issue(self, ca, child, subject_key, sia, resources, child_cert = None):
+ def issue(self, ca, child, subject_key, sia, resources, callback, child_cert = None):
"""Issue a new certificate to a child. Optional child_cert
argument specifies an existing child_cert object to update in
place; if not specified, we create a new one. Returns the
@@ -589,13 +645,15 @@ class ca_detail_obj(rpki.sql.sql_persistant):
child_cert.sql_store()
- ca.parent().repository().publish(child_cert.cert, child_cert.uri(ca))
+ def published(*ignored):
+ self.generate_manifest(done)
- self.generate_manifest()
-
- return child_cert
+ def done(*ignored):
+ callback(child_cert)
+
+ ca.parent().repository().publish(child_cert.cert, child_cert.uri(ca), published)
- def generate_crl(self, nextUpdate = None):
+ def generate_crl(self, callback, nextUpdate = None):
"""Generate a new CRL for this ca_detail. At the moment this is
unconditional, that is, it is up to the caller to decide whether a
new CRL is needed.
@@ -626,9 +684,9 @@ class ca_detail_obj(rpki.sql.sql_persistant):
nextUpdate = nextUpdate,
revokedCertificates = certlist)
- repository.publish(self.latest_crl, self.crl_uri(ca))
+ repository.publish(self.latest_crl, self.crl_uri(ca), callback = callback)
- def generate_manifest(self, nextUpdate = None):
+ def generate_manifest(self, callback, nextUpdate = None):
"""Generate a new manifest for this ca_detail."""
ca = self.ca()
@@ -658,7 +716,7 @@ class ca_detail_obj(rpki.sql.sql_persistant):
keypair = self.manifest_private_key_id,
certs = self.latest_manifest_cert)
- repository.publish(self.latest_manifest, self.manifest_uri(ca))
+ repository.publish(self.latest_manifest, self.manifest_uri(ca), callback = callback)
class child_cert_obj(rpki.sql.sql_persistant):
"""Certificate that has been issued to a child."""
@@ -696,18 +754,23 @@ class child_cert_obj(rpki.sql.sql_persistant):
"""Return the publication URI for this child_cert."""
return ca.sia_uri + self.uri_tail()
- def revoke(self):
+ def revoke(self, callback):
"""Revoke a child cert."""
+
rpki.log.debug("Revoking %s" % repr(self))
ca_detail = self.ca_detail()
ca = ca_detail.ca()
revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail)
repository = ca.parent().repository()
- repository.withdraw(self.cert, self.uri(ca))
- self.gctx.sql.sweep()
- self.sql_delete()
- def reissue(self, ca_detail, resources = None, sia = None):
+ def done(*ignored):
+ self.gctx.sql.sweep()
+ self.sql_delete()
+ callback()
+
+ repository.withdraw(self.cert, self.uri(ca), done)
+
+ def reissue(self, ca_detail, callback = None, resources = None, sia = None):
"""Reissue an existing cert, reusing the public key. If the cert
we would generate is identical to the one we already have, we just
return the one we already have. If we have to revoke the old
@@ -716,6 +779,8 @@ class child_cert_obj(rpki.sql.sql_persistant):
child_cert_obj must use the return value from this method.
"""
+ assert callback is not None
+
ca = ca_detail.ca()
child = self.child()
@@ -732,7 +797,7 @@ class child_cert_obj(rpki.sql.sql_persistant):
assert resources.valid_until is not None and old_resources.valid_until is not None
if resources == old_resources and sia == old_sia and ca_detail == old_ca_detail:
- return self
+ return callback(self)
must_revoke = old_resources.oversized(resources) or old_resources.valid_until > resources.valid_until
new_issuer = ca_detail != old_ca_detail
@@ -745,20 +810,24 @@ class child_cert_obj(rpki.sql.sql_persistant):
else:
child_cert = self
+ def revoke(child_cert):
+
+ def do_one_cert(iterator, cert):
+ cert.revoke(iterator)
+
+ def done():
+ callback(child_cert)
+
+ rpki.async.iterator([x for x in child.child_certs(ca_detail = ca_detail, ski = self.ski) if x is not child_cert], do_one_cert, done)
+
child_cert = ca_detail.issue(
ca = ca,
child = child,
subject_key = self.cert.getPublicKey(),
sia = sia,
resources = resources,
- child_cert = child_cert)
-
- if must_revoke:
- for cert in child.child_certs(ca_detail = ca_detail, ski = self.ski):
- if cert is not child_cert:
- cert.revoke()
-
- return child_cert
+ child_cert = child_cert,
+ callback = revoke if must_revoke else callback)
@classmethod
def fetch(cls, gctx = None, child = None, ca_detail = None, ski = None, unique = False):
diff --git a/rpkid/rpki/up_down.py b/rpkid/rpki/up_down.py
index 30085390..0134bbb8 100644
--- a/rpkid/rpki/up_down.py
+++ b/rpkid/rpki/up_down.py
@@ -62,7 +62,7 @@ class base_elt(object):
if value is not None:
lxml.etree.SubElement(elt, "{%s}%s" % (xmlns, name), nsmap=nsmap).text = base64.b64encode(value)
- def serve_pdu(self, q_msg, r_msg, child):
+ def serve_pdu(self, q_msg, r_msg, child, callback):
"""Default PDU handler to catch unexpected types."""
raise rpki.exceptions.BadQuery, "Unexpected query type %s" % q_msg.type
@@ -185,37 +185,42 @@ class list_pdu(base_elt):
"""Generate (empty) payload of "list" PDU."""
return []
- def serve_pdu(self, q_msg, r_msg, child):
+ def serve_pdu(self, q_msg, r_msg, child, callback):
"""Serve one "list" PDU."""
- r_msg.payload = list_response_pdu()
-
- # This will require a callback when we go event-driven
- irdb_resources = self.gctx.irdb_query(child.self_id, child.child_id)
-
- for parent in child.parents():
- for ca in parent.cas():
- ca_detail = ca.fetch_active()
- if not ca_detail:
- continue
- resources = ca_detail.latest_ca_cert.get_3779resources().intersection(irdb_resources)
- if resources.empty():
- continue
- rc = class_elt()
- rc.class_name = str(ca.ca_id)
- rc.cert_url = multi_uri(ca_detail.ca_cert_uri)
- rc.from_resource_bag(resources)
- for child_cert in child.child_certs(ca_detail = ca_detail):
- c = certificate_elt()
- c.cert_url = multi_uri(child_cert.uri(ca))
- c.cert = child_cert.cert
- rc.certs.append(c)
- rc.issuer = ca_detail.latest_ca_cert
- r_msg.payload.classes.append(rc)
+
+ def handle(irdb_resources):
+
+ rpki.log.info("list_pdu.serve_pdu callback")
+
+ r_msg.payload = list_response_pdu()
+
+ for parent in child.parents():
+ for ca in parent.cas():
+ ca_detail = ca.fetch_active()
+ if not ca_detail:
+ continue
+ resources = ca_detail.latest_ca_cert.get_3779resources().intersection(irdb_resources)
+ if resources.empty():
+ continue
+ rc = class_elt()
+ rc.class_name = str(ca.ca_id)
+ rc.cert_url = multi_uri(ca_detail.ca_cert_uri)
+ rc.from_resource_bag(resources)
+ for child_cert in child.child_certs(ca_detail = ca_detail):
+ c = certificate_elt()
+ c.cert_url = multi_uri(child_cert.uri(ca))
+ c.cert = child_cert.cert
+ rc.certs.append(c)
+ rc.issuer = ca_detail.latest_ca_cert
+ r_msg.payload.classes.append(rc)
+ callback()
+
+ self.gctx.irdb_query(child.self_id, child.child_id, handle)
@classmethod
- def query(cls, parent):
+ def query(cls, parent, cb):
"""Send a "list" query to parent."""
- return parent.query_up_down(cls())
+ return parent.query_up_down(cls(), cb)
class class_response_syntax(base_elt):
"""Syntax for Up-Down protocol "list_response" and "issue_response" PDUs."""
@@ -265,7 +270,7 @@ class issue_pdu(base_elt):
elt.text = self.pkcs10.get_Base64()
return [elt]
- def serve_pdu(self, q_msg, r_msg, child):
+ def serve_pdu(self, q_msg, r_msg, child, callback):
"""Serve one issue request PDU."""
# Subsetting not yet implemented, this is the one place where we
@@ -285,46 +290,51 @@ class issue_pdu(base_elt):
# Check current cert, if any
- # This will require a callback when we go event-driven
- irdb_resources = self.gctx.irdb_query(child.self_id, child.child_id)
+ def got_resources(irdb_resources):
- resources = irdb_resources.intersection(ca_detail.latest_ca_cert.get_3779resources())
- req_key = self.pkcs10.getPublicKey()
- req_sia = self.pkcs10.get_SIA()
- child_cert = child.child_certs(ca_detail = ca_detail, ski = req_key.get_SKI(), unique = True)
+ resources = irdb_resources.intersection(ca_detail.latest_ca_cert.get_3779resources())
+ req_key = self.pkcs10.getPublicKey()
+ req_sia = self.pkcs10.get_SIA()
+ child_cert = child.child_certs(ca_detail = ca_detail, ski = req_key.get_SKI(), unique = True)
- # Generate new cert or regenerate old one if necessary
+ # Generate new cert or regenerate old one if necessary
- if child_cert is None:
- child_cert = ca_detail.issue(
- ca = ca,
- child = child,
- subject_key = req_key,
- sia = req_sia,
- resources = resources)
- else:
- child_cert = child_cert.reissue(
- ca_detail = ca_detail,
- sia = req_sia,
- resources = resources)
-
- # Save anything we modified and generate response
- self.gctx.sql.sweep()
- assert child_cert and child_cert.sql_in_db
- c = certificate_elt()
- c.cert_url = multi_uri(child_cert.uri(ca))
- c.cert = child_cert.cert
- rc = class_elt()
- rc.class_name = self.class_name
- rc.cert_url = multi_uri(ca_detail.ca_cert_uri)
- rc.from_resource_bag(resources)
- rc.certs.append(c)
- rc.issuer = ca_detail.latest_ca_cert
- r_msg.payload = issue_response_pdu()
- r_msg.payload.classes.append(rc)
+ def got_child_cert(child_cert):
+ # Save anything we modified and generate response
+ self.gctx.sql.sweep()
+ assert child_cert and child_cert.sql_in_db
+ c = certificate_elt()
+ c.cert_url = multi_uri(child_cert.uri(ca))
+ c.cert = child_cert.cert
+ rc = class_elt()
+ rc.class_name = self.class_name
+ rc.cert_url = multi_uri(ca_detail.ca_cert_uri)
+ rc.from_resource_bag(resources)
+ rc.certs.append(c)
+ rc.issuer = ca_detail.latest_ca_cert
+ r_msg.payload = issue_response_pdu()
+ r_msg.payload.classes.append(rc)
+ callback()
+
+ if child_cert is None:
+ ca_detail.issue(
+ ca = ca,
+ child = child,
+ subject_key = req_key,
+ sia = req_sia,
+ resources = resources,
+ callback = got_child_cert)
+ else:
+ child_cert.reissue(
+ ca_detail = ca_detail,
+ sia = req_sia,
+ resources = resources,
+ callback = got_child_cert)
+
+ self.gctx.irdb_query(child.self_id, child.child_id, got_resources)
@classmethod
- def query(cls, parent, ca, ca_detail):
+ def query(cls, parent, ca, ca_detail, callback):
"""Send an "issue" request to parent associated with ca."""
assert ca_detail is not None and ca_detail.state in ("pending", "active")
sia = ((rpki.oids.name2oid["id-ad-caRepository"], ("uri", ca.sia_uri)),
@@ -332,7 +342,7 @@ class issue_pdu(base_elt):
self = cls()
self.class_name = ca.parent_resource_class
self.pkcs10 = rpki.x509.PKCS10.create_ca(ca_detail.private_key_id, sia)
- return parent.query_up_down(self)
+ parent.query_up_down(self, callback)
class issue_response_pdu(class_response_syntax):
"""Up-Down protocol "issue_response" PDU."""
@@ -363,25 +373,34 @@ class revoke_pdu(revoke_syntax):
"""Convert g(SKI) encoding from PDU back to raw SKI."""
return base64.urlsafe_b64decode(self.ski + "=")
- def serve_pdu(self, q_msg, r_msg, child):
+ def serve_pdu(self, q_msg, r_msg, child, cb):
"""Serve one revoke request PDU."""
- for ca_detail in child.ca_from_class_name(self.class_name).ca_details():
- for child_cert in child.child_certs(ca_detail = ca_detail, ski = self.get_SKI()):
- child_cert.revoke()
- self.gctx.sql.sweep()
- r_msg.payload = revoke_response_pdu()
- r_msg.payload.class_name = self.class_name
- r_msg.payload.ski = self.ski
+
+ def loop1(iterator1, ca_detail):
+
+ def loop2(iterator2, child_cert):
+ child_cert.revoke(iterator2)
+
+ rpki.async.iterator(child.child_certs(ca_detail = ca_detail, ski = self.get_SKI()), loop2, iterator1)
+
+ def done():
+ self.gctx.sql.sweep()
+ r_msg.payload = revoke_response_pdu()
+ r_msg.payload.class_name = self.class_name
+ r_msg.payload.ski = self.ski
+ cb()
+
+ rpki.async.iterator(child.ca_from_class_name(self.class_name).ca_details(), loop1, done)
@classmethod
- def query(cls, ca_detail):
+ def query(cls, ca_detail, cb):
"""Send a "revoke" request to parent associated with ca_detail."""
ca = ca_detail.ca()
parent = ca.parent()
self = cls()
self.class_name = ca.parent_resource_class
self.ski = ca_detail.latest_ca_cert.gSKI()
- return parent.query_up_down(self)
+ return parent.query_up_down(self, cb)
class revoke_response_pdu(revoke_syntax):
"""Up-Down protocol "revoke_response" PDU."""
@@ -402,7 +421,8 @@ class error_response_pdu(base_elt):
1302 : "Revoke - no such key",
2001 : "Internal Server Error - Request not performed" }
- exceptions = {}
+ exceptions = {
+ rpki.exceptions.NoActiveCA : 1202 }
def __init__(self, exception = None):
"""Initialize an error_response PDU from an exception object."""
@@ -487,14 +507,18 @@ class message_pdu(base_elt):
"""Convert a message PDU to a string."""
lxml.etree.tostring(self.toXML(), pretty_print = True, encoding = "UTF-8")
- def serve_top_level(self, child):
+ def serve_top_level(self, child, callback):
"""Serve one message request PDU."""
+
r_msg = message_pdu()
r_msg.sender = self.recipient
r_msg.recipient = self.sender
- self.payload.serve_pdu(self, r_msg, child)
- r_msg.type = self.type2name[type(r_msg.payload)]
- return r_msg
+
+ def done():
+ r_msg.type = self.type2name[type(r_msg.payload)]
+ callback(r_msg)
+
+ self.payload.serve_pdu(self, r_msg, child, done)
def serve_error(self, exception):
"""Generate an error_response message PDU."""
diff --git a/rpkid/rpki/xml_utils.py b/rpkid/rpki/xml_utils.py
index eda8aa85..2f29b312 100644
--- a/rpkid/rpki/xml_utils.py
+++ b/rpkid/rpki/xml_utils.py
@@ -220,24 +220,30 @@ class data_elt(base_elt):
"""Overridable hook."""
pass
- def serve_pre_save_hook(self, q_pdu, r_pdu):
+ def serve_pre_save_hook(self, q_pdu, r_pdu, cb):
"""Overridable hook."""
- pass
+ cb()
- def serve_post_save_hook(self, q_pdu, r_pdu):
+ def serve_post_save_hook(self, q_pdu, r_pdu, cb):
"""Overridable hook."""
- pass
+ cb()
- def serve_create(self, r_msg):
+ def serve_create(self, r_msg, cb):
"""Handle a create action."""
r_pdu = self.make_reply()
- self.serve_pre_save_hook(self, r_pdu)
- self.sql_store()
- setattr(r_pdu, self.sql_template.index, getattr(self, self.sql_template.index))
- self.serve_post_save_hook(self, r_pdu)
- r_msg.append(r_pdu)
- def serve_set(self, r_msg):
+ def one():
+ self.sql_store()
+ setattr(r_pdu, self.sql_template.index, getattr(self, self.sql_template.index))
+ self.serve_post_save_hook(self, r_pdu, two)
+
+ def two():
+ r_msg.append(r_pdu)
+ cb()
+
+ self.serve_pre_save_hook(self, r_pdu, one)
+
+ def serve_set(self, r_msg, cb):
"""Handle a set action."""
db_pdu = self.serve_fetch_one()
r_pdu = self.make_reply()
@@ -246,30 +252,39 @@ class data_elt(base_elt):
if v is not None:
setattr(db_pdu, a, v)
db_pdu.sql_mark_dirty()
- db_pdu.serve_pre_save_hook(self, r_pdu)
- db_pdu.sql_store()
- db_pdu.serve_post_save_hook(self, r_pdu)
- r_msg.append(r_pdu)
- def serve_get(self, r_msg):
+ def one():
+ db_pdu.sql_store()
+ db_pdu.serve_post_save_hook(self, r_pdu, two)
+
+ def two():
+ r_msg.append(r_pdu)
+ cb()
+
+ db_pdu.serve_pre_save_hook(self, r_pdu, one)
+
+ def serve_get(self, r_msg, cb):
"""Handle a get action."""
r_pdu = self.serve_fetch_one()
self.make_reply(r_pdu)
r_msg.append(r_pdu)
+ cb()
- def serve_list(self, r_msg):
+ def serve_list(self, r_msg, cb):
"""Handle a list action for non-self objects."""
for r_pdu in self.serve_fetch_all():
self.make_reply(r_pdu)
r_msg.append(r_pdu)
+ cb()
- def serve_destroy(self, r_msg):
+ def serve_destroy(self, r_msg, cb):
"""Handle a destroy action."""
db_pdu = self.serve_fetch_one()
db_pdu.sql_delete()
r_msg.append(self.make_reply())
+ cb()
- def serve_dispatch(self, r_msg):
+ def serve_dispatch(self, r_msg, cb):
"""Action dispatch handler."""
dispatch = { "create" : self.serve_create,
"set" : self.serve_set,
@@ -278,7 +293,7 @@ class data_elt(base_elt):
"destroy" : self.serve_destroy }
if self.action not in dispatch:
raise rpki.exceptions.BadQuery, "Unexpected query: action %s" % self.action
- dispatch[self.action](r_msg)
+ dispatch[self.action](r_msg, cb)
def unimplemented_control(self, *controls):
"""Uniform handling for unimplemented control operations."""
diff --git a/rpkid/testbed.py b/rpkid/testbed.py
index 395c8155..816e09ba 100644
--- a/rpkid/testbed.py
+++ b/rpkid/testbed.py
@@ -385,7 +385,7 @@ class allocation_db(list):
def apply_delta(self, delta, cb):
"""Apply a delta or run a command."""
- def once(iterator, d):
+ def each(iterator, d):
if isinstance(d, str):
c = d.split()
cmds[c[0]](*c[1:])
@@ -400,7 +400,7 @@ class allocation_db(list):
if delta is None:
cb()
else:
- rpki.async.iterator(delta, once, done)
+ rpki.async.iterator(delta, each, done)
def dump(self):
"""Print content of the database."""
@@ -457,13 +457,13 @@ class allocation(object):
rpki.log.info("Applying delta: %s" % yaml)
- def once(iterator, kv):
+ def each(iterator, kv):
if kv[0] == "name":
iterator()
else:
getattr(self, "apply_" + kv[0])(kv[1], iterator)
- rpki.async.iterator(yaml.items(), once, cb)
+ rpki.async.iterator(yaml.items(), each, cb)
def apply_add_as(self, text, cb):
self.base.asn = self.base.asn.union(rpki.resource_set.resource_set_as(text))
@@ -803,7 +803,7 @@ class allocation(object):
sql_db = MySQLdb.connect(user = "irdb", db = self.irdb_db_name, passwd = irdb_db_pass)
sql_cur = sql_db.cursor()
- def once(iterator, kid):
+ def each(iterator, kid):
if kid.is_leaf():
bpki_cert = self.cross_certify(kid.name + "-TA")
@@ -824,7 +824,7 @@ class allocation(object):
sql_db.close()
do_route_origins()
- rpki.async.iterator(self.kids, once, done)
+ rpki.async.iterator(self.kids, each, done)
def do_route_origins():
rpki.log.info("Creating rpkid route_origin objects for %s" % self.name)
diff --git a/rpkid/testpoke.py b/rpkid/testpoke.py
index 04517ce4..f50d4ecf 100644
--- a/rpkid/testpoke.py
+++ b/rpkid/testpoke.py
@@ -56,8 +56,9 @@ for o,a in opts:
if argv:
usage(1)
+rpki.log.init("testpoke")
+
if debug:
- rpki.log.init("testpoke")
rpki.log.set_trace(True)
f = open(yaml_file)