aboutsummaryrefslogtreecommitdiff
path: root/rpki/rpkid_tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rpkid_tasks.py')
-rw-r--r--rpki/rpkid_tasks.py750
1 files changed, 750 insertions, 0 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py
new file mode 100644
index 00000000..04e1c0df
--- /dev/null
+++ b/rpki/rpkid_tasks.py
@@ -0,0 +1,750 @@
+# $Id$
+#
+# Copyright (C) 2014 Dragon Research Labs ("DRL")
+# Portions copyright (C) 2012--2013 Internet Systems Consortium ("ISC")
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notices and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND DRL AND ISC DISCLAIM ALL
+# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL OR
+# ISC BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
+# DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA
+# OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
+# TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+# PERFORMANCE OF THIS SOFTWARE.
+
+"""
+rpkid task objects. Split out from rpki.left_right and rpki.rpkid
+because interactions with rpkid scheduler were getting too complicated.
+"""
+
+import rpki.log
+import rpki.rpkid
+import rpki.async
+import rpki.up_down
+import rpki.sundial
+import rpki.publication
+import rpki.exceptions
+
+task_classes = ()
+
+def queue_task(cls):
+ """
+ Class decorator to add a new task class to task_classes.
+ """
+
+ global task_classes
+ task_classes += (cls,)
+ return cls
+
+
+class CompletionHandler(object):
+ """
+ Track one or more scheduled rpkid tasks and execute a callback when
+ the last of them terminates.
+ """
+
+ ## @var debug
+ # Debug logging.
+
+ debug = False
+
+ def __init__(self, cb):
+ self.cb = cb
+ self.tasks = set()
+
+ def register(self, task):
+ if self.debug:
+ rpki.log.debug("Completion handler %r registering task %r" % (self, task))
+ self.tasks.add(task)
+ task.register_completion(self.done)
+
+ def done(self, task):
+ try:
+ self.tasks.remove(task)
+ except KeyError:
+ rpki.log.warn("Completion handler %r called with unregistered task %r, blundering onwards" % (self, task))
+ else:
+ if self.debug:
+ rpki.log.debug("Completion handler %r called with registered task %r" % (self, task))
+ if not self.tasks:
+ if self.debug:
+ rpki.log.debug("Completion handler %r finished, calling %r" % (self, self.cb))
+ self.cb()
+
+ @property
+ def count(self):
+ return len(self.tasks)
+
+
+class AbstractTask(object):
+ """
+ Abstract base class for rpkid scheduler task objects. This just
+ handles the scheduler hooks, real work starts in self.start.
+
+ NB: This assumes that the rpki.rpkid.rpkid.task_* methods have been
+ rewritten to expect instances of subclasses of this class, rather
+ than expecting thunks to be wrapped up in the older version of this
+ class. Rewrite, rewrite, remove this comment when done, OK!
+ """
+
+ ## @var timeslice
+ # How long before a task really should consider yielding the CPU to
+ # let something else run.
+
+ timeslice = rpki.sundial.timedelta(seconds = 15)
+
+ def __init__(self, s, description = None):
+ self.self = s
+ self.description = description
+ self.completions = []
+ self.continuation = None
+ self.due_date = None
+ self.clear()
+
+ def __repr__(self):
+ return rpki.log.log_repr(self, self.description)
+
+ def register_completion(self, completion):
+ self.completions.append(completion)
+
+ def exit(self):
+ while self.completions:
+ self.completions.pop(0)(self)
+ self.clear()
+ self.due_date = None
+ self.self.gctx.task_next()
+
+ def postpone(self, continuation):
+ self.continuation = continuation
+ self.due_date = None
+ self.self.gctx.task_add(self)
+ self.self.gctx.task_next()
+
+ def __call__(self):
+ self.due_date = rpki.sundial.now() + self.timeslice
+ if self.continuation is None:
+ rpki.log.debug("Running task %r" % self)
+ self.clear()
+ self.start()
+ else:
+ rpki.log.debug("Restarting task %r at %r" % (self, self.continuation))
+ continuation = self.continuation
+ self.continuation = None
+ continuation()
+
+ @property
+ def overdue(self):
+ return rpki.sundial.now() > self.due_date
+
+ def __getattr__(self, name):
+ return getattr(self.self, name)
+
+ def start(self):
+ raise NotImplementedError
+
+ def clear(self):
+ pass
+
+
+@queue_task
+class PollParentTask(AbstractTask):
+ """
+ Run the regular client poll cycle with each of this self's
+ parents, in turn.
+ """
+
+ def clear(self):
+ self.parent_iterator = None
+ self.parent = None
+ self.ca_map = None
+ self.class_iterator = None
+
+ def start(self):
+ rpki.log.trace()
+ self.gctx.checkpoint()
+ rpki.log.debug("Self %s[%d] polling parents" % (self.self_handle, self.self_id))
+ rpki.async.iterator(self.parents, self.parent_loop, self.exit)
+
+ def parent_loop(self, parent_iterator, parent):
+ self.parent_iterator = parent_iterator
+ self.parent = parent
+ rpki.up_down.list_pdu.query(parent, self.got_list, self.list_failed)
+
+ def got_list(self, r_msg):
+ self.ca_map = dict((ca.parent_resource_class, ca) for ca in self.parent.cas)
+ self.gctx.checkpoint()
+ rpki.async.iterator(r_msg.payload.classes, self.class_loop, self.class_done)
+
+ def list_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't get resource class list from parent %r, skipping: %s (%r)" % (
+ self.parent, e, e))
+ self.parent_iterator()
+
+ def class_loop(self, class_iterator, rc):
+ self.gctx.checkpoint()
+ self.class_iterator = class_iterator
+ try:
+ ca = self.ca_map.pop(rc.class_name)
+ except KeyError:
+ rpki.rpkid.ca_obj.create(self.parent, rc, class_iterator, self.class_create_failed)
+ else:
+ ca.check_for_updates(self.parent, rc, class_iterator, self.class_update_failed)
+
+ def class_update_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't update class, skipping: %s" % e)
+ self.class_iterator()
+
+ def class_create_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't create class, skipping: %s" % e)
+ self.class_iterator()
+
+ def class_done(self):
+ rpki.async.iterator(self.ca_map.values(), self.ca_loop, self.ca_done)
+
+ def ca_loop(self, iterator, ca):
+ self.gctx.checkpoint()
+ ca.delete(self.parent, iterator)
+
+ def ca_done(self):
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ self.parent_iterator()
+
+
+@queue_task
+class UpdateChildrenTask(AbstractTask):
+ """
+ Check for updated IRDB data for all of this self's children and
+ issue new certs as necessary. Must handle changes both in
+ resources and in expiration date.
+ """
+
+ def clear(self):
+ self.now = None
+ self.rsn = None
+ self.publisher = None
+ self.iterator = None
+ self.child = None
+ self.child_certs = None
+
+ def start(self):
+ rpki.log.trace()
+ self.gctx.checkpoint()
+ rpki.log.debug("Self %s[%d] updating children" % (self.self_handle, self.self_id))
+ self.now = rpki.sundial.now()
+ self.rsn = self.now + rpki.sundial.timedelta(seconds = self.regen_margin)
+ self.publisher = rpki.rpkid.publication_queue()
+ rpki.async.iterator(self.children, self.loop, self.done)
+
+ def loop(self, iterator, child):
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ self.iterator = iterator
+ self.child = child
+ self.child_certs = child.child_certs
+ if self.overdue:
+ self.publisher.call_pubd(lambda: self.postpone(self.do_child), self.publication_failed)
+ else:
+ self.do_child()
+
+ def do_child(self):
+ if self.child_certs:
+ self.gctx.irdb_query_child_resources(self.child.self.self_handle, self.child.child_handle,
+ self.got_resources, self.lose)
+ else:
+ self.iterator()
+
+ def lose(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't update child %r, skipping: %s" % (self.child, e))
+ self.iterator()
+
+ def got_resources(self, irdb_resources):
+ try:
+ for child_cert in self.child_certs:
+ ca_detail = child_cert.ca_detail
+ ca = ca_detail.ca
+ if ca_detail.state == "active":
+ old_resources = child_cert.cert.get_3779resources()
+ new_resources = old_resources & irdb_resources & ca_detail.latest_ca_cert.get_3779resources()
+ old_aia = child_cert.cert.get_AIA()[0]
+ new_aia = ca_detail.ca_cert_uri
+
+ if new_resources.empty():
+ rpki.log.debug("Resources shrank to the null set, "
+ "revoking and withdrawing child %s certificate SKI %s" % (
+ self.child.child_handle, child_cert.cert.gSKI()))
+ child_cert.revoke(publisher = self.publisher)
+ ca_detail.generate_crl(publisher = self.publisher)
+ ca_detail.generate_manifest(publisher = self.publisher)
+
+ elif (old_resources != new_resources or
+ old_aia != new_aia or
+ (old_resources.valid_until < self.rsn and
+ irdb_resources.valid_until > self.now and
+ old_resources.valid_until != irdb_resources.valid_until)):
+
+ rpki.log.debug("Need to reissue child %s certificate SKI %s" % (
+ self.child.child_handle, child_cert.cert.gSKI()))
+ if old_resources != new_resources:
+ rpki.log.debug("Child %s SKI %s resources changed: old %s new %s" % (
+ self.child.child_handle, child_cert.cert.gSKI(), old_resources, new_resources))
+ if old_resources.valid_until != irdb_resources.valid_until:
+ rpki.log.debug("Child %s SKI %s validity changed: old %s new %s" % (
+ self.child.child_handle, child_cert.cert.gSKI(),
+ old_resources.valid_until, irdb_resources.valid_until))
+
+ new_resources.valid_until = irdb_resources.valid_until
+ child_cert.reissue(
+ ca_detail = ca_detail,
+ resources = new_resources,
+ publisher = self.publisher)
+
+ elif old_resources.valid_until < self.now:
+ rpki.log.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s"
+ % (self.child.child_handle, child_cert.cert.gSKI(),
+ old_resources.valid_until, irdb_resources.valid_until))
+ child_cert.sql_delete()
+ self.publisher.withdraw(
+ cls = rpki.publication.certificate_elt,
+ uri = child_cert.uri,
+ obj = child_cert.cert,
+ repository = ca.parent.repository)
+ ca_detail.generate_manifest(publisher = self.publisher)
+
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except Exception, e:
+ self.gctx.checkpoint()
+ self.lose(e)
+ else:
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ self.iterator()
+
+ def done(self):
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ self.publisher.call_pubd(self.exit, self.publication_failed)
+
+ def publication_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ self.exit()
+
+
+@queue_task
+class UpdateROAsTask(AbstractTask):
+ """
+ Generate or update ROAs for this self.
+ """
+
+ def clear(self):
+ self.orphans = None
+ self.updates = None
+ self.publisher = None
+ self.ca_details = None
+ self.count = None
+
+ def start(self):
+ rpki.log.trace()
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ rpki.log.debug("Self %s[%d] updating ROAs" % (self.self_handle, self.self_id))
+
+ rpki.log.debug("Issuing query for ROA requests")
+ self.gctx.irdb_query_roa_requests(self.self_handle, self.got_roa_requests, self.roa_requests_failed)
+
+ def got_roa_requests(self, roa_requests):
+ self.gctx.checkpoint()
+ rpki.log.debug("Received response to query for ROA requests")
+
+ if self.gctx.sql.dirty:
+ rpki.log.warn("Unexpected dirty SQL cache, flushing")
+ self.gctx.sql.sweep()
+
+ roas = {}
+ seen = set()
+ self.orphans = []
+ self.updates = []
+ self.publisher = rpki.rpkid.publication_queue()
+ self.ca_details = set()
+
+ for roa in self.roas:
+ k = (roa.asn, str(roa.ipv4), str(roa.ipv6))
+ if k not in roas:
+ roas[k] = roa
+ elif (roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and
+ (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active")):
+ self.orphans.append(roas[k])
+ roas[k] = roa
+ else:
+ self.orphans.append(roa)
+
+ for roa_request in roa_requests:
+ k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
+ if k in seen:
+ rpki.log.warn("Skipping duplicate ROA request %r" % roa_request)
+ else:
+ seen.add(k)
+ roa = roas.pop(k, None)
+ if roa is None:
+ roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
+ rpki.log.debug("Created new %r" % roa)
+ else:
+ rpki.log.debug("Found existing %r" % roa)
+ self.updates.append(roa)
+
+ self.orphans.extend(roas.itervalues())
+
+ if self.overdue:
+ self.postpone(self.begin_loop)
+ else:
+ self.begin_loop()
+
+ def begin_loop(self):
+ self.count = 0
+ rpki.async.iterator(self.updates, self.loop, self.done, pop_list = True)
+
+ def loop(self, iterator, roa):
+ self.gctx.checkpoint()
+ try:
+ roa.update(publisher = self.publisher, fast = True)
+ self.ca_details.add(roa.ca_detail)
+ self.gctx.sql.sweep()
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except rpki.exceptions.NoCoveringCertForROA:
+ rpki.log.warn("No covering certificate for %r, skipping" % roa)
+ except Exception, e:
+ rpki.log.traceback()
+ rpki.log.warn("Could not update %r, skipping: %s" % (roa, e))
+ self.count += 1
+ if self.overdue:
+ self.publish(lambda: self.postpone(iterator))
+ else:
+ iterator()
+
+ def publish(self, done):
+ if not self.publisher.empty():
+ for ca_detail in self.ca_details:
+ rpki.log.debug("Generating new CRL for %r" % ca_detail)
+ ca_detail.generate_crl(publisher = self.publisher)
+ rpki.log.debug("Generating new manifest for %r" % ca_detail)
+ ca_detail.generate_manifest(publisher = self.publisher)
+ self.ca_details.clear()
+ self.gctx.sql.sweep()
+ self.gctx.checkpoint()
+ self.publisher.call_pubd(done, self.publication_failed)
+
+ def publication_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ self.exit()
+
+ def done(self):
+ for roa in self.orphans:
+ try:
+ self.ca_details.add(roa.ca_detail)
+ roa.revoke(publisher = self.publisher, fast = True)
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except Exception, e:
+ rpki.log.traceback()
+ rpki.log.warn("Could not revoke %r: %s" % (roa, e))
+ self.gctx.sql.sweep()
+ self.gctx.checkpoint()
+ self.publish(self.exit)
+
+ def roa_requests_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Could not fetch ROA requests for %s, skipping: %s" % (self.self_handle, e))
+ self.exit()
+
+
+@queue_task
+class UpdateGhostbustersTask(AbstractTask):
+ """
+ Generate or update Ghostbuster records for this self.
+
+ This was originally based on the ROA update code. It's possible
+ that both could benefit from refactoring, but at this point the
+ potential scaling issues for ROAs completely dominate structure of
+ the ROA code, and aren't relevant here unless someone is being
+ exceptionally silly.
+ """
+
+ def start(self):
+ rpki.log.trace()
+ self.gctx.checkpoint()
+ rpki.log.debug("Self %s[%d] updating Ghostbuster records" % (self.self_handle, self.self_id))
+
+ self.gctx.irdb_query_ghostbuster_requests(self.self_handle,
+ (p.parent_handle for p in self.parents),
+ self.got_ghostbuster_requests,
+ self.ghostbuster_requests_failed)
+
+ def got_ghostbuster_requests(self, ghostbuster_requests):
+
+ try:
+ self.gctx.checkpoint()
+ if self.gctx.sql.dirty:
+ rpki.log.warn("Unexpected dirty SQL cache, flushing")
+ self.gctx.sql.sweep()
+
+ ghostbusters = {}
+ orphans = []
+ publisher = rpki.rpkid.publication_queue()
+ ca_details = set()
+ seen = set()
+
+ parents = dict((p.parent_handle, p) for p in self.parents)
+
+ for ghostbuster in self.ghostbusters:
+ k = (ghostbuster.ca_detail_id, ghostbuster.vcard)
+ if ghostbuster.ca_detail.state != "active" or k in ghostbusters:
+ orphans.append(ghostbuster)
+ else:
+ ghostbusters[k] = ghostbuster
+
+ for ghostbuster_request in ghostbuster_requests:
+ if ghostbuster_request.parent_handle not in parents:
+ rpki.log.warn("Unknown parent_handle %r in Ghostbuster request, skipping" % ghostbuster_request.parent_handle)
+ continue
+ k = (ghostbuster_request.parent_handle, ghostbuster_request.vcard)
+ if k in seen:
+ rpki.log.warn("Skipping duplicate Ghostbuster request %r" % ghostbuster_request)
+ continue
+ seen.add(k)
+ for ca in parents[ghostbuster_request.parent_handle].cas:
+ ca_detail = ca.active_ca_detail
+ if ca_detail is not None:
+ ghostbuster = ghostbusters.pop((ca_detail.ca_detail_id, ghostbuster_request.vcard), None)
+ if ghostbuster is None:
+ ghostbuster = rpki.rpkid.ghostbuster_obj(self.gctx, self.self_id, ca_detail.ca_detail_id, ghostbuster_request.vcard)
+ rpki.log.debug("Created new %r for %r" % (ghostbuster, ghostbuster_request.parent_handle))
+ else:
+ rpki.log.debug("Found existing %r for %s" % (ghostbuster, ghostbuster_request.parent_handle))
+ ghostbuster.update(publisher = publisher, fast = True)
+ ca_details.add(ca_detail)
+
+ orphans.extend(ghostbusters.itervalues())
+ for ghostbuster in orphans:
+ ca_details.add(ghostbuster.ca_detail)
+ ghostbuster.revoke(publisher = publisher, fast = True)
+
+ for ca_detail in ca_details:
+ ca_detail.generate_crl(publisher = publisher)
+ ca_detail.generate_manifest(publisher = publisher)
+
+ self.gctx.sql.sweep()
+
+ self.gctx.checkpoint()
+ publisher.call_pubd(self.exit, self.publication_failed)
+
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except Exception, e:
+ rpki.log.traceback()
+ rpki.log.warn("Could not update Ghostbuster records for %s, skipping: %s" % (self.self_handle, e))
+ self.exit()
+
+ def publication_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish Ghostbuster updates for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ self.exit()
+
+ def ghostbuster_requests_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Could not fetch Ghostbuster record requests for %s, skipping: %s" % (self.self_handle, e))
+ self.exit()
+
+
+@queue_task
+class UpdateEECertificatesTask(AbstractTask):
+ """
+ Generate or update EE certificates for this self.
+
+ Not yet sure what kind of scaling constraints this task might have,
+ so keeping it simple for initial version, we can optimize later.
+ """
+
+ def start(self):
+ rpki.log.trace()
+ self.gctx.checkpoint()
+ rpki.log.debug("Self %s[%d] updating EE certificates" % (self.self_handle, self.self_id))
+
+ self.gctx.irdb_query_ee_certificate_requests(self.self_handle,
+ self.got_requests,
+ self.get_requests_failed)
+
+ def got_requests(self, requests):
+
+ try:
+ self.gctx.checkpoint()
+ if self.gctx.sql.dirty:
+ rpki.log.warn("Unexpected dirty SQL cache, flushing")
+ self.gctx.sql.sweep()
+
+ publisher = rpki.rpkid.publication_queue()
+
+ existing = dict()
+ for ee in self.ee_certificates:
+ gski = ee.gski
+ if gski not in existing:
+ existing[gski] = set()
+ existing[gski].add(ee)
+
+ ca_details = set()
+
+ for req in requests:
+ ees = existing.pop(req.gski, ())
+ resources = rpki.resource_set.resource_bag(
+ asn = req.asn,
+ v4 = req.ipv4,
+ v6 = req.ipv6,
+ valid_until = req.valid_until)
+ covering = self.find_covering_ca_details(resources)
+ ca_details.update(covering)
+
+ for ee in ees:
+ if ee.ca_detail in covering:
+ rpki.log.debug("Updating existing EE certificate for %s %s" % (req.gski, resources))
+ ee.reissue(
+ resources = resources,
+ publisher = publisher)
+ covering.remove(ee.ca_detail)
+ else:
+ rpki.log.debug("Existing EE certificate for %s %s is no longer covered" % (req.gski, resources))
+ ee.revoke(publisher = publisher)
+
+ for ca_detail in covering:
+ rpki.log.debug("No existing EE certificate for %s %s" % (req.gski, resources))
+ rpki.rpkid.ee_cert_obj.create(
+ ca_detail = ca_detail,
+ subject_name = rpki.x509.X501DN.from_cn(req.cn, req.sn),
+ subject_key = req.pkcs10.getPublicKey(),
+ resources = resources,
+ publisher = publisher,
+ eku = req.eku or None)
+
+ # Anything left is an orphan
+ for ees in existing.values():
+ for ee in ees:
+ ca_details.add(ee.ca_detail)
+ ee.revoke(publisher = publisher)
+
+ self.gctx.sql.sweep()
+
+ for ca_detail in ca_details:
+ ca_detail.generate_crl(publisher = publisher)
+ ca_detail.generate_manifest(publisher = publisher)
+
+ self.gctx.sql.sweep()
+
+ self.gctx.checkpoint()
+ publisher.call_pubd(self.exit, self.publication_failed)
+
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except Exception, e:
+ rpki.log.traceback()
+ rpki.log.warn("Could not update EE certificates for %s, skipping: %s" % (self.self_handle, e))
+ self.exit()
+
+ def publication_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish EE certificate updates for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ self.exit()
+
+ def get_requests_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Could not fetch EE certificate requests for %s, skipping: %s" % (self.self_handle, e))
+ self.exit()
+
+
+@queue_task
+class RegenerateCRLsAndManifestsTask(AbstractTask):
+ """
+ Generate new CRLs and manifests as necessary for all of this self's
+ CAs. Extracting nextUpdate from a manifest is hard at the moment
+ due to implementation silliness, so for now we generate a new
+ manifest whenever we generate a new CRL
+
+ This code also cleans up tombstones left behind by revoked ca_detail
+ objects, since we're walking through the relevant portions of the
+ database anyway.
+ """
+
+ def start(self):
+ rpki.log.trace()
+ self.gctx.checkpoint()
+ rpki.log.debug("Self %s[%d] regenerating CRLs and manifests" % (self.self_handle, self.self_id))
+
+ now = rpki.sundial.now()
+ crl_interval = rpki.sundial.timedelta(seconds = self.crl_interval)
+ regen_margin = max(self.gctx.cron_period * 2, crl_interval / 4)
+ publisher = rpki.rpkid.publication_queue()
+
+ for parent in self.parents:
+ for ca in parent.cas:
+ try:
+ for ca_detail in ca.revoked_ca_details:
+ if now > ca_detail.latest_crl.getNextUpdate():
+ ca_detail.delete(ca = ca, publisher = publisher)
+ for ca_detail in ca.active_or_deprecated_ca_details:
+ if now + regen_margin > ca_detail.latest_crl.getNextUpdate():
+ ca_detail.generate_crl(publisher = publisher)
+ ca_detail.generate_manifest(publisher = publisher)
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except Exception, e:
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't regenerate CRLs and manifests for CA %r, skipping: %s" % (ca, e))
+
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ publisher.call_pubd(self.exit, self.lose)
+
+ def lose(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish updated CRLs and manifests for self %r, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ self.exit()
+
+
+@queue_task
+class CheckFailedPublication(AbstractTask):
+ """
+ Periodic check for objects we tried to publish but failed (eg, due
+ to pubd being down or unreachable).
+ """
+
+ def start(self):
+ rpki.log.trace()
+ publisher = rpki.rpkid.publication_queue()
+ for parent in self.parents:
+ for ca in parent.cas:
+ ca_detail = ca.active_ca_detail
+ if ca_detail is not None:
+ ca_detail.check_failed_publication(publisher)
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ publisher.call_pubd(self.exit, self.publication_failed)
+
+ def publication_failed(self, e):
+ rpki.log.traceback()
+ rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e))
+ self.gctx.checkpoint()
+ self.exit()