aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/rpki_engine.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2009-11-04 04:27:31 +0000
committerRob Austein <sra@hactrn.net>2009-11-04 04:27:31 +0000
commit6192e5fcba290e11e88597fa7b03dfcde28b89e2 (patch)
treeeb6ddb112f5a5185367a98e9619a9da4a461e73b /rpkid/rpki/rpki_engine.py
parent528e1bf712d82d8024c204a06c756cd577096b47 (diff)
Use batch-mode publication in rpkid. Fix FOREIGN KEY constraints.
svn path=/myrpki/myirbe.py; revision=2876
Diffstat (limited to 'rpkid/rpki/rpki_engine.py')
-rw-r--r--rpkid/rpki/rpki_engine.py499
1 files changed, 264 insertions, 235 deletions
diff --git a/rpkid/rpki/rpki_engine.py b/rpkid/rpki/rpki_engine.py
index a1c28f16..4beb19a3 100644
--- a/rpkid/rpki/rpki_engine.py
+++ b/rpkid/rpki/rpki_engine.py
@@ -378,39 +378,40 @@ class ca_obj(rpki.sql.sql_persistent):
def loop(iterator, ca_detail):
- ski = ca_detail.public_key.get_SKI()
+ rc_cert = cert_map.pop(ca_detail.public_key.get_SKI(), None)
+
+ if rc_cert is None:
- if ski not in cert_map:
rpki.log.warn("Certificate in database missing from list_response, class %r, SKI %s, maybe parent certificate went away?"
% (rc.class_name, ca_detail.public_key.gSKI()))
- ca_detail.delete(self, parent.repository(), iterator, eb, allow_failure = True)
- return
+ publisher = publication_queue()
+ ca_detail.delete(ca = ca_detail.ca(), publisher = publisher)
+ return publisher.call_pubd(iterator, eb)
+
+ else:
- def cleanup():
- del cert_map[ski]
- iterator()
-
- if ca_detail.state in ("pending", "active"):
- if ca_detail.state == "pending":
- current_resources = rpki.resource_set.resource_bag()
- else:
- current_resources = ca_detail.latest_ca_cert.get_3779resources()
- if (ca_detail.state == "pending" or
- sia_uri_changed or
- ca_detail.latest_ca_cert != cert_map[ski].cert or
- current_resources.undersized(rc_resources) or
- current_resources.oversized(rc_resources)):
- ca_detail.update(
- parent = parent,
- ca = self,
- rc = rc,
- sia_uri_changed = sia_uri_changed,
- old_resources = current_resources,
- callback = cleanup,
- errback = eb)
- return
-
- cleanup()
+ if ca_detail.state in ("pending", "active"):
+
+ if ca_detail.state == "pending":
+ current_resources = rpki.resource_set.resource_bag()
+ else:
+ current_resources = ca_detail.latest_ca_cert.get_3779resources()
+
+ if (ca_detail.state == "pending" or
+ sia_uri_changed or
+ ca_detail.latest_ca_cert != rc_cert.cert or
+ current_resources.undersized(rc_resources) or
+ current_resources.oversized(rc_resources)):
+ return ca_detail.update(
+ parent = parent,
+ ca = self,
+ rc = rc,
+ sia_uri_changed = sia_uri_changed,
+ old_resources = current_resources,
+ callback = iterator,
+ errback = eb)
+
+ iterator()
def done():
if cert_map:
@@ -424,7 +425,8 @@ class ca_obj(rpki.sql.sql_persistent):
for x in cert_map.itervalues():
rpki.log.debug("Parent thinks I have %r %s" % (x, x.cert.gSKI()))
for x in ca_details:
- rpki.log.debug("I think I have %r %s" % (x, x.latest_ca_cert.gSKI()))
+ if x.latest_ca_cert is not None:
+ rpki.log.debug("I think I have %r %s" % (x, x.latest_ca_cert.gSKI()))
if ca_details:
rpki.async.iterator(ca_details, loop, done)
@@ -469,7 +471,7 @@ class ca_obj(rpki.sql.sql_persistent):
CA, then finally delete this CA itself.
"""
- def fail(e):
+ def lose(e):
rpki.log.traceback()
rpki.log.warn("Could not delete CA %r, skipping: %s" % (self, e))
callback()
@@ -478,12 +480,10 @@ class ca_obj(rpki.sql.sql_persistent):
self.sql_delete()
callback()
- repository = parent.repository()
-
- def loop(iterator, ca_detail):
- ca_detail.delete(self, repository, iterator, fail)
-
- rpki.async.iterator(self.ca_details(), loop, done)
+ publisher = publication_queue()
+ for ca_detail in self.ca_details():
+ ca_detail.delete(ca = self, publisher = publisher)
+ publisher.call_pubd(done, lose)
def next_serial_number(self):
"""
@@ -542,7 +542,7 @@ class ca_obj(rpki.sql.sql_persistent):
rpki.log.trace()
def loop(iterator, ca_detail):
- ca_detail.revoke(iterator, eb)
+ ca_detail.revoke(cb = iterator, eb = eb)
rpki.async.iterator(self.fetch_deprecated(), loop, cb)
@@ -562,10 +562,16 @@ class ca_detail_obj(rpki.sql.sql_persistent):
("latest_manifest_cert", rpki.x509.X509),
("latest_manifest", rpki.x509.SignedManifest),
("latest_crl", rpki.x509.CRL),
+ ("crl_published", rpki.sundial.datetime),
+ ("manifest_published", rpki.sundial.datetime),
"state",
"ca_cert_uri",
"ca_id")
+ crl_published = None
+ manifest_published = None
+ latest_ca_cert = None
+
def sql_decode(self, vals):
"""
Extra assertions for SQL decode of a ca_detail_obj.
@@ -607,35 +613,27 @@ class ca_detail_obj(rpki.sql.sql_persistent):
Activate this ca_detail.
"""
+ publisher = publication_queue()
+
self.latest_ca_cert = cert
self.ca_cert_uri = uri.rsync()
self.generate_manifest_cert(ca)
+ self.state = "active"
+ self.generate_crl(publisher = publisher)
+ self.generate_manifest(publisher = publisher)
+ self.sql_mark_dirty()
- def did_crl():
- self.generate_manifest(callback = did_manifest, errback = errback)
-
- def did_manifest():
- self.state = "active"
- self.sql_mark_dirty()
- if predecessor is None:
- callback()
- else:
- predecessor.state = "deprecated"
- predecessor.sql_mark_dirty()
- rpki.async.iterator(predecessor.child_certs(), do_one_child_cert, done_child_certs)
-
- def do_one_child_cert(iterator, child_cert):
- child_cert.reissue(self, iterator.ignore, errback)
-
- def done_child_certs():
- rpki.async.iterator(predecessor.roas(), do_one_roa, callback)
-
- def do_one_roa(iterator, roa):
- roa.regenerate(iterator, errback)
+ if predecessor is not None:
+ predecessor.state = "deprecated"
+ predecessor.sql_mark_dirty()
+ for child_cert in predecessor.child_certs():
+ child_cert.reissue(ca_detail = self, publisher = publisher)
+ for roa in predecessor.roas():
+ roa.regenerate(publisher = publisher)
- self.generate_crl(callback = did_crl, errback = errback)
+ publisher.call_pubd(callback, errback)
- def delete(self, ca, repository, cb, eb, allow_failure = False):
+ def delete(self, ca, publisher, allow_failure = False):
"""
Delete this ca_detail and all of the certs it issued.
@@ -643,28 +641,19 @@ class ca_detail_obj(rpki.sql.sql_persistent):
raise an exception.
"""
- def withdraw_one_child(iterator, child_cert):
- repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator, eb, allow_failure)
-
- def child_certs_done():
- rpki.async.iterator(self.roas(), revoke_one_roa, withdraw_manifest)
-
- def revoke_one_roa(iterator, roa):
- roa.revoke(iterator, eb, allow_failure = allow_failure)
-
- def withdraw_manifest():
- repository.withdraw(self.latest_manifest, self.manifest_uri(ca), withdraw_crl, eb, allow_failure)
-
- def withdraw_crl():
- repository.withdraw(self.latest_crl, self.crl_uri(ca), done, eb, allow_failure)
-
- def done():
- for cert in self.child_certs() + self.revoked_certs():
- cert.sql_delete()
- self.sql_delete()
- cb()
-
- rpki.async.iterator(self.child_certs(), withdraw_one_child, child_certs_done)
+ repository = ca.parent().repository()
+ for child_cert in self.child_certs():
+ publisher.withdraw(cls = rpki.publication.certificate_elt, uri = child_cert.uri(ca), obj = child_cert.cert, repository = repository,
+ handler = False if allow_failure else None)
+ for roa in self.roas():
+ roa.revoke(publisher = publisher, allow_failure = allow_failure)
+ publisher.withdraw(cls = rpki.publication.manifest_elt, uri = self.manifest_uri(ca), obj = self.latest_manifest, repository = repository,
+ handler = False if allow_failure else None)
+ publisher.withdraw(cls = rpki.publication.crl_elt, uri = self.crl_uri(ca), obj = self.latest_crl, repository = repository,
+ handler = False if allow_failure else None)
+ for cert in self.child_certs() + self.revoked_certs():
+ cert.sql_delete()
+ self.sql_delete()
def revoke(self, cb, eb):
"""
@@ -689,13 +678,14 @@ class ca_detail_obj(rpki.sql.sql_persistent):
time has passed.
"""
+ ca = self.ca()
+ parent = ca.parent()
+
def parent_revoked(r_msg):
if r_msg.payload.ski != self.latest_ca_cert.gSKI():
raise rpki.exceptions.SKIMismatch
- ca = self.ca()
- parent = ca.parent()
crl_interval = rpki.sundial.timedelta(seconds = parent.self().crl_interval)
self.nextUpdate = rpki.sundial.now()
@@ -706,29 +696,24 @@ class ca_detail_obj(rpki.sql.sql_persistent):
if self.latest_crl is not None:
self.nextUpdate = self.nextUpdate.later(self.latest_crl.getNextUpdate())
- def revoke_one_child(iterator, child_cert):
- self.nextUpdate = self.nextUpdate.later(child_cert.cert.getNotAfter())
- child_cert.revoke(iterator, eb)
-
- def final_crl():
- self.nextUpdate += crl_interval
- self.generate_crl(callback = final_manifest, errback = eb, nextUpdate = self.nextUpdate)
-
- def final_manifest():
- self.generate_manifest(callback = done, errback = eb, nextUpdate = self.nextUpdate)
-
- def done():
- self.private_key_id = None
- self.manifest_private_key_id = None
- self.manifest_public_key = None
- self.latest_manifest_cert = None
- self.state = "revoked"
- self.sql_mark_dirty()
- cb()
+ publisher = publication_queue()
- rpki.async.iterator(self.child_certs(), revoke_one_child, final_crl)
+ for child_cert in self.child_certs():
+ self.nextUpdate = self.nextUpdate.later(child_cert.cert.getNotAfter())
+ child_cert.revoke(publisher = publisher)
+
+ self.nextUpdate += crl_interval
+ self.generate_crl(publisher = publisher, nextUpdate = self.nextUpdate)
+ self.generate_manifest(publisher = publisher, nextUpdate = self.nextUpdate)
+ self.private_key_id = None
+ self.manifest_private_key_id = None
+ self.manifest_public_key = None
+ self.latest_manifest_cert = None
+ self.state = "revoked"
+ self.sql_mark_dirty()
+ publisher.call_pubd(cb, eb)
- rpki.up_down.revoke_pdu.query(self.ca(), self.latest_ca_cert.gSKI(), parent_revoked, eb)
+ rpki.up_down.revoke_pdu.query(ca, self.latest_ca_cert.gSKI(), parent_revoked, eb)
def update(self, parent, ca, rc, sia_uri_changed, old_resources, callback, errback):
"""
@@ -739,22 +724,18 @@ class ca_detail_obj(rpki.sql.sql_persistent):
def issued(issue_response):
self.latest_ca_cert = issue_response.payload.classes[0].certs[0].cert
new_resources = self.latest_ca_cert.get_3779resources()
-
- def loop(iterator, child_cert):
- child_resources = child_cert.cert.get_3779resources()
- if sia_uri_changed or child_resources.oversized(new_resources):
- child_cert.reissue(
- ca_detail = self,
- resources = child_resources.intersection(new_resources),
- callback = iterator.ignore,
- errback = errback)
- else:
- iterator()
+ publisher = publication_queue()
if sia_uri_changed or old_resources.oversized(new_resources):
- rpki.async.iterator(self.child_certs(), loop, callback)
- else:
- callback()
+ for child_cert in self.child_certs():
+ child_resources = child_cert.cert.get_3779resources()
+ if sia_uri_changed or child_resources.oversized(new_resources):
+ child_cert.reissue(
+ ca_detail = self,
+ resources = child_resources.intersection(new_resources),
+ publisher = publisher)
+
+ publisher.call_pubd(callback, errback)
rpki.up_down.issue_pdu.query(parent, ca, self, issued, errback)
@@ -806,7 +787,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
self.latest_manifest_cert = self.issue_ee(ca, resources, self.manifest_public_key)
- def issue(self, ca, child, subject_key, sia, resources, callback, errback, child_cert = None):
+ def issue(self, ca, child, subject_key, sia, resources, publisher, child_cert = None):
"""
Issue a new certificate to a child. Optional child_cert argument
specifies an existing child_cert object to update in place; if not
@@ -839,18 +820,14 @@ class ca_detail_obj(rpki.sql.sql_persistent):
rpki.log.debug("Reusing existing child_cert %r" % child_cert)
child_cert.ski = cert.get_SKI()
-
+ child_cert.published = rpki.sundial.now()
child_cert.sql_store()
+ publisher.publish(cls = rpki.publication.certificate_elt, uri = child_cert.uri(ca), obj = child_cert.cert, repository = ca.parent().repository(),
+ handler = child_cert.published_callback)
+ self.generate_manifest(publisher = publisher)
+ return child_cert
- def published():
- self.generate_manifest(done, errback)
-
- def done():
- callback(child_cert)
-
- ca.parent().repository().publish(child_cert.cert, child_cert.uri(ca), published, errback)
-
- def generate_crl(self, callback, errback, nextUpdate = None):
+ def generate_crl(self, publisher, nextUpdate = None):
"""
Generate a new CRL for this ca_detail. At the moment this is
unconditional, that is, it is up to the caller to decide whether a
@@ -859,7 +836,6 @@ class ca_detail_obj(rpki.sql.sql_persistent):
ca = self.ca()
parent = ca.parent()
- repository = parent.repository()
crl_interval = rpki.sundial.timedelta(seconds = parent.self().crl_interval)
now = rpki.sundial.now()
@@ -882,16 +858,26 @@ class ca_detail_obj(rpki.sql.sql_persistent):
nextUpdate = nextUpdate,
revokedCertificates = certlist)
- repository.publish(self.latest_crl, self.crl_uri(ca), callback = callback, errback = errback)
+ self.crl_published = rpki.sundial.now()
+ self.sql_mark_dirty()
+ publisher.publish(cls = rpki.publication.crl_elt, uri = self.crl_uri(ca), obj = self.latest_crl, repository = parent.repository(),
+ handler = self.crl_published_callback)
+
+ def crl_published_callback(self, pdu):
+ """
+ Check result of CRL publication.
+ """
+ pdu.raise_if_error()
+ self.crl_published = None
+ self.sql_mark_dirty()
- def generate_manifest(self, callback, errback, nextUpdate = None):
+ def generate_manifest(self, publisher, nextUpdate = None):
"""
Generate a new manifest for this ca_detail.
"""
ca = self.ca()
parent = ca.parent()
- repository = parent.repository()
crl_interval = rpki.sundial.timedelta(seconds = parent.self().crl_interval)
now = rpki.sundial.now()
@@ -913,7 +899,20 @@ class ca_detail_obj(rpki.sql.sql_persistent):
keypair = self.manifest_private_key_id,
certs = self.latest_manifest_cert)
- repository.publish(self.latest_manifest, self.manifest_uri(ca), callback = callback, errback = errback)
+
+ self.manifest_published = rpki.sundial.now()
+ self.sql_mark_dirty()
+ publisher.publish(cls = rpki.publication.manifest_elt, uri = self.manifest_uri(ca), obj = self.latest_manifest, repository = parent.repository(),
+ handler = self.manifest_published_callback)
+
+ def manifest_published_callback(self, pdu):
+ """
+ Check result of manifest publication.
+ """
+ pdu.raise_if_error()
+ self.manifest_published = None
+ self.sql_mark_dirty()
+
class child_cert_obj(rpki.sql.sql_persistent):
"""
@@ -926,7 +925,8 @@ class child_cert_obj(rpki.sql.sql_persistent):
("cert", rpki.x509.X509),
"child_id",
"ca_detail_id",
- "ski")
+ "ski",
+ ("published", rpki.sundial.datetime))
def __init__(self, gctx = None, child_id = None, ca_detail_id = None, cert = None):
"""
@@ -937,6 +937,7 @@ class child_cert_obj(rpki.sql.sql_persistent):
self.child_id = child_id
self.ca_detail_id = ca_detail_id
self.cert = cert
+ self.published = None
if child_id or ca_detail_id or cert:
self.sql_mark_dirty()
@@ -956,35 +957,26 @@ class child_cert_obj(rpki.sql.sql_persistent):
"""Return the publication URI for this child_cert."""
return ca.sia_uri + self.uri_tail()
- def revoke(self, callback, errback, withdraw = True):
+ def revoke(self, publisher):
"""
Revoke a child cert.
"""
-
- rpki.log.debug("Revoking %r" % self)
ca_detail = self.ca_detail()
ca = ca_detail.ca()
+ rpki.log.debug("Revoking %r %r" % (self, self.uri(ca)))
revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail)
+ publisher.withdraw(cls = rpki.publication.certificate_elt, uri = self.uri(ca), obj = self.cert, repository = ca.parent().repository())
+ self.gctx.sql.sweep()
+ self.sql_delete()
- def done():
- self.gctx.sql.sweep()
- self.sql_delete()
- callback()
-
- if withdraw:
- ca.parent().repository().withdraw(self.cert, self.uri(ca), done, errback)
- else:
- rpki.log.info("Suppressing withdrawal of %r" % self.cert)
- done()
-
- def reissue(self, ca_detail, callback, errback, resources = None, sia = None):
+ def reissue(self, ca_detail, publisher, resources = None, sia = None):
"""
- Reissue an existing cert, reusing the public key. If the cert we
- would generate is identical to the one we already have, we just
- return the one we already have. If we have to revoke the old
- certificate when generating the new one, we have to generate a new
- child_cert_obj, so calling code that needs the updated
- child_cert_obj must use the return value from this method.
+ Reissue an existing child cert, reusing the public key. If the
+ child cert we would generate is identical to the one we already
+ have, we just return the one we already have. If we have to
+ revoke the old child cert when generating the new one, we have to
+ generate a new child_cert_obj, so calling code that needs the
+ updated child_cert_obj must use the return value from this method.
"""
ca = ca_detail.ca()
@@ -1003,7 +995,7 @@ class child_cert_obj(rpki.sql.sql_persistent):
assert resources.valid_until is not None and old_resources.valid_until is not None
if resources == old_resources and sia == old_sia and ca_detail == old_ca_detail:
- return callback(self)
+ return self
must_revoke = old_resources.oversized(resources) or old_resources.valid_until > resources.valid_until
new_issuer = ca_detail != old_ca_detail
@@ -1013,37 +1005,24 @@ class child_cert_obj(rpki.sql.sql_persistent):
if resources.valid_until != old_resources.valid_until:
rpki.log.debug("Validity changed: %s %s" % ( old_resources.valid_until, resources.valid_until))
- def revoke(child_cert):
-
- uri = child_cert.uri(ca)
- rpki.log.debug("New child_cert %r uri %s" % (child_cert, uri))
-
- def loop(iterator, x):
+ if must_revoke:
+ for x in child.child_certs(ca_detail = ca_detail, ski = self.ski):
rpki.log.debug("Revoking child_cert %r" % x)
- x.revoke(iterator, errback, withdraw = x.uri(ca) != uri)
-
- def manifest():
- ca_detail.generate_manifest(done, errback)
+ x.revoke(publisher = publisher)
+ ca_detail.generate_crl(publisher = publisher)
- def done():
- callback(child_cert)
-
- certs_to_revoke = [x for x in child.child_certs(ca_detail = ca_detail, ski = self.ski) if x is not child_cert]
-
- if certs_to_revoke:
- rpki.async.iterator(certs_to_revoke, loop, manifest)
- else:
- done()
-
- ca_detail.issue(
+ child_cert = ca_detail.issue(
ca = ca,
child = child,
subject_key = self.cert.getPublicKey(),
sia = sia,
resources = resources,
child_cert = None if must_revoke or new_issuer else self,
- callback = revoke if must_revoke else callback,
- errback = errback)
+ publisher = publisher)
+
+ rpki.log.debug("New child_cert %r uri %s" % (child_cert, child_cert.uri(ca)))
+
+ return child_cert
@classmethod
def fetch(cls, gctx = None, child = None, ca_detail = None, ski = None, unique = False):
@@ -1078,6 +1057,14 @@ class child_cert_obj(rpki.sql.sql_persistent):
else:
return cls.sql_fetch_where(gctx, where, args)
+ def published_callback(self, pdu):
+ """
+ Publication callback: check result and mark published.
+ """
+ pdu.raise_if_error()
+ self.published = None
+ self.sql_mark_dirty()
+
class revoked_cert_obj(rpki.sql.sql_persistent):
"""
Tombstone for a revoked certificate.
@@ -1130,11 +1117,13 @@ class roa_obj(rpki.sql.sql_persistent):
"self_id",
"asn",
("roa", rpki.x509.ROA),
- ("cert", rpki.x509.X509))
+ ("cert", rpki.x509.X509),
+ ("published", rpki.sundial.datetime))
ca_detail_id = None
cert = None
roa = None
+ published = None
def self(self):
"""
@@ -1189,51 +1178,43 @@ class roa_obj(rpki.sql.sql_persistent):
self.asn = asn
self.ipv4 = ipv4
self.ipv6 = ipv6
- #
- # You might think that we should call self.sql_mark_dirty() here,
- # and you'd be right, except that other code kind of assumes that
- # this object will not be saved to sql unless the .generate()
- # method suceeds. If we never get as far as .generate(), or if
- # .generate() fails, we want this roa_obj to softly and silently
- # vanish away.
- #
- # Perhaps I'll figure out a cleaner way to do this some day.
+
+ # Defer marking new ROA as dirty until .generate() has a chance to
+ # finish setup, otherwise we get SQL consistency errors.
#
#if self_id or asn or ipv4 or ipv6: self.sql_mark_dirty()
- def update(self, callback, errback):
+ def update(self, publisher):
"""
Bring this roa_obj's ROA up to date if necesssary.
"""
if self.roa is None:
- return self.generate(callback, errback)
+ return self.generate(publisher = publisher)
ca_detail = self.ca_detail()
if ca_detail is None or ca_detail.state != "active":
- return self.regenerate(callback, errback)
+ return self.regenerate(publisher = publisher)
regen_margin = rpki.sundial.timedelta(seconds = self.self().regen_margin)
if rpki.sundial.now() + regen_margin > self.cert.getNotAfter():
- return self.regenerate(callback, errback)
+ return self.regenerate(publisher = publisher)
ca_resources = ca_detail.latest_ca_cert.get_3779resources()
ee_resources = self.cert.get_3779resources()
if ee_resources.oversized(ca_resources):
- return self.regenerate(callback, errback)
+ return self.regenerate(publisher = publisher)
v4 = self.ipv4.to_resource_set() if self.ipv4 is not None else rpki.resource_set.resource_set_ipv4()
v6 = self.ipv6.to_resource_set() if self.ipv6 is not None else rpki.resource_set.resource_set_ipv6()
if ee_resources.v4 != v4 or ee_resources.v6 != v6:
- return self.regenerate(callback, errback)
-
- callback()
+ return self.regenerate(publisher = publisher)
- def generate(self, callback, errback):
+ def generate(self, publisher):
"""
Generate a ROA.
@@ -1282,33 +1263,41 @@ class roa_obj(rpki.sql.sql_persistent):
raise rpki.exceptions.NoCoveringCertForROA, "generate() could not find a certificate covering %s %s" % (v4, v6)
ca = ca_detail.ca()
-
resources = rpki.resource_set.resource_bag(v4 = v4, v6 = v6)
-
keypair = rpki.x509.RSA.generate()
self.ca_detail_id = ca_detail.ca_detail_id
-
- self.cert = ca_detail.issue_ee(ca, resources, keypair.get_RSApublic(),
- sia = ((rpki.oids.name2oid["id-ad-signedObject"],
- ("uri", self.uri(keypair))),))
-
+ self.cert = ca_detail.issue_ee(
+ ca = ca,
+ resources = resources,
+ subject_key = keypair.get_RSApublic(),
+ sia = ((rpki.oids.name2oid["id-ad-signedObject"], ("uri", self.uri(keypair))),))
self.roa = rpki.x509.ROA.build(self.asn, self.ipv4, self.ipv6, keypair, (self.cert,))
-
+ self.published = rpki.sundial.now()
self.sql_store()
- def done():
- ca_detail.generate_manifest(callback, errback)
+ rpki.log.debug("Generating ROA %r" % self.uri())
+ publisher.publish(cls = rpki.publication.roa_elt, uri = self.uri(), obj = self.roa, repository = ca.parent().repository(), handler = self.published_callback)
+ ca_detail.generate_manifest(publisher = publisher)
- ca.parent().repository().publish(self.roa, self.uri(), done, errback)
+ def published_callback(self, pdu):
+ """
+ Check publication result.
+ """
+ pdu.raise_if_error()
+ self.published = None
+ self.sql_mark_dirty()
- def revoke(self, callback, errback, regenerate = False, allow_failure = False):
+ def revoke(self, publisher, regenerate = False, allow_failure = False):
"""
Withdraw ROA associated with this roa_obj.
In order to preserve make-before-break properties without
duplicating code, this method also handles generating a
replacement ROA when requested.
+
+ If allow_failure is set, failing to withdraw the ROA will not be
+ considered an error.
"""
ca_detail = self.ca_detail()
@@ -1319,35 +1308,26 @@ class roa_obj(rpki.sql.sql_persistent):
if ca_detail.state != 'active':
self.ca_detail_id = None
- def one():
- rpki.log.debug("Withdrawing ROA and revoking its EE cert")
- rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail)
- ca_detail.ca().parent().repository().withdraw(roa, uri, two, errback, allow_failure)
-
- def two():
- self.gctx.sql.sweep()
- ca_detail.generate_crl(three, errback)
-
- def three():
- ca_detail.generate_manifest(four, errback)
-
- def four():
- self.sql_delete()
- callback()
-
if regenerate:
- self.generate(one, errback)
- else:
- one()
+ self.generate(publisher = publisher)
+
+ rpki.log.debug("Withdrawing ROA %r and revoking its EE cert" % uri)
+ rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail)
+ publisher.withdraw(cls = rpki.publication.roa_elt, uri = uri, obj = roa, repository = ca_detail.ca().parent().repository(),
+ handler = False if allow_failure else None)
+ self.gctx.sql.sweep()
+ ca_detail.generate_crl(publisher = publisher)
+ ca_detail.generate_manifest(publisher = publisher)
+ self.sql_delete()
- def regenerate(self, callback, errback):
+ def regenerate(self, publisher):
"""
Reissue ROA associated with this roa_obj.
"""
if self.ca_detail() is None:
- self.generate(callback, errback)
+ self.generate(publisher = publisher)
else:
- self.revoke(callback, errback, regenerate = True)
+ self.revoke(publisher = publisher, regenerate = True)
def uri(self, key = None):
"""
@@ -1361,3 +1341,52 @@ class roa_obj(rpki.sql.sql_persistent):
roa_obj's ROA.
"""
return (key or self.cert).gSKI() + ".roa"
+
+
+class publication_queue(object):
+ """
+ Utility to simplify publication from within rpkid.
+
+ General idea here is to accumulate a collection of objects to be
+ published, in one or more repositories, each potentially with its
+ own completion callback. Eventually we want to publish everything
+ we've accumulated, at which point we need to iterate over the
+ collection and do repository.call_pubd() for each repository.
+ """
+
+ replace = True
+
+ def __init__(self):
+ self.repositories = {}
+ self.msgs = {}
+ self.handlers = {}
+ if self.replace:
+ self.uris = {}
+
+ def _add(self, cls, uri, obj, repository, handler, withdraw):
+ rid = id(repository)
+ if rid not in self.repositories:
+ self.repositories[rid] = repository
+ self.msgs[rid] = rpki.publication.msg.query()
+ if self.replace and uri in self.uris:
+ rpki.log.debug("Removing publication duplicate <%s %r %r>" % (self.uris[uri].action, self.uris[uri].uri, self.uris[uri].payload))
+ self.msgs[rid].remove(self.uris.pop(uri))
+ make_pdu = cls.make_withdraw if withdraw else cls.make_publish
+ pdu = make_pdu(uri = uri, obj = obj)
+ if handler is not None:
+ self.handlers[id(pdu)] = handler
+ pdu.tag = id(pdu)
+ self.msgs[rid].append(pdu)
+ if self.replace:
+ self.uris[uri] = pdu
+
+ def publish( self, cls, uri, obj, repository, handler = None):
+ return self._add(cls, uri, obj, repository, handler, False)
+
+ def withdraw(self, cls, uri, obj, repository, handler = None):
+ return self._add(cls, uri, obj, repository, handler, True)
+
+ def call_pubd(self, cb, eb):
+ def loop(iterator, rid):
+ self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers)
+ rpki.async.iterator(self.repositories, loop, cb)