aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/left_right.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpkid/rpki/left_right.py')
-rw-r--r--rpkid/rpki/left_right.py443
1 files changed, 242 insertions, 201 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py
index 4769cf0e..363feddb 100644
--- a/rpkid/rpki/left_right.py
+++ b/rpkid/rpki/left_right.py
@@ -20,7 +20,7 @@ PERFORMANCE OF THIS SOFTWARE.
import base64, lxml.etree, time, traceback, os
import rpki.resource_set, rpki.x509, rpki.sql, rpki.exceptions, rpki.xml_utils
import rpki.https, rpki.up_down, rpki.relaxng, rpki.sundial, rpki.log, rpki.roa
-import rpki.publication
+import rpki.publication, rpki.async
# Enforce strict checking of XML "sender" field in up-down protocol
enforce_strict_up_down_xml_sender = False
@@ -105,26 +105,34 @@ class self_elt(data_elt):
"""Fetch all route_origin objects that link to this self object."""
return route_origin_elt.sql_fetch_where(self.gctx, "self_id = %s", (self.self_id,))
- def serve_post_save_hook(self, q_pdu, r_pdu):
+ def serve_post_save_hook(self, q_pdu, r_pdu, cb):
"""Extra server actions for self_elt."""
rpki.log.trace()
if q_pdu.rekey:
- self.serve_rekey()
- if q_pdu.revoke:
- self.serve_revoke()
- self.unimplemented_control("reissue", "run_now", "publish_world_now")
+ self.serve_rekey(cb)
+ elif q_pdu.revoke:
+ self.serve_revoke(cb)
+ else:
+ self.unimplemented_control("reissue", "run_now", "publish_world_now")
+ cb()
- def serve_rekey(self):
+ def serve_rekey(self, cb):
"""Handle a left-right rekey action for this self."""
rpki.log.trace()
- for parent in self.parents():
- parent.serve_rekey()
- def serve_revoke(self):
+ def loop(iterator, parent):
+ parent.serve_rekey(iterator)
+
+ rpki.async.iterator(self.parents(), loop, cb)
+
+ def serve_revoke(self, cb):
"""Handle a left-right revoke action for this self."""
rpki.log.trace()
- for parent in self.parents():
- parent.serve_revoke()
+
+ def loop(iterator, parent):
+ parent.serve_revoke(iterator)
+
+ rpki.async.iterator(self.parents(), loop, cb)
def serve_fetch_one(self):
"""Find the self object upon which a get, set, or destroy action
@@ -142,29 +150,35 @@ class self_elt(data_elt):
"""
return self.sql_fetch_all(self.gctx)
- def client_poll(self, cb):
+ def client_poll(self, callback):
"""Run the regular client poll cycle with each of this self's parents in turn."""
rpki.log.trace()
- for parent in self.parents():
-
- # This will need a callback when we go event-driven
- r_msg = rpki.up_down.list_pdu.query(parent)
-
- ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas())
- for rc in r_msg.payload.classes:
- if rc.class_name in ca_map:
- ca = ca_map[rc.class_name]
- del ca_map[rc.class_name]
- ca.check_for_updates(parent, rc)
- else:
- rpki.rpki_engine.ca_obj.create(parent, rc)
- for ca in ca_map.values():
- ca.delete(parent) # CA not listed by parent
- self.gctx.sql.sweep()
+ def each(iterator, parent):
- cb()
+ def handle(r_msg):
+ ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas())
+
+ def loop(iterator, rc):
+ if rc.class_name in ca_map:
+ ca = ca_map[rc.class_name]
+ del ca_map[rc.class_name]
+ ca.check_for_updates(parent, rc, iterator)
+ else:
+ rpki.rpki_engine.ca_obj.create(parent, rc, iterator)
+
+ def done():
+ for ca in ca_map.values():
+ ca.delete(parent) # CA not listed by parent
+ self.gctx.sql.sweep()
+ iterator()
+
+ rpki.async.iterator(r_msg.payload.classes, loop, done)
+
+ rpki.up_down.list_pdu.query(parent, handle)
+
+ rpki.async.iterator(self.parents(), each, callback)
def update_children(self, cb):
"""Check for updated IRDB data for all of this self's children and
@@ -178,38 +192,48 @@ class self_elt(data_elt):
rsn = now + rpki.sundial.timedelta(seconds = self.regen_margin)
- for child in self.children():
+ def loop1(iterator1, child):
+
+ def got_resources(irdb_resources):
+
+ def loop2(iterator2, child_cert):
+
+ ca_detail = child_cert.ca_detail()
+
+ if ca_detail.state == "active":
+ old_resources = child_cert.cert.get_3779resources()
+ new_resources = irdb_resources.intersection(old_resources)
+
+ if old_resources != new_resources or (old_resources.valid_until < rsn and irdb_resources.valid_until > now):
+ rpki.log.debug("Need to reissue child certificate SKI %s" % child_cert.cert.gSKI())
+ return child_cert.reissue(
+ ca_detail = ca_detail,
+ resources = new_resources,
+ callback = iterator2)
+
+ if old_resources.valid_until < now:
+ rpki.log.debug("Child certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s"
+ % (child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until))
+ ca = ca_detail.ca()
+ parent = ca.parent()
+ repository = parent.repository()
+ child_cert.sql_delete()
+ return ca_detail.generate_manifest(lambda *ignored: repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator2))
+
+ iterator2()
+
+ rpki.async.iterator(child_certs, loop2, iterator1)
+
child_certs = child.child_certs()
- if not child_certs:
- continue
-
- # This will require a callback when we go event-driven
- irdb_resources = self.gctx.irdb_query(child.self_id, child.child_id)
-
- for child_cert in child_certs:
- ca_detail = child_cert.ca_detail()
- if ca_detail.state != "active":
- continue
- old_resources = child_cert.cert.get_3779resources()
- new_resources = irdb_resources.intersection(old_resources)
- if old_resources != new_resources or (old_resources.valid_until < rsn and irdb_resources.valid_until > now):
- rpki.log.debug("Need to reissue child certificate SKI %s" % child_cert.cert.gSKI())
- child_cert.reissue(
- ca_detail = ca_detail,
- resources = new_resources)
- elif old_resources.valid_until < now:
- rpki.log.debug("Child certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s"
- % (child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until))
- ca = ca_detail.ca()
- parent = ca.parent()
- repository = parent.repository()
- child_cert.sql_delete()
- ca_detail.generate_manifest()
- repository.withdraw(child_cert.cert, child_cert.uri(ca))
+ if child_certs:
+ self.gctx.irdb_query(child.self_id, child.child_id, got_resources)
+ else:
+ iterator1()
+
+ rpki.async.iterator(self.children(), loop1, cb)
- cb()
- def regenerate_crls_and_manifests(self):
+ def regenerate_crls_and_manifests(self, cb):
"""Generate new CRLs and manifests as necessary for all of this
self's CAs. Extracting nextUpdate from a manifest is hard at the
moment due to implementation silliness, so for now we generate a
@@ -223,22 +247,44 @@ class self_elt(data_elt):
rpki.log.trace()
now = rpki.sundial.now()
- for parent in self.parents():
+
+ def loop1(iterator1, parent):
repository = parent.repository()
- for ca in parent.cas():
- for ca_detail in ca.fetch_revoked():
- if now > ca_detail.latest_crl.getNextUpdate():
- ca_detail.delete(ca, repository)
- ca_detail = ca.fetch_active()
- if ca_detail is not None and now > ca_detail.latest_crl.getNextUpdate():
- ca_detail.generate_crl()
- ca_detail.generate_manifest()
-
- def update_roas(self):
+
+ def loop2(iterator2, ca):
+
+ def loop3(iterator3, ca_detail):
+ ca_detail.delete(ca, repository, iterator3)
+
+ def done3():
+
+ ca_detail = ca.fetch_active()
+
+ def do_crl():
+ ca_detail.generate_crl(do_manifest)
+
+ def do_manifest(*ignored):
+ ca_detail.generate_manifest(iterator2)
+
+ if ca_detail is not None and now > ca_detail.latest_crl.getNextUpdate():
+ do_crl()
+ else:
+ iterator2()
+
+ rpki.async.iterator([x for x in ca.fetch_revoked() if now > x.latest_crl.getNextUpdate()], loop3, done3)
+
+ rpki.async.iterator(parent.cas(), loop2, iterator1)
+
+ rpki.async.iterator(self.parents(), loop1, cb)
+
+
+ def update_roas(self, cb):
"""Generate or update ROAs for this self's route_origin objects."""
- for route_origin in self.route_origins():
- route_origin.update_roa()
+ def loop(iterator, route_origin):
+ route_origin.update_roa(iterator)
+
+ rpki.async.iterator(self.route_origins(), loop, cb)
class bsc_elt(data_elt):
@@ -272,7 +318,7 @@ class bsc_elt(data_elt):
"""Fetch all child objects that link to this BSC object."""
return child_elt.sql_fetch_where(self.gctx, "bsc_id = %s", (self.bsc_id,))
- def serve_pre_save_hook(self, q_pdu, r_pdu):
+ def serve_pre_save_hook(self, q_pdu, r_pdu, cb):
"""Extra server actions for bsc_elt -- handle key generation.
For now this only allows RSA with SHA-256.
"""
@@ -281,6 +327,7 @@ class bsc_elt(data_elt):
self.private_key_id = rpki.x509.RSA.generate(keylength = q_pdu.key_length or 2048)
self.pkcs10_request = rpki.x509.PKCS10.create(self.private_key_id)
r_pdu.pkcs10_request = self.pkcs10_request
+ cb()
class parent_elt(data_elt):
"""<parent/> element."""
@@ -309,37 +356,34 @@ class parent_elt(data_elt):
"""Fetch all CA objects that link to this parent object."""
return rpki.rpki_engine.ca_obj.sql_fetch_where(self.gctx, "parent_id = %s", (self.parent_id,))
- def serve_post_save_hook(self, q_pdu, r_pdu):
+ def serve_post_save_hook(self, q_pdu, r_pdu, cb):
"""Extra server actions for parent_elt."""
if q_pdu.rekey:
- self.serve_rekey()
- if q_pdu.revoke:
- self.serve_revoke()
- self.unimplemented_control("reissue")
+ self.serve_rekey(cb)
+ elif q_pdu.revoke:
+ self.serve_revoke(cb)
+ else:
+ self.unimplemented_control("reissue")
+ cb()
- def serve_rekey(self):
+ def serve_rekey(self, cb):
"""Handle a left-right rekey action for this parent."""
- for ca in self.cas():
- ca.rekey()
- def serve_revoke(self):
+ def loop(iterator, ca):
+ ca.rekey(iterator)
+
+ rpki.async.iterator(self.cas(), loop, cb)
+
+ def serve_revoke(self, cb):
"""Handle a left-right revoke action for this parent."""
- for ca in self.cas():
- ca.revoke()
- def query_up_down(self, q_pdu):
- """Client code for sending one up-down query PDU to this parent.
+ def loop(iterator, ca):
+ ca.revoke(iterator)
- I haven't figured out yet whether this method should do something
- clever like dispatching via a method in the response PDU payload,
- or just hand back the whole response to the caller. In the long
- run this will have to become event driven with a context object
- that has methods of its own, but as this method is common code for
- several different queries and I don't yet know what the response
- processing looks like, it's too soon to tell what will make sense.
+ rpki.async.iterator(self.cas(), loop, cb)
- For now, keep this dead simple lock step, rewrite it later.
- """
+ def query_up_down(self, q_pdu, cb):
+ """Client code for sending one up-down query PDU to this parent."""
rpki.log.trace()
@@ -356,21 +400,21 @@ class parent_elt(data_elt):
bsc.signing_cert,
bsc.signing_cert_crl)
- der = rpki.https.client(server_ta = (self.gctx.bpki_ta,
- self.self().bpki_cert, self.self().bpki_glue,
- self.bpki_https_cert, self.bpki_https_glue),
- client_key = bsc.private_key_id,
- client_cert = bsc.signing_cert,
- msg = q_cms,
- url = self.peer_contact_uri)
-
- r_msg = rpki.up_down.cms_msg.unwrap(der, (self.gctx.bpki_ta,
- self.self().bpki_cert, self.self().bpki_glue,
- self.bpki_cms_cert, self.bpki_cms_glue))
-
- r_msg.payload.check_response()
- return r_msg
-
+ def unwrap(der):
+ r_msg = rpki.up_down.cms_msg.unwrap(der, (self.gctx.bpki_ta,
+ self.self().bpki_cert, self.self().bpki_glue,
+ self.bpki_cms_cert, self.bpki_cms_glue))
+ r_msg.payload.check_response()
+ cb(r_msg)
+
+ rpki.https.client(server_ta = (self.gctx.bpki_ta,
+ self.self().bpki_cert, self.self().bpki_glue,
+ self.bpki_https_cert, self.bpki_https_glue),
+ client_key = bsc.private_key_id,
+ client_cert = bsc.signing_cert,
+ msg = q_cms,
+ url = self.peer_contact_uri,
+ callback = unwrap)
class child_elt(data_elt):
"""<child/> element."""
@@ -408,12 +452,13 @@ class child_elt(data_elt):
raise rpki.exceptions.ClassNameMismatch, "Class name mismatch: child.self_id = %d, parent.self_id = %d" % (self.self_id, parent.self_id)
return ca
- def serve_post_save_hook(self, q_pdu, r_pdu):
+ def serve_post_save_hook(self, q_pdu, r_pdu, cb):
"""Extra server actions for child_elt."""
self.unimplemented_control("reissue")
if self.clear_https_ta_cache:
self.gctx.clear_https_ta_cache()
self.clear_https_ta_cache = False
+ cb()
def endElement(self, stack, name, text):
"""Handle subelements of <child/> element. These require special
@@ -424,7 +469,7 @@ class child_elt(data_elt):
if name in self.elements:
self.clear_https_ta_cache = True
- def serve_up_down(self, query):
+ def serve_up_down(self, query, callback):
"""Outer layer of server handling for one up-down PDU from this child."""
rpki.log.trace()
@@ -438,19 +483,24 @@ class child_elt(data_elt):
q_msg.payload.gctx = self.gctx
if enforce_strict_up_down_xml_sender and q_msg.sender != str(self.child_id):
raise rpki.exceptions.BadSender, "Unexpected XML sender %s" % q_msg.sender
+
+ def done(r_msg):
+ #
+ # Exceptions from this point on are problematic, as we have no
+ # sane way of reporting errors in the error reporting mechanism.
+ # May require refactoring, ignore the issue for now.
+ #
+ r_cms = rpki.up_down.cms_msg.wrap(r_msg, bsc.private_key_id,
+ bsc.signing_cert, bsc.signing_cert_crl)
+ callback(r_cms)
+
try:
- r_msg = q_msg.serve_top_level(self)
+ q_msg.serve_top_level(self, done)
+ except rpki.exceptions.NoActiveCA, data:
+ done(q_msg.serve_error(data))
except Exception, data:
rpki.log.error(traceback.format_exc())
- r_msg = q_msg.serve_error(data)
- #
- # Exceptions from this point on are problematic, as we have no
- # sane way of reporting errors in the error reporting mechanism.
- # May require refactoring, ignore the issue for now.
- #
- r_cms = rpki.up_down.cms_msg.wrap(r_msg, bsc.private_key_id,
- bsc.signing_cert, bsc.signing_cert_crl)
- return r_cms
+ done(q_msg.serve_error(data))
class repository_elt(data_elt):
"""<repository/> element."""
@@ -468,41 +518,11 @@ class repository_elt(data_elt):
bpki_https_cert = None
bpki_https_glue = None
- use_pubd = True
-
def parents(self):
"""Fetch all parent objects that link to this repository object."""
return parent_elt.sql_fetch_where(self.gctx, "repository_id = %s", (self.repository_id,))
- @staticmethod
- def uri_to_filename(base, uri):
- """Convert a URI to a filename. [TEMPORARY]"""
- if not uri.startswith("rsync://"):
- raise rpki.exceptions.BadURISyntax
- filename = base + uri[len("rsync://"):]
- if filename.find("//") >= 0 or filename.find("/../") >= 0 or filename.endswith("/.."):
- raise rpki.exceptions.BadURISyntax
- return filename
-
- @classmethod
- def object_write(cls, base, uri, obj):
- """Write an object to disk. [TEMPORARY]"""
- rpki.log.trace()
- filename = cls.uri_to_filename(base, uri)
- dirname = os.path.dirname(filename)
- if not os.path.isdir(dirname):
- os.makedirs(dirname)
- f = open(filename, "wb")
- f.write(obj.get_DER())
- f.close()
-
- @classmethod
- def object_delete(cls, base, uri):
- """Delete an object from disk. [TEMPORARY]"""
- rpki.log.trace()
- os.remove(cls.uri_to_filename(base, uri))
-
- def call_pubd(self, *pdus):
+ def call_pubd(self, callback, *pdus):
"""Send a message to publication daemon and return the response."""
rpki.log.trace()
bsc = self.bsc()
@@ -510,33 +530,31 @@ class repository_elt(data_elt):
q_msg.type = "query"
q_cms = rpki.publication.cms_msg.wrap(q_msg, bsc.private_key_id, bsc.signing_cert, bsc.signing_cert_crl)
bpki_ta_path = (self.gctx.bpki_ta, self.self().bpki_cert, self.self().bpki_glue, self.bpki_https_cert, self.bpki_https_glue)
- r_cms = rpki.https.client(
+
+ def done(r_cms):
+ r_msg = rpki.publication.cms_msg.unwrap(r_cms, bpki_ta_path)
+ assert len(r_msg) == 1
+ callback(r_msg[0])
+
+ rpki.https.client(
client_key = bsc.private_key_id,
client_cert = bsc.signing_cert,
server_ta = bpki_ta_path,
url = self.peer_contact_uri,
- msg = q_cms)
- r_msg = rpki.publication.cms_msg.unwrap(r_cms, bpki_ta_path)
- assert len(r_msg) == 1
- return r_msg[0]
+ msg = q_cms,
+ callback = done)
- def publish(self, obj, uri):
- """Placeholder for publication operation. [TEMPORARY]"""
+ def publish(self, obj, uri, callback):
+ """Publish one object in the repository."""
rpki.log.trace()
rpki.log.info("Publishing %s as %s" % (repr(obj), repr(uri)))
- if self.use_pubd:
- self.call_pubd(rpki.publication.obj2elt[type(obj)].make_pdu(action = "publish", uri = uri, payload = obj))
- else:
- self.object_write(self.gctx.publication_kludge_base, uri, obj)
+ self.call_pubd(callback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "publish", uri = uri, payload = obj))
- def withdraw(self, obj, uri):
- """Placeholder for publication withdrawal operation. [TEMPORARY]"""
+ def withdraw(self, obj, uri, callback):
+ """Withdraw one object from the repository."""
rpki.log.trace()
rpki.log.info("Withdrawing %s from at %s" % (repr(obj), repr(uri)))
- if self.use_pubd:
- self.call_pubd(rpki.publication.obj2elt[type(obj)].make_pdu(action = "withdraw", uri = uri))
- else:
- self.object_delete(self.gctx.publication_kludge_base, uri)
+ self.call_pubd(callback, rpki.publication.obj2elt[type(obj)].make_pdu(action = "withdraw", uri = uri))
class route_origin_elt(data_elt):
"""<route_origin/> element."""
@@ -590,9 +608,10 @@ class route_origin_elt(data_elt):
"""Fetch all ca_detail objects that link to this route_origin object."""
return rpki.rpki_engine.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id)
- def serve_post_save_hook(self, q_pdu, r_pdu):
+ def serve_post_save_hook(self, q_pdu, r_pdu, cb):
"""Extra server actions for route_origin_elt."""
self.unimplemented_control("suppress_publication")
+ cb()
def startElement(self, stack, name, attrs):
"""Handle <route_origin/> element. This requires special
@@ -607,35 +626,37 @@ class route_origin_elt(data_elt):
if self.ipv6 is not None:
self.ipv6 = rpki.resource_set.roa_prefix_set_ipv6(self.ipv6)
- def update_roa(self):
+ def update_roa(self, callback):
"""Bring this route_origin's ROA up to date if necesssary."""
if self.roa is None:
- return self.generate_roa()
+ return self.generate_roa(callback)
ca_detail = self.ca_detail()
if ca_detail is None or ca_detail.state != "active":
- return self.regenerate_roa()
+ return self.regenerate_roa(callback)
regen_margin = rpki.sundial.timedelta(seconds = self.self().regen_margin)
if rpki.sundial.now() + regen_margin > self.cert.getNotAfter():
- return self.regenerate_roa()
+ return self.regenerate_roa(callback)
ca_resources = ca_detail.latest_ca_cert.get_3779resources()
ee_resources = self.cert.get_3779resources()
if ee_resources.oversized(ca_resources):
- return self.regenerate_roa()
+ return self.regenerate_roa(callback)
v4 = self.ipv4.to_resource_set() if self.ipv4 is not None else rpki.resource_set.resource_set_ipv4()
v6 = self.ipv6.to_resource_set() if self.ipv6 is not None else rpki.resource_set.resource_set_ipv6()
if ee_resources.v4 != v4 or ee_resources.v6 != v6:
- return self.regenerate_roa()
+ return self.regenerate_roa(callback)
+
+ callback()
- def generate_roa(self):
+ def generate_roa(self, callback):
"""Generate a ROA based on this <route_origin/> object.
At present this does not support ROAs with multiple signatures
@@ -698,12 +719,17 @@ class route_origin_elt(data_elt):
self.sql_store()
repository = ca.parent().repository()
- repository.publish(self.roa, self.roa_uri(ca))
- if self.publish_ee_separately:
- repository.publish(self.cert, self.ee_uri(ca))
- ca_detail.generate_manifest()
- def withdraw_roa(self, regenerate = False):
+ def one():
+ repository.publish(self.cert, self.ee_uri(ca), two)
+
+ def two(*ignored):
+ ca_detail.generate_manifest(callback)
+
+ repository.publish(self.roa, self.roa_uri(ca),
+ one if self.publish_ee_separately else two)
+
+ def withdraw_roa(self, callback, regenerate = False):
"""Withdraw ROA associated with this route_origin.
In order to preserve make-before-break properties without
@@ -721,24 +747,34 @@ class route_origin_elt(data_elt):
if ca_detail.state != 'active':
self.ca_detail_id = None
+
+ def one(*ignored):
+ rpki.log.debug("Withdrawing ROA and revoking its EE cert")
+ rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail)
+ repository.withdraw(roa, roa_uri,
+ two if self.publish_ee_separately else three)
+
+ def two():
+ repository.withdraw(cert, ee_uri, three)
+
+ def three(*ignored):
+ self.gctx.sql.sweep()
+ ca_detail.generate_crl(four)
+
+ def four(*ignored):
+ ca_detail.generate_manifest(callback)
+
if regenerate:
- self.generate_roa()
-
- rpki.log.debug("Withdrawing ROA and revoking its EE cert")
- rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail)
- repository.withdraw(roa, roa_uri)
- if self.publish_ee_separately:
- repository.withdraw(cert, ee_uri)
- self.gctx.sql.sweep()
- ca_detail.generate_crl()
- ca_detail.generate_manifest()
-
- def regenerate_roa(self):
+ self.generate_roa(one)
+ else:
+ one()
+
+ def regenerate_roa(self, callback):
"""Reissue ROA associated with this route_origin."""
if self.ca_detail() is None:
- self.generate_roa()
+ self.generate_roa(callback)
else:
- self.withdraw_roa(regenerate = True)
+ self.withdraw_roa(callback, regenerate = True)
def roa_uri(self, ca, key = None):
"""Return the publication URI for this route_origin's ROA."""
@@ -814,14 +850,19 @@ class msg(rpki.xml_utils.msg, left_right_namespace):
for x in (self_elt, child_elt, parent_elt, bsc_elt, repository_elt,
route_origin_elt, list_resources_elt, report_error_elt))
- def serve_top_level(self, gctx):
+ def serve_top_level(self, gctx, cb):
"""Serve one msg PDU."""
r_msg = self.__class__()
r_msg.type = "reply"
- for q_pdu in self:
+
+ def loop(iterator, q_pdu):
q_pdu.gctx = gctx
- q_pdu.serve_dispatch(r_msg)
- return r_msg
+ q_pdu.serve_dispatch(r_msg, iterator)
+
+ def done():
+ cb(r_msg)
+
+ rpki.async.iterator(self, loop, done)
class sax_handler(rpki.xml_utils.sax_handler):
"""SAX handler for Left-Right protocol."""