aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rpki/left_right.py96
-rw-r--r--rpki/rpkid.py11
2 files changed, 67 insertions, 40 deletions
diff --git a/rpki/left_right.py b/rpki/left_right.py
index a97d864f..46aafc6a 100644
--- a/rpki/left_right.py
+++ b/rpki/left_right.py
@@ -346,46 +346,68 @@ class self_elt(data_elt):
Handle a left-right publish_world_now action for this self.
"""
- # This probably needs find out what's already published and
- # compute the difference before attempting to make any changes.
- # Just blindly overwriting doesn't work anymore, and probably
- # never should have.
-
- # The current loop doesn't appear to do anything asynchronous, so
- # it could become a plain old for loop. Polling all the
- # repositories to find out what's published, however, will require
- # talking to pubd(s). This polling looks a lot like what we
- # already do with publication_queue, might be worth generalizing
- # since this probably is not the only place where we need recovery
- # logic. Which, hmm, suggests that we might need to refactor.
-
publisher = rpki.rpkid.publication_queue()
+ repositories = set()
+ objects = dict()
+
+ def list_handler(r_pdu, repository):
+ rpki.publication.raise_if_error(r_pdu)
+ assert r_pdu.tag == rpki.publication.tag_list
+ assert r_pdu.get("uri") not in objects
+ objects[r_pdu.get("uri")] = (r_pdu.get("hash"), repository)
def loop(iterator, parent):
- repo = parent.repository
- for ca in parent.cas:
- ca_detail = ca.active_ca_detail
- if ca_detail is not None:
- publisher.queue(
- uri = ca_detail.crl_uri, new_obj = ca_detail.latest_crl, repository = repo)
- publisher.queue(
- uri = ca_detail.manifest_uri, new_obj = ca_detail.latest_manifest, repository = repo)
- for c in ca_detail.child_certs:
- publisher.queue(
- uri = c.uri, new_obj = c.cert, repository = repo)
- for r in ca_detail.roas:
- if r.roa is not None:
- publisher.queue(
- uri = r.uri, new_obj = r.roa, repository = repo)
- for g in ca_detail.ghostbusters:
- publisher.queue(
- uri = g.uri, new_obj = g.ghostbuster, repository = repo)
- for c in ca_detail.ee_certificates:
- publisher.queue(
- uri = c.uri, new_obj = c.cert, repository = repo)
- iterator()
+ repository = parent.repository
+ if repository.peer_contact_uri in repositories:
+ return iterator()
+ repositories.add(repository.peer_contact_uri)
+ q_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap,
+ type = "query", version = rpki.publication.version)
+ SubElement(q_msg, rpki.publication.tag_list, tag = "list")
+ def handler(r_pdu):
+ list_handler(r_pdu, repository)
+ repository.call_pubd(iterator, eb, q_msg,
+ handlers = dict(list = handler),
+ length_check = False)
+
+ def reconcile(uri, obj, repository):
+ h, r = objects.pop(uri, (None, None))
+ if h is not None:
+ assert r == repository
+ publisher.queue(uri = uri, new_obj = obj, old_hash = h, repository = repository)
def done():
+ for parent in self.parents:
+ repository = parent.repository
+ for ca in parent.cas:
+ ca_detail = ca.active_ca_detail
+ if ca_detail is not None:
+ reconcile(uri = ca_detail.crl_uri,
+ obj = ca_detail.latest_crl,
+ repository = repository)
+ reconcile(uri = ca_detail.manifest_uri,
+ obj = ca_detail.latest_manifest,
+ repository = repository)
+ for c in ca_detail.child_certs:
+ reconcile(uri = c.uri,
+ obj = c.cert,
+ repository = repository)
+ for r in ca_detail.roas:
+ if r.roa is not None:
+ reconcile(uri = r.uri,
+ obj = r.roa,
+ repository = repository)
+ for g in ca_detail.ghostbusters:
+ reconcile(uri = g.uri,
+ obj = g.ghostbuster,
+ repository = repository)
+ for c in ca_detail.ee_certificates:
+ reconcile(uri = c.uri,
+ obj = c.cert,
+ repository = repository)
+ for u in objects:
+ h, r = objects[h]
+ publisher.queue(uri = u, old_hash = h, repository = r)
publisher.call_pubd(cb, eb)
rpki.async.iterator(self.parents, loop, done)
@@ -589,7 +611,7 @@ class repository_elt(data_elt):
cb()
- def call_pubd(self, callback, errback, q_msg, handlers = {}):
+ def call_pubd(self, callback, errback, q_msg, handlers = {}, length_check = True):
"""
Send a message to publication daemon and return the response.
@@ -627,7 +649,7 @@ class repository_elt(data_elt):
if handler:
logger.debug("Calling pubd handler %r", handler)
handler(r_pdu)
- if len(q_msg) != len(r_msg):
+ if length_check and len(q_msg) != len(r_msg):
raise rpki.exceptions.BadPublicationReply("Wrong number of response PDUs from pubd: sent %r, got %r" % (q_msg, r_msg))
callback()
except (rpki.async.ExitNow, SystemExit):
diff --git a/rpki/rpkid.py b/rpki/rpkid.py
index 25a6206d..64157663 100644
--- a/rpki/rpkid.py
+++ b/rpki/rpkid.py
@@ -2527,13 +2527,16 @@ class publication_queue(object):
if self.replace:
self.uris = {}
- def queue(self, uri, repository, handler = None, old_obj = None, new_obj = None):
+ def queue(self, uri, repository, handler = None,
+ old_obj = None, new_obj = None, old_hash = None):
- assert old_obj is not None or new_obj is not None
+ assert old_obj is not None or new_obj is not None or old_hash is not None
+ assert old_obj is None or old_hash is None
assert old_obj is None or isinstance(old_obj, rpki.x509.uri_dispatch(uri))
assert new_obj is None or isinstance(new_obj, rpki.x509.uri_dispatch(uri))
- logger.debug("Queuing publication action: uri %s, old %r, new %r", uri, old_obj, new_obj)
+ logger.debug("Queuing publication action: uri %s, old %r, new %r, hash %s",
+ uri, old_obj, new_obj, old_hash)
# id(repository) may need to change to repository.peer_contact_uri
# once we convert from our custom SQL cache to Django ORM.
@@ -2549,6 +2552,8 @@ class publication_queue(object):
old_pdu = self.uris.pop(uri)
self.msgs[rid].remove(old_pdu)
hash = old_pdu.get("hash")
+ elif old_hash is not None:
+ hash = old_hash
elif old_obj is None:
hash = None
else: