aboutsummaryrefslogtreecommitdiff
path: root/rpki/rpkid_tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rpkid_tasks.py')
-rw-r--r--rpki/rpkid_tasks.py1265
1 files changed, 604 insertions, 661 deletions
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py
index 58b4bcfe..ee4f90d3 100644
--- a/rpki/rpkid_tasks.py
+++ b/rpki/rpkid_tasks.py
@@ -22,9 +22,18 @@ because interactions with rpkid scheduler were getting too complicated.
"""
import logging
+import random
+
+import tornado.gen
+import tornado.web
+import tornado.locks
+import tornado.ioloop
+import tornado.httputil
+import tornado.httpclient
+import tornado.httpserver
+
import rpki.log
import rpki.rpkid
-import rpki.async
import rpki.up_down
import rpki.sundial
import rpki.publication
@@ -35,700 +44,634 @@ logger = logging.getLogger(__name__)
task_classes = ()
def queue_task(cls):
- """
- Class decorator to add a new task class to task_classes.
- """
-
- global task_classes
- task_classes += (cls,)
- return cls
-
-
-class CompletionHandler(object):
- """
- Track one or more scheduled rpkid tasks and execute a callback when
- the last of them terminates.
- """
-
- ## @var debug
- # Debug logging.
-
- debug = False
-
- def __init__(self, cb):
- self.cb = cb
- self.tasks = set()
-
- def register(self, task):
- if self.debug:
- logger.debug("Completion handler %r registering task %r", self, task)
- self.tasks.add(task)
- task.register_completion(self.done)
-
- def done(self, task):
- try:
- self.tasks.remove(task)
- except KeyError:
- logger.warning("Completion handler %r called with unregistered task %r, blundering onwards", self, task)
- else:
- if self.debug:
- logger.debug("Completion handler %r called with registered task %r", self, task)
- if not self.tasks:
- if self.debug:
- logger.debug("Completion handler %r finished, calling %r", self, self.cb)
- self.cb()
-
- @property
- def count(self):
- return len(self.tasks)
+ """
+ Class decorator to add a new task class to task_classes.
+ """
+
+ global task_classes # pylint: disable=W0603
+ task_classes += (cls,)
+ return cls
+
+
+class PostponeTask(Exception):
+ """
+ Exit a task without finishing it. We use this to signal that a
+ long-running task wants to yield to the task loop but hasn't yet
+ run to completion.
+ """
class AbstractTask(object):
- """
- Abstract base class for rpkid scheduler task objects. This just
- handles the scheduler hooks, real work starts in self.start.
-
- NB: This assumes that the rpki.rpkid.rpkid.task_* methods have been
- rewritten to expect instances of subclasses of this class, rather
- than expecting thunks to be wrapped up in the older version of this
- class. Rewrite, rewrite, remove this comment when done, OK!
- """
-
- ## @var timeslice
- # How long before a task really should consider yielding the CPU to
- # let something else run.
-
- timeslice = rpki.sundial.timedelta(seconds = 15)
-
- def __init__(self, s, description = None):
- self.self = s
- self.description = description
- self.completions = []
- self.continuation = None
- self.due_date = None
- self.clear()
-
- def __repr__(self):
- return rpki.log.log_repr(self, self.description)
-
- def register_completion(self, completion):
- self.completions.append(completion)
-
- def exit(self):
- self.self.gctx.sql.sweep()
- while self.completions:
- self.completions.pop(0)(self)
- self.clear()
- self.due_date = None
- self.self.gctx.task_next()
-
- def postpone(self, continuation):
- self.self.gctx.sql.sweep()
- self.continuation = continuation
- self.due_date = None
- self.self.gctx.task_add(self)
- self.self.gctx.task_next()
-
- def __call__(self):
- self.due_date = rpki.sundial.now() + self.timeslice
- if self.continuation is None:
- logger.debug("Running task %r", self)
- self.clear()
- self.start()
- else:
- logger.debug("Restarting task %r at %r", self, self.continuation)
- continuation = self.continuation
- self.continuation = None
- continuation()
-
- @property
- def overdue(self):
- return rpki.sundial.now() > self.due_date
-
- def __getattr__(self, name):
- return getattr(self.self, name)
-
- def start(self):
- raise NotImplementedError
-
- def clear(self):
- pass
+ """
+ Abstract base class for rpkid scheduler task objects.
+ """
+
+ ## @var timeslice
+ # How long before a task really should consider yielding the CPU
+ # to let something else run. Should this be something we can
+ # configure from rpki.conf?
+
+ #timeslice = rpki.sundial.timedelta(seconds = 15)
+ timeslice = rpki.sundial.timedelta(seconds = 120)
+
+ def __init__(self, rpkid, tenant, description = None):
+ self.rpkid = rpkid
+ self.tenant = tenant
+ self.description = description
+ self.done_this = None
+ self.done_next = None
+ self.due_date = None
+ self.started = False
+ self.postponed = False
+ self.clear()
+
+ def __repr__(self):
+ return rpki.log.log_repr(self, self.description)
+
+ @tornado.gen.coroutine
+ def start(self):
+ try:
+ logger.debug("%r: Starting", self)
+ self.due_date = rpki.sundial.now() + self.timeslice
+ self.clear()
+ self.started = True
+ self.postponed = False
+ yield self.main()
+ except PostponeTask:
+ self.postponed = True
+ except:
+ logger.exception("%r: Unhandled exception", self)
+ finally:
+ self.due_date = None
+ self.started = False
+ self.clear()
+ if self.postponed:
+ logger.debug("%r: Postponing", self)
+ self.rpkid.task_add(self)
+ else:
+ logger.debug("%r: Exiting", self)
+ if self.done_this is not None:
+ self.done_this.notify_all()
+ self.done_this = self.done_next
+ self.done_next = None
+
+ def wait(self):
+ done = "done_next" if self.started else "done_this"
+ condition = getattr(self, done)
+ if condition is None:
+ condition = tornado.locks.Condition()
+ setattr(self, done, condition)
+ future = condition.wait()
+ return future
+
+ def waiting(self):
+ return self.done_this is not None
+
+ @tornado.gen.coroutine
+ def overdue(self):
+ yield tornado.gen.moment
+ raise tornado.gen.Return(rpki.sundial.now() > self.due_date and
+ any(not task.postponed for task in self.rpkid.task_ready))
+
+ @tornado.gen.coroutine
+ def main(self):
+ raise NotImplementedError
+
+ def clear(self):
+ pass
@queue_task
class PollParentTask(AbstractTask):
- """
- Run the regular client poll cycle with each of this self's
- parents, in turn.
- """
-
- def clear(self):
- self.parent_iterator = None
- self.parent = None
- self.ca_map = None
- self.class_iterator = None
-
- def start(self):
- self.gctx.checkpoint()
- logger.debug("Self %s[%d] polling parents", self.self_handle, self.self_id)
- rpki.async.iterator(self.parents, self.parent_loop, self.exit)
-
- def parent_loop(self, parent_iterator, parent):
- self.parent_iterator = parent_iterator
- self.parent = parent
- rpki.up_down.list_pdu.query(parent, self.got_list, self.list_failed)
-
- def got_list(self, r_msg):
- self.ca_map = dict((ca.parent_resource_class, ca) for ca in self.parent.cas)
- self.gctx.checkpoint()
- rpki.async.iterator(r_msg.payload.classes, self.class_loop, self.class_done)
-
- def list_failed(self, e):
- logger.exception("Couldn't get resource class list from parent %r, skipping", self.parent)
- self.parent_iterator()
-
- def class_loop(self, class_iterator, rc):
- self.gctx.checkpoint()
- self.class_iterator = class_iterator
- try:
- ca = self.ca_map.pop(rc.class_name)
- except KeyError:
- rpki.rpkid.ca_obj.create(self.parent, rc, class_iterator, self.class_create_failed)
- else:
- ca.check_for_updates(self.parent, rc, class_iterator, self.class_update_failed)
-
- def class_update_failed(self, e):
- logger.exception("Couldn't update class, skipping")
- self.class_iterator()
-
- def class_create_failed(self, e):
- logger.exception("Couldn't create class, skipping")
- self.class_iterator()
-
- def class_done(self):
- rpki.async.iterator(self.ca_map.values(), self.ca_loop, self.ca_done)
-
- def ca_loop(self, iterator, ca):
- self.gctx.checkpoint()
- ca.delete(self.parent, iterator)
-
- def ca_done(self):
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- self.parent_iterator()
+ """
+ Run the regular client poll cycle with each of this tenant's
+ parents, in turn.
+ """
+
+ @tornado.gen.coroutine
+ def main(self):
+ logger.debug("%r: Polling parents", self)
+
+ for parent in rpki.rpkidb.models.Parent.objects.filter(tenant = self.tenant):
+ try:
+ logger.debug("%r: Executing list query", self)
+ list_r_msg = yield parent.up_down_list_query(rpkid = self.rpkid)
+ except:
+ logger.exception("%r: Couldn't get resource class list from %r, skipping", self, parent)
+ continue
+
+ logger.debug("%r: Parsing list response", self)
+
+ ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas.all())
+
+ for rc in list_r_msg.getiterator(rpki.up_down.tag_class):
+ try:
+ class_name = rc.get("class_name")
+ ca = ca_map.pop(class_name, None)
+ if ca is None:
+ yield self.create(parent = parent, rc = rc, class_name = class_name)
+ else:
+ yield self.update(parent = parent, rc = rc, class_name = class_name, ca = ca)
+ except:
+ logger.exception("Couldn't update resource class %r, skipping", class_name)
+
+ for class_name, ca in ca_map.iteritems():
+ logger.debug("%r: Destroying orphaned %r for resource class %r", self, ca, class_name)
+ yield ca.destroy(rpkid = self.rpkid, parent = parent)
+
+ @tornado.gen.coroutine
+ def create(self, parent, rc, class_name):
+ logger.debug("%r: Creating new CA for resource class %r", self, class_name)
+ ca = rpki.rpkidb.models.CA.objects.create(
+ parent = parent,
+ parent_resource_class = class_name,
+ sia_uri = parent.construct_sia_uri(rc))
+ ca_detail = ca.create_detail()
+ r_msg = yield parent.up_down_issue_query(rpkid = self.rpkid, ca = ca, ca_detail = ca_detail)
+ elt = r_msg.find(rpki.up_down.tag_class).find(rpki.up_down.tag_certificate)
+ uri = elt.get("cert_url")
+ cert = rpki.x509.X509(Base64 = elt.text)
+ logger.debug("%r: %r received certificate %s", self, ca, uri)
+ yield ca_detail.activate(rpkid = self.rpkid, ca = ca, cert = cert, uri = uri)
+
+ @tornado.gen.coroutine
+ def update(self, parent, rc, class_name, ca):
+
+ # pylint: disable=C0330
+
+ logger.debug("%r: Checking updates for %r", self, ca)
+
+ sia_uri = parent.construct_sia_uri(rc)
+ sia_uri_changed = ca.sia_uri != sia_uri
+
+ if sia_uri_changed:
+ logger.debug("SIA changed: was %s now %s", ca.sia_uri, sia_uri)
+ ca.sia_uri = sia_uri
+
+ rc_resources = rpki.resource_set.resource_bag(
+ asn = rc.get("resource_set_as"),
+ v4 = rc.get("resource_set_ipv4"),
+ v6 = rc.get("resource_set_ipv6"),
+ valid_until = rc.get("resource_set_notafter"))
+
+ cert_map = {}
+
+ for c in rc.getiterator(rpki.up_down.tag_certificate):
+ x = rpki.x509.X509(Base64 = c.text)
+ u = rpki.up_down.multi_uri(c.get("cert_url")).rsync()
+ cert_map[x.gSKI()] = (x, u)
+
+ ca_details = ca.ca_details.exclude(state = "revoked")
+
+ if not ca_details:
+ logger.warning("Existing resource class %s to %s from %s with no certificates, rekeying",
+ class_name, parent.tenant.tenant_handle, parent.parent_handle)
+ yield ca.rekey(rpkid = self.rpkid)
+ return
+
+ for ca_detail in ca_details:
+
+ rc_cert, rc_cert_uri = cert_map.pop(ca_detail.public_key.gSKI(), (None, None))
+
+ if rc_cert is None:
+ logger.warning("g(SKI) %s in resource class %s is in database but missing from list_response to %s from %s, "
+ "maybe parent certificate went away?",
+ ca_detail.public_key.gSKI(), class_name, parent.tenant.tenant_handle, parent.parent_handle)
+ publisher = rpki.rpkid.publication_queue(rpkid = self.rpkid)
+ ca_detail.destroy(publisher = publisher)
+ yield publisher.call_pubd()
+ continue
+
+ if ca_detail.state == "active" and ca_detail.ca_cert_uri != rc_cert_uri:
+ logger.debug("AIA changed: was %s now %s", ca_detail.ca_cert_uri, rc_cert_uri)
+ ca_detail.ca_cert_uri = rc_cert_uri
+ ca_detail.save()
+
+ if ca_detail.state not in ("pending", "active"):
+ continue
+
+ if ca_detail.state == "pending":
+ current_resources = rpki.resource_set.resource_bag()
+ else:
+ current_resources = ca_detail.latest_ca_cert.get_3779resources()
+
+ if (ca_detail.state == "pending" or
+ sia_uri_changed or
+ ca_detail.latest_ca_cert != rc_cert or
+ ca_detail.latest_ca_cert.getNotAfter() != rc_resources.valid_until or
+ current_resources.undersized(rc_resources) or
+ current_resources.oversized(rc_resources)):
+
+ yield ca_detail.update(
+ rpkid = self.rpkid,
+ parent = parent,
+ ca = ca,
+ rc = rc,
+ sia_uri_changed = sia_uri_changed,
+ old_resources = current_resources)
+
+ if cert_map:
+ logger.warning("Unknown certificate g(SKI)%s %s in resource class %s in list_response to %s from %s, maybe you want to \"revoke_forgotten\"?",
+ "" if len(cert_map) == 1 else "s", ", ".join(cert_map), class_name, parent.tenant.tenant_handle, parent.parent_handle)
@queue_task
class UpdateChildrenTask(AbstractTask):
- """
- Check for updated IRDB data for all of this self's children and
- issue new certs as necessary. Must handle changes both in
- resources and in expiration date.
- """
-
- def clear(self):
- self.now = None
- self.rsn = None
- self.publisher = None
- self.iterator = None
- self.child = None
- self.child_certs = None
-
- def start(self):
- self.gctx.checkpoint()
- logger.debug("Self %s[%d] updating children", self.self_handle, self.self_id)
- self.now = rpki.sundial.now()
- self.rsn = self.now + rpki.sundial.timedelta(seconds = self.regen_margin)
- self.publisher = rpki.rpkid.publication_queue()
- rpki.async.iterator(self.children, self.loop, self.done)
-
- def loop(self, iterator, child):
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- self.iterator = iterator
- self.child = child
- self.child_certs = child.child_certs
- if self.overdue:
- self.publisher.call_pubd(lambda: self.postpone(self.do_child), self.publication_failed)
- else:
- self.do_child()
-
- def do_child(self):
- if self.child_certs:
- self.gctx.irdb_query_child_resources(self.child.self.self_handle, self.child.child_handle,
- self.got_resources, self.lose)
- else:
- self.iterator()
-
- def lose(self, e):
- logger.exception("Couldn't update child %r, skipping", self.child)
- self.iterator()
-
- def got_resources(self, irdb_resources):
- try:
- for child_cert in self.child_certs:
- ca_detail = child_cert.ca_detail
- ca = ca_detail.ca
- if ca_detail.state == "active":
- old_resources = child_cert.cert.get_3779resources()
- new_resources = old_resources & irdb_resources & ca_detail.latest_ca_cert.get_3779resources()
- old_aia = child_cert.cert.get_AIA()[0]
- new_aia = ca_detail.ca_cert_uri
-
- if new_resources.empty():
- logger.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s",
- self.child.child_handle, child_cert.cert.gSKI())
- child_cert.revoke(publisher = self.publisher)
- ca_detail.generate_crl(publisher = self.publisher)
- ca_detail.generate_manifest(publisher = self.publisher)
-
- elif (old_resources != new_resources or
- old_aia != new_aia or
- (old_resources.valid_until < self.rsn and
- irdb_resources.valid_until > self.now and
- old_resources.valid_until != irdb_resources.valid_until)):
-
- logger.debug("Need to reissue child %s certificate SKI %s",
- self.child.child_handle, child_cert.cert.gSKI())
- if old_resources != new_resources:
- logger.debug("Child %s SKI %s resources changed: old %s new %s",
- self.child.child_handle, child_cert.cert.gSKI(),
- old_resources, new_resources)
- if old_resources.valid_until != irdb_resources.valid_until:
- logger.debug("Child %s SKI %s validity changed: old %s new %s",
- self.child.child_handle, child_cert.cert.gSKI(),
- old_resources.valid_until, irdb_resources.valid_until)
-
- new_resources.valid_until = irdb_resources.valid_until
- child_cert.reissue(
- ca_detail = ca_detail,
- resources = new_resources,
- publisher = self.publisher)
-
- elif old_resources.valid_until < self.now:
- logger.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s",
- self.child.child_handle, child_cert.cert.gSKI(),
- old_resources.valid_until, irdb_resources.valid_until)
- child_cert.sql_delete()
- self.publisher.withdraw(
- cls = rpki.publication.certificate_elt,
- uri = child_cert.uri,
- obj = child_cert.cert,
- repository = ca.parent.repository)
- ca_detail.generate_manifest(publisher = self.publisher)
-
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception, e:
- self.gctx.checkpoint()
- self.lose(e)
- else:
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- self.iterator()
-
- def done(self):
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- self.publisher.call_pubd(self.exit, self.publication_failed)
-
- def publication_failed(self, e):
- logger.exception("Couldn't publish for %s, skipping", self.self_handle)
- self.gctx.checkpoint()
- self.exit()
+ """
+ Check for updated IRDB data for all of this tenant's children and
+ issue new certs as necessary. Must handle changes both in
+ resources and in expiration date.
+ """
+
+ @tornado.gen.coroutine
+ def main(self):
+ logger.debug("%r: Updating children", self)
+ now = rpki.sundial.now()
+ rsn = now + rpki.sundial.timedelta(seconds = self.tenant.regen_margin)
+ publisher = rpki.rpkid.publication_queue(self.rpkid)
+ postponing = False
+
+ child_certs = rpki.rpkidb.models.ChildCert.objects.filter(child__tenant = self.tenant, ca_detail__state = "active")
+ child_handles = sorted(set(child_cert.child.child_handle for child_cert in child_certs))
+ irdb_resources = dict(zip(child_handles, (yield self.rpkid.irdb_query_children_resources(self.tenant.tenant_handle, child_handles))))
+
+ for child_cert in child_certs:
+ try:
+ ca_detail = child_cert.ca_detail
+ child_handle = child_cert.child.child_handle
+ old_resources = child_cert.cert.get_3779resources()
+ new_resources = old_resources & irdb_resources[child_handle] & ca_detail.latest_ca_cert.get_3779resources()
+ old_aia = child_cert.cert.get_AIA()[0]
+ new_aia = ca_detail.ca_cert_uri
+
+ assert child_cert.gski == child_cert.cert.gSKI()
+
+ if new_resources.empty():
+ logger.debug("Resources shrank to null set, revoking and withdrawing child %s g(SKI) %s",
+ child_handle, child_cert.gski)
+ child_cert.revoke(publisher = publisher)
+ ca_detail.generate_crl_and_manifest(publisher = publisher)
+
+ elif (old_resources != new_resources or old_aia != new_aia or
+ (old_resources.valid_until < rsn and
+ irdb_resources[child_handle].valid_until > now and
+ old_resources.valid_until != irdb_resources[child_handle].valid_until)):
+ logger.debug("Need to reissue child %s certificate g(SKI) %s", child_handle,
+ child_cert.gski)
+ if old_resources != new_resources:
+ logger.debug("Child %s g(SKI) %s resources changed: old %s new %s",
+ child_handle, child_cert.gski, old_resources, new_resources)
+ if old_resources.valid_until != irdb_resources[child_handle].valid_until:
+ logger.debug("Child %s g(SKI) %s validity changed: old %s new %s",
+ child_handle, child_cert.gski, old_resources.valid_until,
+ irdb_resources[child_handle].valid_until)
+
+ new_resources.valid_until = irdb_resources[child_handle].valid_until
+ child_cert.reissue(ca_detail = ca_detail, resources = new_resources, publisher = publisher)
+
+ elif old_resources.valid_until < now:
+ logger.debug("Child %s certificate g(SKI) %s has expired: cert.valid_until %s, irdb.valid_until %s",
+ child_handle, child_cert.gski, old_resources.valid_until,
+ irdb_resources[child_handle].valid_until)
+ child_cert.delete()
+ publisher.queue(uri = child_cert.uri,
+ old_obj = child_cert.cert,
+ repository = ca_detail.ca.parent.repository)
+ ca_detail.generate_crl_and_manifest(publisher = publisher)
+
+ except:
+ logger.exception("%r: Couldn't update %r, skipping", self, child_cert)
+
+ finally:
+ if (yield self.overdue()):
+ postponing = True
+ break
+ try:
+ yield publisher.call_pubd()
+ except:
+ logger.exception("%r: Couldn't publish, skipping", self)
-@queue_task
-class UpdateROAsTask(AbstractTask):
- """
- Generate or update ROAs for this self.
- """
-
- def clear(self):
- self.orphans = None
- self.updates = None
- self.publisher = None
- self.ca_details = None
- self.count = None
-
- def start(self):
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- logger.debug("Self %s[%d] updating ROAs", self.self_handle, self.self_id)
-
- logger.debug("Issuing query for ROA requests")
- self.gctx.irdb_query_roa_requests(self.self_handle, self.got_roa_requests, self.roa_requests_failed)
-
- def got_roa_requests(self, roa_requests):
- self.gctx.checkpoint()
- logger.debug("Received response to query for ROA requests")
-
- if self.gctx.sql.dirty:
- logger.warning("Unexpected dirty SQL cache, flushing")
- self.gctx.sql.sweep()
-
- roas = {}
- seen = set()
- self.orphans = []
- self.updates = []
- self.publisher = rpki.rpkid.publication_queue()
- self.ca_details = set()
-
- for roa in self.roas:
- k = (roa.asn, str(roa.ipv4), str(roa.ipv6))
- if k not in roas:
- roas[k] = roa
- elif (roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and
- (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active")):
- self.orphans.append(roas[k])
- roas[k] = roa
- else:
- self.orphans.append(roa)
-
- for roa_request in roa_requests:
- k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
- if k in seen:
- logger.warning("Skipping duplicate ROA request %r", roa_request)
- else:
- seen.add(k)
- roa = roas.pop(k, None)
- if roa is None:
- roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
- logger.debug("Created new %r", roa)
- else:
- logger.debug("Found existing %r", roa)
- self.updates.append(roa)
-
- self.orphans.extend(roas.itervalues())
-
- if self.overdue:
- self.postpone(self.begin_loop)
- else:
- self.begin_loop()
-
- def begin_loop(self):
- self.count = 0
- rpki.async.iterator(self.updates, self.loop, self.done, pop_list = True)
-
- def loop(self, iterator, roa):
- self.gctx.checkpoint()
- try:
- roa.update(publisher = self.publisher, fast = True)
- self.ca_details.add(roa.ca_detail)
- self.gctx.sql.sweep()
- except (SystemExit, rpki.async.ExitNow):
- raise
- except rpki.exceptions.NoCoveringCertForROA:
- logger.warning("No covering certificate for %r, skipping", roa)
- except Exception:
- logger.exception("Could not update %r, skipping", roa)
- self.count += 1
- if self.overdue:
- self.publish(lambda: self.postpone(iterator))
- else:
- iterator()
-
- def publish(self, done):
- if not self.publisher.empty():
- for ca_detail in self.ca_details:
- logger.debug("Generating new CRL for %r", ca_detail)
- ca_detail.generate_crl(publisher = self.publisher)
- logger.debug("Generating new manifest for %r", ca_detail)
- ca_detail.generate_manifest(publisher = self.publisher)
- self.ca_details.clear()
- self.gctx.sql.sweep()
- self.gctx.checkpoint()
- self.publisher.call_pubd(done, self.publication_failed)
-
- def publication_failed(self, e):
- logger.exception("Couldn't publish for %s, skipping", self.self_handle)
- self.gctx.checkpoint()
- self.exit()
-
- def done(self):
- for roa in self.orphans:
- try:
- self.ca_details.add(roa.ca_detail)
- roa.revoke(publisher = self.publisher, fast = True)
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception:
- logger.exception("Could not revoke %r", roa)
- self.gctx.sql.sweep()
- self.gctx.checkpoint()
- self.publish(self.exit)
-
- def roa_requests_failed(self, e):
- logger.exception("Could not fetch ROA requests for %s, skipping", self.self_handle)
- self.exit()
+ if postponing:
+ raise PostponeTask
@queue_task
-class UpdateGhostbustersTask(AbstractTask):
- """
- Generate or update Ghostbuster records for this self.
-
- This was originally based on the ROA update code. It's possible
- that both could benefit from refactoring, but at this point the
- potential scaling issues for ROAs completely dominate structure of
- the ROA code, and aren't relevant here unless someone is being
- exceptionally silly.
- """
-
- def start(self):
- self.gctx.checkpoint()
- logger.debug("Self %s[%d] updating Ghostbuster records",
- self.self_handle, self.self_id)
-
- self.gctx.irdb_query_ghostbuster_requests(self.self_handle,
- (p.parent_handle for p in self.parents),
- self.got_ghostbuster_requests,
- self.ghostbuster_requests_failed)
-
- def got_ghostbuster_requests(self, ghostbuster_requests):
-
- try:
- self.gctx.checkpoint()
- if self.gctx.sql.dirty:
- logger.warning("Unexpected dirty SQL cache, flushing")
- self.gctx.sql.sweep()
-
- ghostbusters = {}
- orphans = []
- publisher = rpki.rpkid.publication_queue()
- ca_details = set()
- seen = set()
-
- parents = dict((p.parent_handle, p) for p in self.parents)
-
- for ghostbuster in self.ghostbusters:
- k = (ghostbuster.ca_detail_id, ghostbuster.vcard)
- if ghostbuster.ca_detail.state != "active" or k in ghostbusters:
- orphans.append(ghostbuster)
- else:
- ghostbusters[k] = ghostbuster
-
- for ghostbuster_request in ghostbuster_requests:
- if ghostbuster_request.parent_handle not in parents:
- logger.warning("Unknown parent_handle %r in Ghostbuster request, skipping", ghostbuster_request.parent_handle)
- continue
- k = (ghostbuster_request.parent_handle, ghostbuster_request.vcard)
- if k in seen:
- logger.warning("Skipping duplicate Ghostbuster request %r", ghostbuster_request)
- continue
- seen.add(k)
- for ca in parents[ghostbuster_request.parent_handle].cas:
- ca_detail = ca.active_ca_detail
- if ca_detail is not None:
- ghostbuster = ghostbusters.pop((ca_detail.ca_detail_id, ghostbuster_request.vcard), None)
- if ghostbuster is None:
- ghostbuster = rpki.rpkid.ghostbuster_obj(self.gctx, self.self_id, ca_detail.ca_detail_id, ghostbuster_request.vcard)
- logger.debug("Created new %r for %r", ghostbuster, ghostbuster_request.parent_handle)
+class UpdateROAsTask(AbstractTask):
+ """
+ Generate or update ROAs for this tenant.
+ """
+
+ # XXX This might need rewriting to avoid race conditions.
+ #
+ # There's a theoretical race condition here if we're chugging away
+ # and something else needs to update the manifest or CRL, or if
+ # some back-end operation generates or destroys ROAs. The risk is
+ # fairly low given that we defer CRL and manifest generation until
+ # we're ready to publish, but it's theoretically present.
+
+ @tornado.gen.coroutine
+ def main(self):
+ logger.debug("%r: Updating ROAs", self)
+
+ try:
+ r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle)
+ except:
+ logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle)
+ return
+
+ logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg)
+
+ roas = {}
+ seen = set()
+ orphans = []
+ creates = []
+ updates = []
+ publisher = rpki.rpkid.publication_queue(self.rpkid)
+ ca_details = set()
+
+ for roa in self.tenant.roas.all():
+ k = "{!s} {!s} {!s}".format(roa.asn, roa.ipv4, roa.ipv6)
+ if k not in roas:
+ roas[k] = roa
+ elif roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active"):
+ orphans.append(roas[k])
+ roas[k] = roa
+ else:
+ orphans.append(roa)
+
+ for r_pdu in r_msg:
+ k = "{!s} {!s} {!s}".format(r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6"))
+ if k in seen:
+ logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu)
+ continue
+ seen.add(k)
+ roa = roas.pop(k, None)
+ if roa is None:
+ roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6"))
+ logger.debug("%r: Try to create %r", self, roa)
+ creates.append(roa)
else:
- logger.debug("Found existing %r for %s", ghostbuster, ghostbuster_request.parent_handle)
- ghostbuster.update(publisher = publisher, fast = True)
- ca_details.add(ca_detail)
+ logger.debug("%r: Found existing %r", self, roa)
+ updates.append(roa)
+
+ orphans.extend(roas.itervalues())
+
+ roas = creates + updates
+
+ r_msg = seen = creates = updates = None
- orphans.extend(ghostbusters.itervalues())
- for ghostbuster in orphans:
- ca_details.add(ghostbuster.ca_detail)
- ghostbuster.revoke(publisher = publisher, fast = True)
+ postponing = False
- for ca_detail in ca_details:
- ca_detail.generate_crl(publisher = publisher)
- ca_detail.generate_manifest(publisher = publisher)
+ while roas and not postponing:
+ if (yield self.overdue()):
+ postponing = True
+ break
+ roa = roas.pop(0)
+ try:
+ roa.update(publisher = publisher)
+ ca_details.add(roa.ca_detail)
+ except rpki.exceptions.NoCoveringCertForROA:
+ logger.warning("%r: No covering certificate for %r, skipping", self, roa)
+ except:
+ logger.exception("%r: Could not update %r, skipping", self, roa)
- self.gctx.sql.sweep()
+ if not postponing:
+ for roa in orphans:
+ try:
+ ca_details.add(roa.ca_detail)
+ roa.revoke(publisher = publisher)
+ except:
+ logger.exception("%r: Could not revoke %r", self, roa)
- self.gctx.checkpoint()
- publisher.call_pubd(self.exit, self.publication_failed)
+ if not publisher.empty():
+ for ca_detail in ca_details:
+ logger.debug("%r: Generating new CRL and manifest for %r", self, ca_detail)
+ ca_detail.generate_crl_and_manifest(publisher = publisher)
+ yield publisher.call_pubd()
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception:
- logger.exception("Could not update Ghostbuster records for %s, skipping", self.self_handle)
- self.exit()
+ if postponing:
+ raise PostponeTask
- def publication_failed(self, e):
- logger.exception("Couldn't publish Ghostbuster updates for %s, skipping", self.self_handle)
- self.gctx.checkpoint()
- self.exit()
- def ghostbuster_requests_failed(self, e):
- logger.exception("Could not fetch Ghostbuster record requests for %s, skipping", self.self_handle)
- self.exit()
+@queue_task
+class UpdateGhostbustersTask(AbstractTask):
+ """
+ Generate or update Ghostbuster records for this tenant.
+
+ This was originally based on the ROA update code. It's possible
+ that both could benefit from refactoring, but at this point the
+ potential scaling issues for ROAs completely dominate structure of
+ the ROA code, and aren't relevant here unless someone is being
+ exceptionally silly.
+ """
+
+ @tornado.gen.coroutine
+ def main(self):
+ logger.debug("%r: Updating Ghostbuster records", self)
+ parent_handles = set(p.parent_handle for p in rpki.rpkidb.models.Parent.objects.filter(tenant = self.tenant))
+
+ try:
+ r_msg = yield self.rpkid.irdb_query_ghostbuster_requests(self.tenant.tenant_handle, parent_handles)
+
+ ghostbusters = {}
+ orphans = []
+ publisher = rpki.rpkid.publication_queue(self.rpkid)
+ ca_details = set()
+ seen = set()
+
+ for ghostbuster in self.tenant.ghostbusters.all():
+ k = (ghostbuster.ca_detail.pk, ghostbuster.vcard)
+ if ghostbuster.ca_detail.state != "active" or k in ghostbusters:
+ orphans.append(ghostbuster)
+ else:
+ ghostbusters[k] = ghostbuster
+
+ for r_pdu in r_msg:
+ if not rpki.rpkidb.models.Parent.objects.filter(tenant = self.tenant, parent_handle = r_pdu.get("parent_handle")).exists():
+ logger.warning("%r: Unknown parent_handle %r in Ghostbuster request, skipping", self, r_pdu.get("parent_handle"))
+ continue
+ k = (r_pdu.get("parent_handle"), r_pdu.text)
+ if k in seen:
+ logger.warning("%r: Skipping duplicate Ghostbuster request %r", self, r_pdu)
+ continue
+ seen.add(k)
+ for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__parent_handle = r_pdu.get("parent_handle"),
+ ca__parent__tenant = self.tenant,
+ state = "active"):
+ ghostbuster = ghostbusters.pop((ca_detail.pk, r_pdu.text), None)
+ if ghostbuster is None:
+ ghostbuster = rpki.rpkidb.models.Ghostbuster(tenant = self.tenant, ca_detail = ca_detail, vcard = r_pdu.text)
+ logger.debug("%r: Created new %r for %r", self, ghostbuster, r_pdu.get("parent_handle"))
+ else:
+ logger.debug("%r: Found existing %r for %r", self, ghostbuster, r_pdu.get("parent_handle"))
+ ghostbuster.update(publisher = publisher)
+ ca_details.add(ca_detail)
+
+ orphans.extend(ghostbusters.itervalues())
+ for ghostbuster in orphans:
+ ca_details.add(ghostbuster.ca_detail)
+ ghostbuster.revoke(publisher = publisher)
+
+ for ca_detail in ca_details:
+ ca_detail.generate_crl_and_manifest(publisher = publisher)
+
+ yield publisher.call_pubd()
+
+ except:
+ logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant.tenant_handle)
@queue_task
class UpdateEECertificatesTask(AbstractTask):
- """
- Generate or update EE certificates for this self.
-
- Not yet sure what kind of scaling constraints this task might have,
- so keeping it simple for initial version, we can optimize later.
- """
-
- def start(self):
- self.gctx.checkpoint()
- logger.debug("Self %s[%d] updating EE certificates", self.self_handle, self.self_id)
-
- self.gctx.irdb_query_ee_certificate_requests(self.self_handle,
- self.got_requests,
- self.get_requests_failed)
-
- def got_requests(self, requests):
-
- try:
- self.gctx.checkpoint()
- if self.gctx.sql.dirty:
- logger.warning("Unexpected dirty SQL cache, flushing")
- self.gctx.sql.sweep()
-
- publisher = rpki.rpkid.publication_queue()
-
- existing = dict()
- for ee in self.ee_certificates:
- gski = ee.gski
- if gski not in existing:
- existing[gski] = set()
- existing[gski].add(ee)
-
- ca_details = set()
-
- for req in requests:
- ees = existing.pop(req.gski, ())
- resources = rpki.resource_set.resource_bag(
- asn = req.asn,
- v4 = req.ipv4,
- v6 = req.ipv6,
- valid_until = req.valid_until)
- covering = self.find_covering_ca_details(resources)
- ca_details.update(covering)
-
- for ee in ees:
- if ee.ca_detail in covering:
- logger.debug("Updating existing EE certificate for %s %s",
- req.gski, resources)
- ee.reissue(
- resources = resources,
- publisher = publisher)
- covering.remove(ee.ca_detail)
- else:
- logger.debug("Existing EE certificate for %s %s is no longer covered",
- req.gski, resources)
- ee.revoke(publisher = publisher)
-
- for ca_detail in covering:
- logger.debug("No existing EE certificate for %s %s",
- req.gski, resources)
- rpki.rpkid.ee_cert_obj.create(
- ca_detail = ca_detail,
- subject_name = rpki.x509.X501DN.from_cn(req.cn, req.sn),
- subject_key = req.pkcs10.getPublicKey(),
- resources = resources,
- publisher = publisher,
- eku = req.eku or None)
-
- # Anything left is an orphan
- for ees in existing.values():
- for ee in ees:
- ca_details.add(ee.ca_detail)
- ee.revoke(publisher = publisher)
-
- self.gctx.sql.sweep()
-
- for ca_detail in ca_details:
- ca_detail.generate_crl(publisher = publisher)
- ca_detail.generate_manifest(publisher = publisher)
-
- self.gctx.sql.sweep()
-
- self.gctx.checkpoint()
- publisher.call_pubd(self.exit, self.publication_failed)
-
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception:
- logger.exception("Could not update EE certificates for %s, skipping", self.self_handle)
- self.exit()
-
- def publication_failed(self, e):
- logger.exception("Couldn't publish EE certificate updates for %s, skipping", self.self_handle)
- self.gctx.checkpoint()
- self.exit()
-
- def get_requests_failed(self, e):
- logger.exception("Could not fetch EE certificate requests for %s, skipping", self.self_handle)
- self.exit()
+ """
+ Generate or update EE certificates for this tenant.
+
+ Not yet sure what kind of scaling constraints this task might have,
+ so keeping it simple for initial version, we can optimize later.
+ """
+
+ @tornado.gen.coroutine
+ def main(self):
+ logger.debug("%r: Updating EE certificates", self)
+
+ try:
+ r_msg = yield self.rpkid.irdb_query_ee_certificate_requests(self.tenant.tenant_handle)
+
+ publisher = rpki.rpkid.publication_queue(self.rpkid)
+
+ logger.debug("%r: Examining EE certificate requests", self)
+
+ existing = dict()
+ for ee in self.tenant.ee_certificates.all():
+ gski = ee.gski
+ if gski not in existing:
+ existing[gski] = set()
+ existing[gski].add(ee)
+
+ ca_details = set()
+
+ for r_pdu in r_msg:
+ gski = r_pdu.get("gski")
+ ees = existing.pop(gski, ())
+
+ resources = rpki.resource_set.resource_bag(
+ asn = r_pdu.get("asn"),
+ v4 = r_pdu.get("ipv4"),
+ v6 = r_pdu.get("ipv6"),
+ valid_until = r_pdu.get("valid_until"))
+ covering = self.tenant.find_covering_ca_details(resources)
+ ca_details.update(covering)
+
+ for ee in ees:
+ if ee.ca_detail in covering:
+ logger.debug("%r: Updating %r for %s %s", self, ee, gski, resources)
+ ee.reissue(resources = resources, publisher = publisher)
+ covering.remove(ee.ca_detail)
+ else:
+ # This probably never happens, as the most likely cause would be a CA certificate
+ # being revoked, which should trigger automatic clean up of issued certificates.
+ logger.debug("%r: %r for %s %s is no longer covered", self, ee, gski, resources)
+ ca_details.add(ee.ca_detail)
+ ee.revoke(publisher = publisher)
+
+ subject_name = rpki.x509.X501DN.from_cn(r_pdu.get("cn"), r_pdu.get("sn"))
+ subject_key = rpki.x509.PKCS10(Base64 = r_pdu[0].text).getPublicKey()
+
+ for ca_detail in covering:
+ logger.debug("%r: No existing EE certificate for %s %s", self, gski, resources)
+ cn, sn = subject_name.extract_cn_and_sn()
+ cert = ca_detail.issue_ee(
+ ca = ca_detail.ca,
+ subject_key = subject_key,
+ sia = None,
+ resources = resources,
+ notAfter = resources.valid_until,
+ cn = cn,
+ sn = sn,
+ eku = r_pdu.get("eku", "").split(",") or None)
+ ee = rpki.rpkidb.models.EECertificate.objects.create(
+ tenant = ca_detail.ca.parent.tenant,
+ ca_detail = ca_detail,
+ cert = cert,
+ gski = subject_key.gSKI())
+ publisher.queue(
+ uri = ee.uri,
+ new_obj = cert,
+ repository = ca_detail.ca.parent.repository,
+ handler = ee.published_callback)
+
+ # Anything left is an orphan
+ for ees in existing.values():
+ for ee in ees:
+ ca_details.add(ee.ca_detail)
+ ee.revoke(publisher = publisher)
+
+ for ca_detail in ca_details:
+ ca_detail.generate_crl_and_manifest(publisher = publisher)
+
+ yield publisher.call_pubd()
+
+ except:
+ logger.exception("%r: Could not update EE certificates, skipping", self)
@queue_task
class RegenerateCRLsAndManifestsTask(AbstractTask):
- """
- Generate new CRLs and manifests as necessary for all of this self's
- CAs. Extracting nextUpdate from a manifest is hard at the moment
- due to implementation silliness, so for now we generate a new
- manifest whenever we generate a new CRL
-
- This code also cleans up tombstones left behind by revoked ca_detail
- objects, since we're walking through the relevant portions of the
- database anyway.
- """
-
- def start(self):
- self.gctx.checkpoint()
- logger.debug("Self %s[%d] regenerating CRLs and manifests",
- self.self_handle, self.self_id)
-
- now = rpki.sundial.now()
- crl_interval = rpki.sundial.timedelta(seconds = self.crl_interval)
- regen_margin = max(self.gctx.cron_period * 2, crl_interval / 4)
- publisher = rpki.rpkid.publication_queue()
-
- for parent in self.parents:
- for ca in parent.cas:
+ """
+ Generate new CRLs and manifests as necessary for all of this tenant's
+ CAs. Extracting nextUpdate from a manifest is hard at the moment
+ due to implementation silliness, so for now we generate a new
+ manifest whenever we generate a new CRL
+
+ This code also cleans up tombstones left behind by revoked ca_detail
+ objects, since we're walking through the relevant portions of the
+ database anyway.
+ """
+
+ @tornado.gen.coroutine
+ def main(self):
+ logger.debug("%r: Regenerating CRLs and manifests", self)
+
try:
- for ca_detail in ca.revoked_ca_details:
- if now > ca_detail.latest_crl.getNextUpdate():
- ca_detail.delete(ca = ca, publisher = publisher)
- for ca_detail in ca.active_or_deprecated_ca_details:
- if now + regen_margin > ca_detail.latest_crl.getNextUpdate():
- ca_detail.generate_crl(publisher = publisher)
- ca_detail.generate_manifest(publisher = publisher)
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception:
- logger.exception("Couldn't regenerate CRLs and manifests for CA %r, skipping", ca)
-
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- publisher.call_pubd(self.exit, self.lose)
-
- def lose(self, e):
- logger.exception("Couldn't publish updated CRLs and manifests for self %r, skipping", self.self_handle)
- self.gctx.checkpoint()
- self.exit()
+ publisher = rpki.rpkid.publication_queue(self.rpkid)
+ now = rpki.sundial.now()
+
+ ca_details = rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = self.tenant,
+ next_crl_manifest_update__isnull = False)
+
+ for ca_detail in ca_details.filter(next_crl_manifest_update__lt = now,
+ state = "revoked"):
+ ca_detail.destroy(publisher = publisher)
+
+ for ca_detail in ca_details.filter(state__in = ("active", "deprecated"),
+ next_crl_manifest_update__lt = now + max(
+ rpki.sundial.timedelta(seconds = self.tenant.crl_interval) / 4,
+ rpki.sundial.timedelta(seconds = self.rpkid.cron_period ) * 2)):
+ ca_detail.generate_crl_and_manifest(publisher = publisher)
+
+ yield publisher.call_pubd()
+
+ except:
+ logger.exception("%r: Couldn't publish updated CRLs and manifests, skipping", self)
@queue_task
class CheckFailedPublication(AbstractTask):
- """
- Periodic check for objects we tried to publish but failed (eg, due
- to pubd being down or unreachable).
- """
-
- def start(self):
- publisher = rpki.rpkid.publication_queue()
- for parent in self.parents:
- for ca in parent.cas:
- ca_detail = ca.active_ca_detail
- if ca_detail is not None:
- ca_detail.check_failed_publication(publisher)
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- publisher.call_pubd(self.exit, self.publication_failed)
-
- def publication_failed(self, e):
- logger.exception("Couldn't publish for %s, skipping", self.self_handle)
- self.gctx.checkpoint()
- self.exit()
+ """
+ Periodic check for objects we tried to publish but failed (eg, due
+ to pubd being down or unreachable).
+ """
+
+ @tornado.gen.coroutine
+ def main(self):
+ logger.debug("%r: Checking for failed publication actions", self)
+
+ try:
+ publisher = rpki.rpkid.publication_queue(self.rpkid)
+ for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"):
+ ca_detail.check_failed_publication(publisher)
+ yield publisher.call_pubd()
+
+ except:
+ logger.exception("%r: Couldn't run failed publications, skipping", self)