diff options
author | Rob Austein <sra@hactrn.net> | 2015-10-19 03:36:42 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-10-19 03:36:42 +0000 |
commit | 7f5e75188ad4527e3c3425a155dfed0847a389dd (patch) | |
tree | 400301cae01f51141e380664cf0b382b8204a00d /rpki/rpkid_tasks.py | |
parent | 7ab6040f7eb05a7ac4424e0294d228256e9a64dd (diff) |
Amputate old SQL code out of rpkid with a fire axe, replacing it with
Django ORM. Duct tape and bailing wire everywhere, much clean-up left
to do, but basic "make yamltest" suite runs. Much of the clean-up
isn't worth doing until after revamping the I/O system, as it'll all
change again at that point anyway.
svn path=/branches/tk705/; revision=6127
Diffstat (limited to 'rpki/rpkid_tasks.py')
-rw-r--r-- | rpki/rpkid_tasks.py | 417 |
1 files changed, 226 insertions, 191 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py index c44b2220..f6afad1e 100644 --- a/rpki/rpkid_tasks.py +++ b/rpki/rpkid_tasks.py @@ -87,11 +87,6 @@ class AbstractTask(object): """ Abstract base class for rpkid scheduler task objects. This just handles the scheduler hooks, real work starts in self.start. - - NB: This assumes that the rpki.rpkid.rpkid.task_* methods have been - rewritten to expect instances of subclasses of this class, rather - than expecting thunks to be wrapped up in the older version of this - class. Rewrite, rewrite, remove this comment when done, OK! """ ## @var timeslice @@ -100,7 +95,8 @@ class AbstractTask(object): timeslice = rpki.sundial.timedelta(seconds = 15) - def __init__(self, s, description = None): + def __init__(self, rpkid, s, description = None): + self.rpkid = rpkid self.self = s self.description = description self.completions = [] @@ -115,19 +111,17 @@ class AbstractTask(object): self.completions.append(completion) def exit(self): - self.self.gctx.sql.sweep() while self.completions: self.completions.pop(0)(self) self.clear() self.due_date = None - self.self.gctx.task_next() + self.rpkid.task_next() def postpone(self, continuation): - self.self.gctx.sql.sweep() self.continuation = continuation self.due_date = None - self.self.gctx.task_add(self) - self.self.gctx.task_next() + self.rpkid.task_add(self) + self.rpkid.task_next() def __call__(self): self.due_date = rpki.sundial.now() + self.timeslice @@ -163,58 +157,75 @@ class PollParentTask(AbstractTask): """ def clear(self): + logger.debug("PollParentTask.clear()") self.parent_iterator = None self.parent = None self.ca_map = None self.class_iterator = None + self.started = False def start(self): - self.gctx.checkpoint() - logger.debug("Self %s[%d] polling parents", self.self_handle, self.self_id) - rpki.async.iterator(self.parents, self.parent_loop, self.exit) + logger.debug("PollParentTask.start()") + self.rpkid.checkpoint() + logger.debug("Self %s[%r] polling parents", self.self_handle, self) + assert not self.started + self.started = True + # + # XXX Apparently "self" is a //really// bad choice for a column name with Django + # + rpki.async.iterator(rpki.rpkidb.models.Parent.objects.filter(self__exact = self.self), self.parent_loop, self.exit) def parent_loop(self, parent_iterator, parent): + logger.debug("PollParentTask.parent_loop()") self.parent_iterator = parent_iterator self.parent = parent - parent.up_down_list_query(self.got_list, self.list_failed) + parent.up_down_list_query(rpkid = self.rpkid, cb = self.got_list, eb = self.list_failed) def got_list(self, r_msg): - self.ca_map = dict((ca.parent_resource_class, ca) for ca in self.parent.cas) - self.gctx.checkpoint() + logger.debug("PollParentTask.got_list()") + self.ca_map = dict((ca.parent_resource_class, ca) for ca in self.parent.cas.all()) + self.rpkid.checkpoint() rpki.async.iterator(r_msg.getiterator(rpki.up_down.tag_class), self.class_loop, self.class_done) def list_failed(self, e): + logger.debug("PollParentTask.list_failed()") logger.exception("Couldn't get resource class list from parent %r, skipping", self.parent) self.parent_iterator() def class_loop(self, class_iterator, rc): - self.gctx.checkpoint() + logger.debug("PollParentTask.class_loop()") + self.rpkid.checkpoint() self.class_iterator = class_iterator try: ca = self.ca_map.pop(rc.get("class_name")) except KeyError: - rpki.rpkid.ca_obj.create(self.parent, rc, class_iterator, self.class_create_failed) + rpki.rpkidb.models.CA.create(rpkid = self.rpkid, parent = self.parent, rc = rc, + cb = class_iterator, eb = self.class_create_failed) else: - ca.check_for_updates(self.parent, rc, class_iterator, self.class_update_failed) + ca.check_for_updates(rpkid = self.rpkid, parent = self.parent, rc = rc, cb = class_iterator, eb = self.class_update_failed) def class_update_failed(self, e): + logger.debug("PollParentTask.class_update_failed()") logger.exception("Couldn't update class, skipping") self.class_iterator() def class_create_failed(self, e): + logger.debug("PollParentTask.class_create_failed()") logger.exception("Couldn't create class, skipping") self.class_iterator() def class_done(self): + logger.debug("PollParentTask.class_done()") rpki.async.iterator(self.ca_map.values(), self.ca_loop, self.ca_done) def ca_loop(self, iterator, ca): - self.gctx.checkpoint() + logger.debug("PollParentTask.ca_loop()") + self.rpkid.checkpoint() ca.destroy(self.parent, iterator) def ca_done(self): - self.gctx.checkpoint() - self.gctx.sql.sweep() + logger.debug("PollParentTask.ca_done()") + self.rpkid.checkpoint() self.parent_iterator() @@ -233,18 +244,23 @@ class UpdateChildrenTask(AbstractTask): self.iterator = None self.child = None self.child_certs = None + self.started = False def start(self): - self.gctx.checkpoint() - logger.debug("Self %s[%d] updating children", self.self_handle, self.self_id) + self.rpkid.checkpoint() + logger.debug("Self %s[%r] updating children", self.self_handle, self) + assert not self.started + self.started = True self.now = rpki.sundial.now() self.rsn = self.now + rpki.sundial.timedelta(seconds = self.regen_margin) - self.publisher = rpki.rpkid.publication_queue() - rpki.async.iterator(self.children, self.loop, self.done) + self.publisher = rpki.rpkid.publication_queue(self.rpkid) + # + # XXX Apparently "self" is a //really// bad choice for a column name with Django + # + rpki.async.iterator(rpki.rpkidb.models.Child.objects.filter(self__exact = self.self), self.loop, self.done) def loop(self, iterator, child): - self.gctx.checkpoint() - self.gctx.sql.sweep() + self.rpkid.checkpoint() self.iterator = iterator self.child = child self.child_certs = child.child_certs @@ -255,7 +271,7 @@ class UpdateChildrenTask(AbstractTask): def do_child(self): if self.child_certs: - self.gctx.irdb_query_child_resources(self.child.self.self_handle, self.child.child_handle, + self.rpkid.irdb_query_child_resources(self.child.self.self_handle, self.child.child_handle, self.got_resources, self.lose) else: self.iterator() @@ -266,74 +282,70 @@ class UpdateChildrenTask(AbstractTask): def got_resources(self, irdb_resources): try: - for child_cert in self.child_certs: + for child_cert in self.child_certs.filter(ca_detail__state = "active"): ca_detail = child_cert.ca_detail - ca = ca_detail.ca - if ca_detail.state == "active": - old_resources = child_cert.cert.get_3779resources() - new_resources = old_resources & irdb_resources & ca_detail.latest_ca_cert.get_3779resources() - old_aia = child_cert.cert.get_AIA()[0] - new_aia = ca_detail.ca_cert_uri - - if new_resources.empty(): - logger.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s", - self.child.child_handle, child_cert.cert.gSKI()) - child_cert.revoke(publisher = self.publisher) - ca_detail.generate_crl(publisher = self.publisher) - ca_detail.generate_manifest(publisher = self.publisher) - - elif (old_resources != new_resources or - old_aia != new_aia or - (old_resources.valid_until < self.rsn and - irdb_resources.valid_until > self.now and - old_resources.valid_until != irdb_resources.valid_until)): - - logger.debug("Need to reissue child %s certificate SKI %s", - self.child.child_handle, child_cert.cert.gSKI()) - if old_resources != new_resources: - logger.debug("Child %s SKI %s resources changed: old %s new %s", - self.child.child_handle, child_cert.cert.gSKI(), - old_resources, new_resources) - if old_resources.valid_until != irdb_resources.valid_until: - logger.debug("Child %s SKI %s validity changed: old %s new %s", - self.child.child_handle, child_cert.cert.gSKI(), - old_resources.valid_until, irdb_resources.valid_until) - - new_resources.valid_until = irdb_resources.valid_until - child_cert.reissue( - ca_detail = ca_detail, - resources = new_resources, - publisher = self.publisher) - - elif old_resources.valid_until < self.now: - logger.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s", + old_resources = child_cert.cert.get_3779resources() + new_resources = old_resources & irdb_resources & ca_detail.latest_ca_cert.get_3779resources() + old_aia = child_cert.cert.get_AIA()[0] + new_aia = ca_detail.ca_cert_uri + + if new_resources.empty(): + logger.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s", + self.child.child_handle, child_cert.cert.gSKI()) + child_cert.revoke(publisher = self.publisher) + ca_detail.generate_crl(publisher = self.publisher) + ca_detail.generate_manifest(publisher = self.publisher) + + elif (old_resources != new_resources or + old_aia != new_aia or + (old_resources.valid_until < self.rsn and + irdb_resources.valid_until > self.now and + old_resources.valid_until != irdb_resources.valid_until)): + + logger.debug("Need to reissue child %s certificate SKI %s", + self.child.child_handle, child_cert.cert.gSKI()) + if old_resources != new_resources: + logger.debug("Child %s SKI %s resources changed: old %s new %s", + self.child.child_handle, child_cert.cert.gSKI(), + old_resources, new_resources) + if old_resources.valid_until != irdb_resources.valid_until: + logger.debug("Child %s SKI %s validity changed: old %s new %s", self.child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until) - child_cert.sql_delete() - self.publisher.queue( - uri = child_cert.uri, - old_obj = child_cert.cert, - repository = ca.parent.repository) - ca_detail.generate_manifest(publisher = self.publisher) + + new_resources.valid_until = irdb_resources.valid_until + child_cert.reissue( + ca_detail = ca_detail, + resources = new_resources, + publisher = self.publisher) + + elif old_resources.valid_until < self.now: + logger.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s", + self.child.child_handle, child_cert.cert.gSKI(), + old_resources.valid_until, irdb_resources.valid_until) + child_cert.delete() + self.publisher.queue( + uri = child_cert.uri, + old_obj = child_cert.cert, + repository = ca_detail.ca.parent.repository) + ca_detail.generate_manifest(publisher = self.publisher) except (SystemExit, rpki.async.ExitNow): raise except Exception, e: - self.gctx.checkpoint() + self.rpkid.checkpoint() self.lose(e) else: - self.gctx.checkpoint() - self.gctx.sql.sweep() + self.rpkid.checkpoint() self.iterator() def done(self): - self.gctx.checkpoint() - self.gctx.sql.sweep() + self.rpkid.checkpoint() self.publisher.call_pubd(self.exit, self.publication_failed) def publication_failed(self, e): logger.exception("Couldn't publish for %s, skipping", self.self_handle) - self.gctx.checkpoint() + self.rpkid.checkpoint() self.exit() @@ -349,42 +361,55 @@ class UpdateROAsTask(AbstractTask): self.publisher = None self.ca_details = None self.count = None + self.started = False def start(self): - self.gctx.checkpoint() - self.gctx.sql.sweep() - logger.debug("Self %s[%d] updating ROAs", self.self_handle, self.self_id) - + self.rpkid.checkpoint() + logger.debug("Self %s[%r] updating ROAs", self.self_handle, self) + assert not self.started + self.started = True logger.debug("Issuing query for ROA requests") - self.gctx.irdb_query_roa_requests(self.self_handle, self.got_roa_requests, self.roa_requests_failed) + self.rpkid.irdb_query_roa_requests(self.self_handle, self.got_roa_requests, self.roa_requests_failed) def got_roa_requests(self, r_msg): - self.gctx.checkpoint() + self.rpkid.checkpoint() logger.debug("Received response to query for ROA requests") - if self.gctx.sql.dirty: - logger.warning("Unexpected dirty SQL cache, flushing") - self.gctx.sql.sweep() - roas = {} seen = set() self.orphans = [] self.updates = [] - self.publisher = rpki.rpkid.publication_queue() + self.publisher = rpki.rpkid.publication_queue(self.rpkid) self.ca_details = set() - for roa in self.roas: + logger.debug("UpdateROAsTask.got_roa_requests(): setup done, self.orphans %r", self.orphans) + assert isinstance(self.orphans, list) # XXX + + for roa in rpki.rpkidb.models.ROA.objects.filter(self__exact = self.self): # XXX + logger.debug("UpdateROAsTask.got_roa_requests(): roa loop, self.orphans %r", self.orphans) + assert isinstance(self.orphans, list) # XXX k = (roa.asn, str(roa.ipv4), str(roa.ipv6)) if k not in roas: roas[k] = roa - elif (roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and - (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active")): + elif (roa.roa is not None and + roa.cert is not None and + roa.ca_detail is not None and + roa.ca_detail.state == "active" and + (roas[k].roa is None or + roas[k].cert is None or + roas[k].ca_detail is None or + roas[k].ca_detail.state != "active")): self.orphans.append(roas[k]) roas[k] = roa else: self.orphans.append(roa) + logger.debug("UpdateROAsTask.got_roa_requests(): roa loop done, self.orphans %r", self.orphans) + assert isinstance(self.orphans, list) # XXX + for r_pdu in r_msg: + logger.debug("UpdateROAsTask.got_roa_requests(): r_pdu loop, self.orphans %r", self.orphans) + assert isinstance(self.orphans, list) k = (r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6")) if k in seen: logger.warning("Skipping duplicate ROA request %r", r_pdu) @@ -392,14 +417,16 @@ class UpdateROAsTask(AbstractTask): seen.add(k) roa = roas.pop(k, None) if roa is None: - roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, long(r_pdu.get("asn")), - rpki.resource_set.roa_prefix_set_ipv4(r_pdu.get("ipv4")), - rpki.resource_set.roa_prefix_set_ipv6(r_pdu.get("ipv6"))) + roa = rpki.rpkidb.models.ROA(asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6")) + roa.self = self.self logger.debug("Created new %r", roa) else: logger.debug("Found existing %r", roa) self.updates.append(roa) + logger.debug("UpdateROAsTask.got_roa_requests(): r_pdu loop done, self.orphans %r", self.orphans) + assert isinstance(self.orphans, list) # XXX + self.orphans.extend(roas.itervalues()) if self.overdue: @@ -412,11 +439,10 @@ class UpdateROAsTask(AbstractTask): rpki.async.iterator(self.updates, self.loop, self.done, pop_list = True) def loop(self, iterator, roa): - self.gctx.checkpoint() + self.rpkid.checkpoint() try: roa.update(publisher = self.publisher, fast = True) self.ca_details.add(roa.ca_detail) - self.gctx.sql.sweep() except (SystemExit, rpki.async.ExitNow): raise except rpki.exceptions.NoCoveringCertForROA: @@ -437,13 +463,12 @@ class UpdateROAsTask(AbstractTask): logger.debug("Generating new manifest for %r", ca_detail) ca_detail.generate_manifest(publisher = self.publisher) self.ca_details.clear() - self.gctx.sql.sweep() - self.gctx.checkpoint() + self.rpkid.checkpoint() self.publisher.call_pubd(done, self.publication_failed) def publication_failed(self, e): logger.exception("Couldn't publish for %s, skipping", self.self_handle) - self.gctx.checkpoint() + self.rpkid.checkpoint() self.exit() def done(self): @@ -455,8 +480,7 @@ class UpdateROAsTask(AbstractTask): raise except Exception: logger.exception("Could not revoke %r", roa) - self.gctx.sql.sweep() - self.gctx.checkpoint() + self.rpkid.checkpoint() self.publish(self.exit) def roa_requests_failed(self, e): @@ -476,41 +500,41 @@ class UpdateGhostbustersTask(AbstractTask): exceptionally silly. """ - def start(self): - self.gctx.checkpoint() - logger.debug("Self %s[%d] updating Ghostbuster records", - self.self_handle, self.self_id) + def clear(self): + self.started = False - self.gctx.irdb_query_ghostbuster_requests(self.self_handle, - (p.parent_handle for p in self.parents), - self.got_ghostbuster_requests, - self.ghostbuster_requests_failed) + def start(self): + self.rpkid.checkpoint() + logger.debug("Self %s[%r] updating Ghostbuster records", self.self_handle, self) + assert not self.started + self.started = True + parent_handles = set(p.parent_handle for p in rpki.rpkidb.models.Parent.objects.filter(self__exact = self.self)) + self.rpkid.irdb_query_ghostbuster_requests(self.self_handle, parent_handles, + self.got_ghostbuster_requests, + self.ghostbuster_requests_failed) def got_ghostbuster_requests(self, r_msg): try: - self.gctx.checkpoint() - if self.gctx.sql.dirty: - logger.warning("Unexpected dirty SQL cache, flushing") - self.gctx.sql.sweep() + self.rpkid.checkpoint() ghostbusters = {} orphans = [] - publisher = rpki.rpkid.publication_queue() + publisher = rpki.rpkid.publication_queue(self.rpkid) ca_details = set() seen = set() - parents = dict((p.parent_handle, p) for p in self.parents) - - for ghostbuster in self.ghostbusters: - k = (ghostbuster.ca_detail_id, ghostbuster.vcard) + for ghostbuster in rpki.rpkidb.models.Ghostbuster.objects.filter(self__exact = self.self): + k = (ghostbuster.ca_detail.pk, ghostbuster.vcard) if ghostbuster.ca_detail.state != "active" or k in ghostbusters: orphans.append(ghostbuster) else: ghostbusters[k] = ghostbuster for r_pdu in r_msg: - if r_pdu.get("parent_handle") not in parents: + try: + rpki.rpkidb.models.Parent.objects.get(self__exact = self.self, parent_handle = r_pdu.get("parent_handle")) + except rpki.rpkidb.models.Parent.DoesNotExist: logger.warning("Unknown parent_handle %r in Ghostbuster request, skipping", r_pdu.get("parent_handle")) continue k = (r_pdu.get("parent_handle"), r_pdu.text) @@ -518,17 +542,17 @@ class UpdateGhostbustersTask(AbstractTask): logger.warning("Skipping duplicate Ghostbuster request %r", r_pdu) continue seen.add(k) - for ca in parents[r_pdu.get("parent_handle")].cas: - ca_detail = ca.active_ca_detail - if ca_detail is not None: - ghostbuster = ghostbusters.pop((ca_detail.ca_detail_id, r_pdu.text), None) - if ghostbuster is None: - ghostbuster = rpki.rpkid.ghostbuster_obj(self.gctx, self.self_id, ca_detail.ca_detail_id, r_pdu.text) - logger.debug("Created new %r for %r", ghostbuster, r_pdu.get("parent_handle")) - else: - logger.debug("Found existing %r for %s", ghostbuster, r_pdu.get("parent_handle")) - ghostbuster.update(publisher = publisher, fast = True) - ca_details.add(ca_detail) + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__parent_handle = r_pdu.get("parent_handle"), + ca__parent__self = self.self, state = "active"): + ghostbuster = ghostbusters.pop((ca_detail.pk, r_pdu.text), None) + if ghostbuster is None: + ghostbuster = rpki.rpkidb.models.Ghostbuster(ca_detail = ca_detail, vcard = r_pdu.text) + ghostbuster.self = self.self + logger.debug("Created new %r for %r", ghostbuster, r_pdu.get("parent_handle")) + else: + logger.debug("Found existing %r for %s", ghostbuster, r_pdu.get("parent_handle")) + ghostbuster.update(publisher = publisher, fast = True) + ca_details.add(ca_detail) orphans.extend(ghostbusters.itervalues()) for ghostbuster in orphans: @@ -539,9 +563,7 @@ class UpdateGhostbustersTask(AbstractTask): ca_detail.generate_crl(publisher = publisher) ca_detail.generate_manifest(publisher = publisher) - self.gctx.sql.sweep() - - self.gctx.checkpoint() + self.rpkid.checkpoint() publisher.call_pubd(self.exit, self.publication_failed) except (SystemExit, rpki.async.ExitNow): @@ -552,7 +574,7 @@ class UpdateGhostbustersTask(AbstractTask): def publication_failed(self, e): logger.exception("Couldn't publish Ghostbuster updates for %s, skipping", self.self_handle) - self.gctx.checkpoint() + self.rpkid.checkpoint() self.exit() def ghostbuster_requests_failed(self, e): @@ -569,26 +591,27 @@ class UpdateEECertificatesTask(AbstractTask): so keeping it simple for initial version, we can optimize later. """ - def start(self): - self.gctx.checkpoint() - logger.debug("Self %s[%d] updating EE certificates", self.self_handle, self.self_id) + def clear(self): + self.started = False - self.gctx.irdb_query_ee_certificate_requests(self.self_handle, + def start(self): + self.rpkid.checkpoint() + logger.debug("Self %s[%r] updating EE certificates", self.self_handle, self) + assert not self.started + self.started = True + self.rpkid.irdb_query_ee_certificate_requests(self.self_handle, self.got_requests, self.get_requests_failed) def got_requests(self, r_msg): try: - self.gctx.checkpoint() - if self.gctx.sql.dirty: - logger.warning("Unexpected dirty SQL cache, flushing") - self.gctx.sql.sweep() + self.rpkid.checkpoint() - publisher = rpki.rpkid.publication_queue() + publisher = rpki.rpkid.publication_queue(self.rpkid) existing = dict() - for ee in self.ee_certificates: + for ee in rpki.rpkidb.models.EECertificate.objects.filter(self__exact = self.self): # XXX gski = ee.gski if gski not in existing: existing[gski] = set() @@ -626,7 +649,7 @@ class UpdateEECertificatesTask(AbstractTask): for ca_detail in covering: logger.debug("No existing EE certificate for %s %s", gski, resources) - rpki.rpkid.ee_cert_obj.create( + rpki.rpkidb.models.EECertificate.create( # sic: class method, not Django manager method (for now, anyway) ca_detail = ca_detail, subject_name = subject_name, subject_key = subject_key, @@ -640,15 +663,11 @@ class UpdateEECertificatesTask(AbstractTask): ca_details.add(ee.ca_detail) ee.revoke(publisher = publisher) - self.gctx.sql.sweep() - for ca_detail in ca_details: ca_detail.generate_crl(publisher = publisher) ca_detail.generate_manifest(publisher = publisher) - self.gctx.sql.sweep() - - self.gctx.checkpoint() + self.rpkid.checkpoint() publisher.call_pubd(self.exit, self.publication_failed) except (SystemExit, rpki.async.ExitNow): @@ -659,7 +678,7 @@ class UpdateEECertificatesTask(AbstractTask): def publication_failed(self, e): logger.exception("Couldn't publish EE certificate updates for %s, skipping", self.self_handle) - self.gctx.checkpoint() + self.rpkid.checkpoint() self.exit() def get_requests_failed(self, e): @@ -680,38 +699,48 @@ class RegenerateCRLsAndManifestsTask(AbstractTask): database anyway. """ - def start(self): - self.gctx.checkpoint() - logger.debug("Self %s[%d] regenerating CRLs and manifests", - self.self_handle, self.self_id) + def clear(self): + self.started = False + def start(self): + self.rpkid.checkpoint() + logger.debug("Self %s[%r] regenerating CRLs and manifests", self.self_handle, self) + assert not self.started + self.started = True now = rpki.sundial.now() crl_interval = rpki.sundial.timedelta(seconds = self.crl_interval) - regen_margin = max(self.gctx.cron_period * 2, crl_interval / 4) - publisher = rpki.rpkid.publication_queue() + regen_margin = max(self.rpkid.cron_period * 2, crl_interval / 4) + publisher = rpki.rpkid.publication_queue(self.rpkid) - for parent in self.parents: - for ca in parent.cas: - try: - for ca_detail in ca.revoked_ca_details: - if now > ca_detail.latest_crl.getNextUpdate(): - ca_detail.destroy(ca = ca, publisher = publisher) - for ca_detail in ca.active_or_deprecated_ca_details: - if now + regen_margin > ca_detail.latest_crl.getNextUpdate(): - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Couldn't regenerate CRLs and manifests for CA %r, skipping", ca) - - self.gctx.checkpoint() - self.gctx.sql.sweep() - publisher.call_pubd(self.exit, self.lose) + logger.debug("RegenerateCRLsAndManifestsTask: setup complete") # XXX + + for ca in rpki.rpkidb.models.CA.objects.filter(parent__self = self.self): + logger.debug("RegenerateCRLsAndManifestsTask: checking CA %r", ca) # XXX + try: + for ca_detail in ca.ca_details.filter(state = "revoked"): + if now > ca_detail.latest_crl.getNextUpdate(): + ca_detail.destroy(ca = ca, publisher = publisher) + for ca_detail in ca.ca_details.filter(state__in = ("active", "deprecated")): + if now + regen_margin > ca_detail.latest_crl.getNextUpdate(): + ca_detail.generate_crl(publisher = publisher) + ca_detail.generate_manifest(publisher = publisher) + except (SystemExit, rpki.async.ExitNow): + raise + except Exception: + logger.exception("Couldn't regenerate CRLs and manifests for CA %r, skipping", ca) + + logger.debug("RegenerateCRLsAndManifestsTask: CA loop complete") # XXX + + self.rpkid.checkpoint() + publisher.call_pubd(self.done, self.lose) + + def done(self): + logger.debug("RegenerateCRLsAndManifestsTask: publication complete") # XXX + self.exit() def lose(self, e): logger.exception("Couldn't publish updated CRLs and manifests for self %r, skipping", self.self_handle) - self.gctx.checkpoint() + self.rpkid.checkpoint() self.exit() @@ -722,18 +751,24 @@ class CheckFailedPublication(AbstractTask): to pubd being down or unreachable). """ + def clear(self): + self.started = False + def start(self): - publisher = rpki.rpkid.publication_queue() - for parent in self.parents: - for ca in parent.cas: - ca_detail = ca.active_ca_detail - if ca_detail is not None: - ca_detail.check_failed_publication(publisher) - self.gctx.checkpoint() - self.gctx.sql.sweep() - publisher.call_pubd(self.exit, self.publication_failed) + assert not self.started + logger.debug("CheckFailedPublication starting") + self.started = True + publisher = rpki.rpkid.publication_queue(self.rpkid) + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__self = self.self, state = "active"): + ca_detail.check_failed_publication(publisher) + self.rpkid.checkpoint() + publisher.call_pubd(self.done, self.publication_failed) def publication_failed(self, e): logger.exception("Couldn't publish for %s, skipping", self.self_handle) - self.gctx.checkpoint() + self.rpkid.checkpoint() + self.exit() + + def done(self): + logger.debug("CheckFailedPublication finished") self.exit() |