diff options
author | Rob Austein <sra@hactrn.net> | 2009-11-04 04:27:31 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2009-11-04 04:27:31 +0000 |
commit | 6192e5fcba290e11e88597fa7b03dfcde28b89e2 (patch) | |
tree | eb6ddb112f5a5185367a98e9619a9da4a461e73b /rpkid/rpki/rpki_engine.py | |
parent | 528e1bf712d82d8024c204a06c756cd577096b47 (diff) |
Use batch-mode publication in rpkid. Fix FOREIGN KEY constraints.
svn path=/myrpki/myirbe.py; revision=2876
Diffstat (limited to 'rpkid/rpki/rpki_engine.py')
-rw-r--r-- | rpkid/rpki/rpki_engine.py | 499 |
1 files changed, 264 insertions, 235 deletions
diff --git a/rpkid/rpki/rpki_engine.py b/rpkid/rpki/rpki_engine.py index a1c28f16..4beb19a3 100644 --- a/rpkid/rpki/rpki_engine.py +++ b/rpkid/rpki/rpki_engine.py @@ -378,39 +378,40 @@ class ca_obj(rpki.sql.sql_persistent): def loop(iterator, ca_detail): - ski = ca_detail.public_key.get_SKI() + rc_cert = cert_map.pop(ca_detail.public_key.get_SKI(), None) + + if rc_cert is None: - if ski not in cert_map: rpki.log.warn("Certificate in database missing from list_response, class %r, SKI %s, maybe parent certificate went away?" % (rc.class_name, ca_detail.public_key.gSKI())) - ca_detail.delete(self, parent.repository(), iterator, eb, allow_failure = True) - return + publisher = publication_queue() + ca_detail.delete(ca = ca_detail.ca(), publisher = publisher) + return publisher.call_pubd(iterator, eb) + + else: - def cleanup(): - del cert_map[ski] - iterator() - - if ca_detail.state in ("pending", "active"): - if ca_detail.state == "pending": - current_resources = rpki.resource_set.resource_bag() - else: - current_resources = ca_detail.latest_ca_cert.get_3779resources() - if (ca_detail.state == "pending" or - sia_uri_changed or - ca_detail.latest_ca_cert != cert_map[ski].cert or - current_resources.undersized(rc_resources) or - current_resources.oversized(rc_resources)): - ca_detail.update( - parent = parent, - ca = self, - rc = rc, - sia_uri_changed = sia_uri_changed, - old_resources = current_resources, - callback = cleanup, - errback = eb) - return - - cleanup() + if ca_detail.state in ("pending", "active"): + + if ca_detail.state == "pending": + current_resources = rpki.resource_set.resource_bag() + else: + current_resources = ca_detail.latest_ca_cert.get_3779resources() + + if (ca_detail.state == "pending" or + sia_uri_changed or + ca_detail.latest_ca_cert != rc_cert.cert or + current_resources.undersized(rc_resources) or + current_resources.oversized(rc_resources)): + return ca_detail.update( + parent = parent, + ca = self, + rc = rc, + sia_uri_changed = sia_uri_changed, + old_resources = current_resources, + callback = iterator, + errback = eb) + + iterator() def done(): if cert_map: @@ -424,7 +425,8 @@ class ca_obj(rpki.sql.sql_persistent): for x in cert_map.itervalues(): rpki.log.debug("Parent thinks I have %r %s" % (x, x.cert.gSKI())) for x in ca_details: - rpki.log.debug("I think I have %r %s" % (x, x.latest_ca_cert.gSKI())) + if x.latest_ca_cert is not None: + rpki.log.debug("I think I have %r %s" % (x, x.latest_ca_cert.gSKI())) if ca_details: rpki.async.iterator(ca_details, loop, done) @@ -469,7 +471,7 @@ class ca_obj(rpki.sql.sql_persistent): CA, then finally delete this CA itself. """ - def fail(e): + def lose(e): rpki.log.traceback() rpki.log.warn("Could not delete CA %r, skipping: %s" % (self, e)) callback() @@ -478,12 +480,10 @@ class ca_obj(rpki.sql.sql_persistent): self.sql_delete() callback() - repository = parent.repository() - - def loop(iterator, ca_detail): - ca_detail.delete(self, repository, iterator, fail) - - rpki.async.iterator(self.ca_details(), loop, done) + publisher = publication_queue() + for ca_detail in self.ca_details(): + ca_detail.delete(ca = self, publisher = publisher) + publisher.call_pubd(done, lose) def next_serial_number(self): """ @@ -542,7 +542,7 @@ class ca_obj(rpki.sql.sql_persistent): rpki.log.trace() def loop(iterator, ca_detail): - ca_detail.revoke(iterator, eb) + ca_detail.revoke(cb = iterator, eb = eb) rpki.async.iterator(self.fetch_deprecated(), loop, cb) @@ -562,10 +562,16 @@ class ca_detail_obj(rpki.sql.sql_persistent): ("latest_manifest_cert", rpki.x509.X509), ("latest_manifest", rpki.x509.SignedManifest), ("latest_crl", rpki.x509.CRL), + ("crl_published", rpki.sundial.datetime), + ("manifest_published", rpki.sundial.datetime), "state", "ca_cert_uri", "ca_id") + crl_published = None + manifest_published = None + latest_ca_cert = None + def sql_decode(self, vals): """ Extra assertions for SQL decode of a ca_detail_obj. @@ -607,35 +613,27 @@ class ca_detail_obj(rpki.sql.sql_persistent): Activate this ca_detail. """ + publisher = publication_queue() + self.latest_ca_cert = cert self.ca_cert_uri = uri.rsync() self.generate_manifest_cert(ca) + self.state = "active" + self.generate_crl(publisher = publisher) + self.generate_manifest(publisher = publisher) + self.sql_mark_dirty() - def did_crl(): - self.generate_manifest(callback = did_manifest, errback = errback) - - def did_manifest(): - self.state = "active" - self.sql_mark_dirty() - if predecessor is None: - callback() - else: - predecessor.state = "deprecated" - predecessor.sql_mark_dirty() - rpki.async.iterator(predecessor.child_certs(), do_one_child_cert, done_child_certs) - - def do_one_child_cert(iterator, child_cert): - child_cert.reissue(self, iterator.ignore, errback) - - def done_child_certs(): - rpki.async.iterator(predecessor.roas(), do_one_roa, callback) - - def do_one_roa(iterator, roa): - roa.regenerate(iterator, errback) + if predecessor is not None: + predecessor.state = "deprecated" + predecessor.sql_mark_dirty() + for child_cert in predecessor.child_certs(): + child_cert.reissue(ca_detail = self, publisher = publisher) + for roa in predecessor.roas(): + roa.regenerate(publisher = publisher) - self.generate_crl(callback = did_crl, errback = errback) + publisher.call_pubd(callback, errback) - def delete(self, ca, repository, cb, eb, allow_failure = False): + def delete(self, ca, publisher, allow_failure = False): """ Delete this ca_detail and all of the certs it issued. @@ -643,28 +641,19 @@ class ca_detail_obj(rpki.sql.sql_persistent): raise an exception. """ - def withdraw_one_child(iterator, child_cert): - repository.withdraw(child_cert.cert, child_cert.uri(ca), iterator, eb, allow_failure) - - def child_certs_done(): - rpki.async.iterator(self.roas(), revoke_one_roa, withdraw_manifest) - - def revoke_one_roa(iterator, roa): - roa.revoke(iterator, eb, allow_failure = allow_failure) - - def withdraw_manifest(): - repository.withdraw(self.latest_manifest, self.manifest_uri(ca), withdraw_crl, eb, allow_failure) - - def withdraw_crl(): - repository.withdraw(self.latest_crl, self.crl_uri(ca), done, eb, allow_failure) - - def done(): - for cert in self.child_certs() + self.revoked_certs(): - cert.sql_delete() - self.sql_delete() - cb() - - rpki.async.iterator(self.child_certs(), withdraw_one_child, child_certs_done) + repository = ca.parent().repository() + for child_cert in self.child_certs(): + publisher.withdraw(cls = rpki.publication.certificate_elt, uri = child_cert.uri(ca), obj = child_cert.cert, repository = repository, + handler = False if allow_failure else None) + for roa in self.roas(): + roa.revoke(publisher = publisher, allow_failure = allow_failure) + publisher.withdraw(cls = rpki.publication.manifest_elt, uri = self.manifest_uri(ca), obj = self.latest_manifest, repository = repository, + handler = False if allow_failure else None) + publisher.withdraw(cls = rpki.publication.crl_elt, uri = self.crl_uri(ca), obj = self.latest_crl, repository = repository, + handler = False if allow_failure else None) + for cert in self.child_certs() + self.revoked_certs(): + cert.sql_delete() + self.sql_delete() def revoke(self, cb, eb): """ @@ -689,13 +678,14 @@ class ca_detail_obj(rpki.sql.sql_persistent): time has passed. """ + ca = self.ca() + parent = ca.parent() + def parent_revoked(r_msg): if r_msg.payload.ski != self.latest_ca_cert.gSKI(): raise rpki.exceptions.SKIMismatch - ca = self.ca() - parent = ca.parent() crl_interval = rpki.sundial.timedelta(seconds = parent.self().crl_interval) self.nextUpdate = rpki.sundial.now() @@ -706,29 +696,24 @@ class ca_detail_obj(rpki.sql.sql_persistent): if self.latest_crl is not None: self.nextUpdate = self.nextUpdate.later(self.latest_crl.getNextUpdate()) - def revoke_one_child(iterator, child_cert): - self.nextUpdate = self.nextUpdate.later(child_cert.cert.getNotAfter()) - child_cert.revoke(iterator, eb) - - def final_crl(): - self.nextUpdate += crl_interval - self.generate_crl(callback = final_manifest, errback = eb, nextUpdate = self.nextUpdate) - - def final_manifest(): - self.generate_manifest(callback = done, errback = eb, nextUpdate = self.nextUpdate) - - def done(): - self.private_key_id = None - self.manifest_private_key_id = None - self.manifest_public_key = None - self.latest_manifest_cert = None - self.state = "revoked" - self.sql_mark_dirty() - cb() + publisher = publication_queue() - rpki.async.iterator(self.child_certs(), revoke_one_child, final_crl) + for child_cert in self.child_certs(): + self.nextUpdate = self.nextUpdate.later(child_cert.cert.getNotAfter()) + child_cert.revoke(publisher = publisher) + + self.nextUpdate += crl_interval + self.generate_crl(publisher = publisher, nextUpdate = self.nextUpdate) + self.generate_manifest(publisher = publisher, nextUpdate = self.nextUpdate) + self.private_key_id = None + self.manifest_private_key_id = None + self.manifest_public_key = None + self.latest_manifest_cert = None + self.state = "revoked" + self.sql_mark_dirty() + publisher.call_pubd(cb, eb) - rpki.up_down.revoke_pdu.query(self.ca(), self.latest_ca_cert.gSKI(), parent_revoked, eb) + rpki.up_down.revoke_pdu.query(ca, self.latest_ca_cert.gSKI(), parent_revoked, eb) def update(self, parent, ca, rc, sia_uri_changed, old_resources, callback, errback): """ @@ -739,22 +724,18 @@ class ca_detail_obj(rpki.sql.sql_persistent): def issued(issue_response): self.latest_ca_cert = issue_response.payload.classes[0].certs[0].cert new_resources = self.latest_ca_cert.get_3779resources() - - def loop(iterator, child_cert): - child_resources = child_cert.cert.get_3779resources() - if sia_uri_changed or child_resources.oversized(new_resources): - child_cert.reissue( - ca_detail = self, - resources = child_resources.intersection(new_resources), - callback = iterator.ignore, - errback = errback) - else: - iterator() + publisher = publication_queue() if sia_uri_changed or old_resources.oversized(new_resources): - rpki.async.iterator(self.child_certs(), loop, callback) - else: - callback() + for child_cert in self.child_certs(): + child_resources = child_cert.cert.get_3779resources() + if sia_uri_changed or child_resources.oversized(new_resources): + child_cert.reissue( + ca_detail = self, + resources = child_resources.intersection(new_resources), + publisher = publisher) + + publisher.call_pubd(callback, errback) rpki.up_down.issue_pdu.query(parent, ca, self, issued, errback) @@ -806,7 +787,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): self.latest_manifest_cert = self.issue_ee(ca, resources, self.manifest_public_key) - def issue(self, ca, child, subject_key, sia, resources, callback, errback, child_cert = None): + def issue(self, ca, child, subject_key, sia, resources, publisher, child_cert = None): """ Issue a new certificate to a child. Optional child_cert argument specifies an existing child_cert object to update in place; if not @@ -839,18 +820,14 @@ class ca_detail_obj(rpki.sql.sql_persistent): rpki.log.debug("Reusing existing child_cert %r" % child_cert) child_cert.ski = cert.get_SKI() - + child_cert.published = rpki.sundial.now() child_cert.sql_store() + publisher.publish(cls = rpki.publication.certificate_elt, uri = child_cert.uri(ca), obj = child_cert.cert, repository = ca.parent().repository(), + handler = child_cert.published_callback) + self.generate_manifest(publisher = publisher) + return child_cert - def published(): - self.generate_manifest(done, errback) - - def done(): - callback(child_cert) - - ca.parent().repository().publish(child_cert.cert, child_cert.uri(ca), published, errback) - - def generate_crl(self, callback, errback, nextUpdate = None): + def generate_crl(self, publisher, nextUpdate = None): """ Generate a new CRL for this ca_detail. At the moment this is unconditional, that is, it is up to the caller to decide whether a @@ -859,7 +836,6 @@ class ca_detail_obj(rpki.sql.sql_persistent): ca = self.ca() parent = ca.parent() - repository = parent.repository() crl_interval = rpki.sundial.timedelta(seconds = parent.self().crl_interval) now = rpki.sundial.now() @@ -882,16 +858,26 @@ class ca_detail_obj(rpki.sql.sql_persistent): nextUpdate = nextUpdate, revokedCertificates = certlist) - repository.publish(self.latest_crl, self.crl_uri(ca), callback = callback, errback = errback) + self.crl_published = rpki.sundial.now() + self.sql_mark_dirty() + publisher.publish(cls = rpki.publication.crl_elt, uri = self.crl_uri(ca), obj = self.latest_crl, repository = parent.repository(), + handler = self.crl_published_callback) + + def crl_published_callback(self, pdu): + """ + Check result of CRL publication. + """ + pdu.raise_if_error() + self.crl_published = None + self.sql_mark_dirty() - def generate_manifest(self, callback, errback, nextUpdate = None): + def generate_manifest(self, publisher, nextUpdate = None): """ Generate a new manifest for this ca_detail. """ ca = self.ca() parent = ca.parent() - repository = parent.repository() crl_interval = rpki.sundial.timedelta(seconds = parent.self().crl_interval) now = rpki.sundial.now() @@ -913,7 +899,20 @@ class ca_detail_obj(rpki.sql.sql_persistent): keypair = self.manifest_private_key_id, certs = self.latest_manifest_cert) - repository.publish(self.latest_manifest, self.manifest_uri(ca), callback = callback, errback = errback) + + self.manifest_published = rpki.sundial.now() + self.sql_mark_dirty() + publisher.publish(cls = rpki.publication.manifest_elt, uri = self.manifest_uri(ca), obj = self.latest_manifest, repository = parent.repository(), + handler = self.manifest_published_callback) + + def manifest_published_callback(self, pdu): + """ + Check result of manifest publication. + """ + pdu.raise_if_error() + self.manifest_published = None + self.sql_mark_dirty() + class child_cert_obj(rpki.sql.sql_persistent): """ @@ -926,7 +925,8 @@ class child_cert_obj(rpki.sql.sql_persistent): ("cert", rpki.x509.X509), "child_id", "ca_detail_id", - "ski") + "ski", + ("published", rpki.sundial.datetime)) def __init__(self, gctx = None, child_id = None, ca_detail_id = None, cert = None): """ @@ -937,6 +937,7 @@ class child_cert_obj(rpki.sql.sql_persistent): self.child_id = child_id self.ca_detail_id = ca_detail_id self.cert = cert + self.published = None if child_id or ca_detail_id or cert: self.sql_mark_dirty() @@ -956,35 +957,26 @@ class child_cert_obj(rpki.sql.sql_persistent): """Return the publication URI for this child_cert.""" return ca.sia_uri + self.uri_tail() - def revoke(self, callback, errback, withdraw = True): + def revoke(self, publisher): """ Revoke a child cert. """ - - rpki.log.debug("Revoking %r" % self) ca_detail = self.ca_detail() ca = ca_detail.ca() + rpki.log.debug("Revoking %r %r" % (self, self.uri(ca))) revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail) + publisher.withdraw(cls = rpki.publication.certificate_elt, uri = self.uri(ca), obj = self.cert, repository = ca.parent().repository()) + self.gctx.sql.sweep() + self.sql_delete() - def done(): - self.gctx.sql.sweep() - self.sql_delete() - callback() - - if withdraw: - ca.parent().repository().withdraw(self.cert, self.uri(ca), done, errback) - else: - rpki.log.info("Suppressing withdrawal of %r" % self.cert) - done() - - def reissue(self, ca_detail, callback, errback, resources = None, sia = None): + def reissue(self, ca_detail, publisher, resources = None, sia = None): """ - Reissue an existing cert, reusing the public key. If the cert we - would generate is identical to the one we already have, we just - return the one we already have. If we have to revoke the old - certificate when generating the new one, we have to generate a new - child_cert_obj, so calling code that needs the updated - child_cert_obj must use the return value from this method. + Reissue an existing child cert, reusing the public key. If the + child cert we would generate is identical to the one we already + have, we just return the one we already have. If we have to + revoke the old child cert when generating the new one, we have to + generate a new child_cert_obj, so calling code that needs the + updated child_cert_obj must use the return value from this method. """ ca = ca_detail.ca() @@ -1003,7 +995,7 @@ class child_cert_obj(rpki.sql.sql_persistent): assert resources.valid_until is not None and old_resources.valid_until is not None if resources == old_resources and sia == old_sia and ca_detail == old_ca_detail: - return callback(self) + return self must_revoke = old_resources.oversized(resources) or old_resources.valid_until > resources.valid_until new_issuer = ca_detail != old_ca_detail @@ -1013,37 +1005,24 @@ class child_cert_obj(rpki.sql.sql_persistent): if resources.valid_until != old_resources.valid_until: rpki.log.debug("Validity changed: %s %s" % ( old_resources.valid_until, resources.valid_until)) - def revoke(child_cert): - - uri = child_cert.uri(ca) - rpki.log.debug("New child_cert %r uri %s" % (child_cert, uri)) - - def loop(iterator, x): + if must_revoke: + for x in child.child_certs(ca_detail = ca_detail, ski = self.ski): rpki.log.debug("Revoking child_cert %r" % x) - x.revoke(iterator, errback, withdraw = x.uri(ca) != uri) - - def manifest(): - ca_detail.generate_manifest(done, errback) + x.revoke(publisher = publisher) + ca_detail.generate_crl(publisher = publisher) - def done(): - callback(child_cert) - - certs_to_revoke = [x for x in child.child_certs(ca_detail = ca_detail, ski = self.ski) if x is not child_cert] - - if certs_to_revoke: - rpki.async.iterator(certs_to_revoke, loop, manifest) - else: - done() - - ca_detail.issue( + child_cert = ca_detail.issue( ca = ca, child = child, subject_key = self.cert.getPublicKey(), sia = sia, resources = resources, child_cert = None if must_revoke or new_issuer else self, - callback = revoke if must_revoke else callback, - errback = errback) + publisher = publisher) + + rpki.log.debug("New child_cert %r uri %s" % (child_cert, child_cert.uri(ca))) + + return child_cert @classmethod def fetch(cls, gctx = None, child = None, ca_detail = None, ski = None, unique = False): @@ -1078,6 +1057,14 @@ class child_cert_obj(rpki.sql.sql_persistent): else: return cls.sql_fetch_where(gctx, where, args) + def published_callback(self, pdu): + """ + Publication callback: check result and mark published. + """ + pdu.raise_if_error() + self.published = None + self.sql_mark_dirty() + class revoked_cert_obj(rpki.sql.sql_persistent): """ Tombstone for a revoked certificate. @@ -1130,11 +1117,13 @@ class roa_obj(rpki.sql.sql_persistent): "self_id", "asn", ("roa", rpki.x509.ROA), - ("cert", rpki.x509.X509)) + ("cert", rpki.x509.X509), + ("published", rpki.sundial.datetime)) ca_detail_id = None cert = None roa = None + published = None def self(self): """ @@ -1189,51 +1178,43 @@ class roa_obj(rpki.sql.sql_persistent): self.asn = asn self.ipv4 = ipv4 self.ipv6 = ipv6 - # - # You might think that we should call self.sql_mark_dirty() here, - # and you'd be right, except that other code kind of assumes that - # this object will not be saved to sql unless the .generate() - # method suceeds. If we never get as far as .generate(), or if - # .generate() fails, we want this roa_obj to softly and silently - # vanish away. - # - # Perhaps I'll figure out a cleaner way to do this some day. + + # Defer marking new ROA as dirty until .generate() has a chance to + # finish setup, otherwise we get SQL consistency errors. # #if self_id or asn or ipv4 or ipv6: self.sql_mark_dirty() - def update(self, callback, errback): + def update(self, publisher): """ Bring this roa_obj's ROA up to date if necesssary. """ if self.roa is None: - return self.generate(callback, errback) + return self.generate(publisher = publisher) ca_detail = self.ca_detail() if ca_detail is None or ca_detail.state != "active": - return self.regenerate(callback, errback) + return self.regenerate(publisher = publisher) regen_margin = rpki.sundial.timedelta(seconds = self.self().regen_margin) if rpki.sundial.now() + regen_margin > self.cert.getNotAfter(): - return self.regenerate(callback, errback) + return self.regenerate(publisher = publisher) ca_resources = ca_detail.latest_ca_cert.get_3779resources() ee_resources = self.cert.get_3779resources() if ee_resources.oversized(ca_resources): - return self.regenerate(callback, errback) + return self.regenerate(publisher = publisher) v4 = self.ipv4.to_resource_set() if self.ipv4 is not None else rpki.resource_set.resource_set_ipv4() v6 = self.ipv6.to_resource_set() if self.ipv6 is not None else rpki.resource_set.resource_set_ipv6() if ee_resources.v4 != v4 or ee_resources.v6 != v6: - return self.regenerate(callback, errback) - - callback() + return self.regenerate(publisher = publisher) - def generate(self, callback, errback): + def generate(self, publisher): """ Generate a ROA. @@ -1282,33 +1263,41 @@ class roa_obj(rpki.sql.sql_persistent): raise rpki.exceptions.NoCoveringCertForROA, "generate() could not find a certificate covering %s %s" % (v4, v6) ca = ca_detail.ca() - resources = rpki.resource_set.resource_bag(v4 = v4, v6 = v6) - keypair = rpki.x509.RSA.generate() self.ca_detail_id = ca_detail.ca_detail_id - - self.cert = ca_detail.issue_ee(ca, resources, keypair.get_RSApublic(), - sia = ((rpki.oids.name2oid["id-ad-signedObject"], - ("uri", self.uri(keypair))),)) - + self.cert = ca_detail.issue_ee( + ca = ca, + resources = resources, + subject_key = keypair.get_RSApublic(), + sia = ((rpki.oids.name2oid["id-ad-signedObject"], ("uri", self.uri(keypair))),)) self.roa = rpki.x509.ROA.build(self.asn, self.ipv4, self.ipv6, keypair, (self.cert,)) - + self.published = rpki.sundial.now() self.sql_store() - def done(): - ca_detail.generate_manifest(callback, errback) + rpki.log.debug("Generating ROA %r" % self.uri()) + publisher.publish(cls = rpki.publication.roa_elt, uri = self.uri(), obj = self.roa, repository = ca.parent().repository(), handler = self.published_callback) + ca_detail.generate_manifest(publisher = publisher) - ca.parent().repository().publish(self.roa, self.uri(), done, errback) + def published_callback(self, pdu): + """ + Check publication result. + """ + pdu.raise_if_error() + self.published = None + self.sql_mark_dirty() - def revoke(self, callback, errback, regenerate = False, allow_failure = False): + def revoke(self, publisher, regenerate = False, allow_failure = False): """ Withdraw ROA associated with this roa_obj. In order to preserve make-before-break properties without duplicating code, this method also handles generating a replacement ROA when requested. + + If allow_failure is set, failing to withdraw the ROA will not be + considered an error. """ ca_detail = self.ca_detail() @@ -1319,35 +1308,26 @@ class roa_obj(rpki.sql.sql_persistent): if ca_detail.state != 'active': self.ca_detail_id = None - def one(): - rpki.log.debug("Withdrawing ROA and revoking its EE cert") - rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail) - ca_detail.ca().parent().repository().withdraw(roa, uri, two, errback, allow_failure) - - def two(): - self.gctx.sql.sweep() - ca_detail.generate_crl(three, errback) - - def three(): - ca_detail.generate_manifest(four, errback) - - def four(): - self.sql_delete() - callback() - if regenerate: - self.generate(one, errback) - else: - one() + self.generate(publisher = publisher) + + rpki.log.debug("Withdrawing ROA %r and revoking its EE cert" % uri) + rpki.rpki_engine.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail) + publisher.withdraw(cls = rpki.publication.roa_elt, uri = uri, obj = roa, repository = ca_detail.ca().parent().repository(), + handler = False if allow_failure else None) + self.gctx.sql.sweep() + ca_detail.generate_crl(publisher = publisher) + ca_detail.generate_manifest(publisher = publisher) + self.sql_delete() - def regenerate(self, callback, errback): + def regenerate(self, publisher): """ Reissue ROA associated with this roa_obj. """ if self.ca_detail() is None: - self.generate(callback, errback) + self.generate(publisher = publisher) else: - self.revoke(callback, errback, regenerate = True) + self.revoke(publisher = publisher, regenerate = True) def uri(self, key = None): """ @@ -1361,3 +1341,52 @@ class roa_obj(rpki.sql.sql_persistent): roa_obj's ROA. """ return (key or self.cert).gSKI() + ".roa" + + +class publication_queue(object): + """ + Utility to simplify publication from within rpkid. + + General idea here is to accumulate a collection of objects to be + published, in one or more repositories, each potentially with its + own completion callback. Eventually we want to publish everything + we've accumulated, at which point we need to iterate over the + collection and do repository.call_pubd() for each repository. + """ + + replace = True + + def __init__(self): + self.repositories = {} + self.msgs = {} + self.handlers = {} + if self.replace: + self.uris = {} + + def _add(self, cls, uri, obj, repository, handler, withdraw): + rid = id(repository) + if rid not in self.repositories: + self.repositories[rid] = repository + self.msgs[rid] = rpki.publication.msg.query() + if self.replace and uri in self.uris: + rpki.log.debug("Removing publication duplicate <%s %r %r>" % (self.uris[uri].action, self.uris[uri].uri, self.uris[uri].payload)) + self.msgs[rid].remove(self.uris.pop(uri)) + make_pdu = cls.make_withdraw if withdraw else cls.make_publish + pdu = make_pdu(uri = uri, obj = obj) + if handler is not None: + self.handlers[id(pdu)] = handler + pdu.tag = id(pdu) + self.msgs[rid].append(pdu) + if self.replace: + self.uris[uri] = pdu + + def publish( self, cls, uri, obj, repository, handler = None): + return self._add(cls, uri, obj, repository, handler, False) + + def withdraw(self, cls, uri, obj, repository, handler = None): + return self._add(cls, uri, obj, repository, handler, True) + + def call_pubd(self, cb, eb): + def loop(iterator, rid): + self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers) + rpki.async.iterator(self.repositories, loop, cb) |