aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rpkid/rpki/left_right.py462
-rw-r--r--rpkid/rpki/rpkid.py83
-rw-r--r--rpkid/rpki/rpkid_tasks.py84
-rw-r--r--rpkid/tests/yamltest.py2
4 files changed, 107 insertions, 524 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py
index 6a176c15..c6089db4 100644
--- a/rpkid/rpki/left_right.py
+++ b/rpkid/rpki/left_right.py
@@ -34,22 +34,13 @@ PERFORMANCE OF THIS SOFTWARE.
import rpki.resource_set, rpki.x509, rpki.sql, rpki.exceptions, rpki.xml_utils
import rpki.http, rpki.up_down, rpki.relaxng, rpki.sundial, rpki.log, rpki.roa
-import rpki.publication, rpki.async
+import rpki.publication, rpki.async, rpki.rpkid_tasks
## @var enforce_strict_up_down_xml_sender
# Enforce strict checking of XML "sender" field in up-down protocol
enforce_strict_up_down_xml_sender = False
-## @var max_new_roas_at_once
-# Upper limit on the number of ROAs we'll create in a single
-# self_elt.update_roas() call. This is a bit of a kludge, and may be
-# replaced with something more clever or general later; for the moment
-# the goal is to avoid going totally compute bound when somebody
-# throws 50,000 new ROA requests at us in a single batch.
-
-max_new_roas_at_once = 200
-
class left_right_namespace(object):
"""
XML namespace parameters for left-right protocol.
@@ -159,6 +150,7 @@ class self_elt(data_elt):
regen_margin = None
bpki_cert = None
bpki_glue = None
+ cron_tasks = None
@property
def bscs(self):
@@ -326,10 +318,10 @@ class self_elt(data_elt):
"""
rpki.log.debug("Forced immediate run of periodic actions for self %s[%d]" % (
self.self_handle, self.self_id))
- if self.gctx.task_add(self.cron, cb):
- self.gctx.task_run()
- else:
- cb()
+ completion = rpki.rpkid_tasks.CompletionHandler(cb)
+ self.schedule_cron_tasks(completion)
+ assert completion.count > 0
+ self.gctx.task_run()
def serve_fetch_one_maybe(self):
"""
@@ -353,440 +345,22 @@ class self_elt(data_elt):
"""
return self.sql_fetch_all(self.gctx)
- def cron(self, cb):
+ def schedule_cron_tasks(self, completion):
"""
- Periodic tasks.
+ Schedule periodic tasks.
"""
- def one():
- self.gctx.checkpoint()
- rpki.log.debug("Self %s[%d] polling parents" % (self.self_handle, self.self_id))
- self.client_poll(two)
-
- def two():
- self.gctx.checkpoint()
- rpki.log.debug("Self %s[%d] updating children" % (self.self_handle, self.self_id))
- self.update_children(three)
-
- def three():
- self.gctx.checkpoint()
- rpki.log.debug("Self %s[%d] updating ROAs" % (self.self_handle, self.self_id))
- self.update_roas(four)
-
- def four():
- self.gctx.checkpoint()
- rpki.log.debug("Self %s[%d] updating Ghostbuster records" % (self.self_handle, self.self_id))
- self.update_ghostbusters(five)
-
- def five():
- self.gctx.checkpoint()
- rpki.log.debug("Self %s[%d] regenerating CRLs and manifests" % (self.self_handle, self.self_id))
- self.regenerate_crls_and_manifests(six)
-
- def six():
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- rpki.log.debug("Self %s[%d] finished cron cycle, calling %r" % (self.self_handle, self.self_id, cb))
- cb()
-
- one()
-
-
- def client_poll(self, callback):
- """
- Run the regular client poll cycle with each of this self's parents
- in turn.
- """
-
- rpki.log.trace()
-
- def parent_loop(parent_iterator, parent):
-
- def got_list(r_msg):
- ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas)
- self.gctx.checkpoint()
-
- def class_loop(class_iterator, rc):
-
- def class_update_failed(e):
- rpki.log.traceback()
- rpki.log.warn("Couldn't update class, skipping: %s" % e)
- class_iterator()
-
- def class_create_failed(e):
- rpki.log.traceback()
- rpki.log.warn("Couldn't create class, skipping: %s" % e)
- class_iterator()
-
- self.gctx.checkpoint()
- if rc.class_name in ca_map:
- ca = ca_map[rc.class_name]
- del ca_map[rc.class_name]
- ca.check_for_updates(parent, rc, class_iterator, class_update_failed)
- else:
- rpki.rpkid.ca_obj.create(parent, rc, class_iterator, class_create_failed)
-
- def class_done():
-
- def ca_loop(iterator, ca):
- self.gctx.checkpoint()
- ca.delete(parent, iterator)
-
- def ca_done():
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- parent_iterator()
-
- rpki.async.iterator(ca_map.values(), ca_loop, ca_done)
-
- rpki.async.iterator(r_msg.payload.classes, class_loop, class_done)
-
- def list_failed(e):
- rpki.log.traceback()
- rpki.log.warn("Couldn't get resource class list from parent %r, skipping: %s (%r)" % (parent, e, e))
- parent_iterator()
-
- rpki.up_down.list_pdu.query(parent, got_list, list_failed)
-
- rpki.async.iterator(self.parents, parent_loop, callback)
-
-
- def update_children(self, cb):
- """
- Check for updated IRDB data for all of this self's children and
- issue new certs as necessary. Must handle changes both in
- resources and in expiration date.
- """
-
- rpki.log.trace()
- now = rpki.sundial.now()
- rsn = now + rpki.sundial.timedelta(seconds = self.regen_margin)
- publisher = rpki.rpkid.publication_queue()
-
- def loop(iterator, child):
-
- def lose(e):
- rpki.log.traceback()
- rpki.log.warn("Couldn't update child %r, skipping: %s" % (child, e))
- iterator()
-
- def got_resources(irdb_resources):
- try:
- for child_cert in child_certs:
- ca_detail = child_cert.ca_detail
- ca = ca_detail.ca
- if ca_detail.state == "active":
- old_resources = child_cert.cert.get_3779resources()
- new_resources = irdb_resources.intersection(old_resources).intersection(ca_detail.latest_ca_cert.get_3779resources())
-
- if new_resources.empty():
- rpki.log.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s" % (child.child_handle, child_cert.cert.gSKI()))
- child_cert.revoke(publisher = publisher)
- ca_detail.generate_crl(publisher = publisher)
- ca_detail.generate_manifest(publisher = publisher)
-
- elif old_resources != new_resources or (old_resources.valid_until < rsn and irdb_resources.valid_until > now):
- rpki.log.debug("Need to reissue child %s certificate SKI %s" % (child.child_handle, child_cert.cert.gSKI()))
- child_cert.reissue(
- ca_detail = ca_detail,
- resources = new_resources,
- publisher = publisher)
-
- elif old_resources.valid_until < now:
- rpki.log.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s"
- % (child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until))
- child_cert.sql_delete()
- publisher.withdraw(cls = rpki.publication.certificate_elt, uri = child_cert.uri, obj = child_cert.cert, repository = ca.parent.repository)
- ca_detail.generate_manifest(publisher = publisher)
-
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception, e:
- self.gctx.checkpoint()
- lose(e)
- else:
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- iterator()
-
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- child_certs = child.child_certs
- if child_certs:
- self.gctx.irdb_query_child_resources(child.self.self_handle, child.child_handle, got_resources, lose)
- else:
- iterator()
-
- def done():
- def lose(e):
- rpki.log.traceback()
- rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e))
- self.gctx.checkpoint()
- cb()
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- publisher.call_pubd(cb, lose)
+ if self.cron_tasks is None:
+ self.cron_tasks = (
+ rpki.rpkid_tasks.PollParentTask(self),
+ rpki.rpkid_tasks.UpdateChildrenTask(self),
+ rpki.rpkid_tasks.UpdateROAsTask(self),
+ rpki.rpkid_tasks.UpdateGhostbustersTask(self),
+ rpki.rpkid_tasks.RegnerateCRLsAndManifestsTask(self))
- rpki.async.iterator(self.children, loop, done)
-
-
- def regenerate_crls_and_manifests(self, cb):
- """
- Generate new CRLs and manifests as necessary for all of this
- self's CAs. Extracting nextUpdate from a manifest is hard at the
- moment due to implementation silliness, so for now we generate a
- new manifest whenever we generate a new CRL
-
- This method also cleans up tombstones left behind by revoked
- ca_detail objects, since we're walking through the relevant
- portions of the database anyway.
- """
-
- rpki.log.trace()
- now = rpki.sundial.now()
- regen_margin = rpki.sundial.timedelta(seconds = self.regen_margin)
- publisher = rpki.rpkid.publication_queue()
-
- for parent in self.parents:
- for ca in parent.cas:
- try:
- for ca_detail in ca.revoked_ca_details:
- if now > ca_detail.latest_crl.getNextUpdate():
- ca_detail.delete(ca = ca, publisher = publisher)
- ca_detail = ca.active_ca_detail
- if ca_detail is not None and now + regen_margin > ca_detail.latest_crl.getNextUpdate():
- ca_detail.generate_crl(publisher = publisher)
- ca_detail.generate_manifest(publisher = publisher)
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception, e:
- rpki.log.traceback()
- rpki.log.warn("Couldn't regenerate CRLs and manifests for CA %r, skipping: %s" % (ca, e))
-
- def lose(e):
- rpki.log.traceback()
- rpki.log.warn("Couldn't publish updated CRLs and manifests for self %r, skipping: %s" % (self.self_handle, e))
- self.gctx.checkpoint()
- cb()
-
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- publisher.call_pubd(cb, lose)
-
-
- def update_ghostbusters(self, cb):
- """
- Generate or update Ghostbuster records for this self.
-
- This is heavily based on .update_roas(), and probably both of them
- need refactoring.
- """
-
- parents = dict((p.parent_handle, p) for p in self.parents)
-
- def got_ghostbuster_requests(ghostbuster_requests):
-
- try:
- self.gctx.checkpoint()
- if self.gctx.sql.dirty:
- rpki.log.warn("Unexpected dirty SQL cache, flushing")
- self.gctx.sql.sweep()
-
- ghostbusters = {}
- orphans = []
- for ghostbuster in self.ghostbusters:
- k = (ghostbuster.ca_detail_id, ghostbuster.vcard)
- if ghostbuster.ca_detail.state != "active" or k in ghostbusters:
- orphans.append(ghostbuster)
- else:
- ghostbusters[k] = ghostbuster
-
- publisher = rpki.rpkid.publication_queue()
- ca_details = set()
-
- seen = set()
- for ghostbuster_request in ghostbuster_requests:
- if ghostbuster_request.parent_handle not in parents:
- rpki.log.warn("Unknown parent_handle %r in Ghostbuster request, skipping" % ghostbuster_request.parent_handle)
- continue
- k = (ghostbuster_request.parent_handle, ghostbuster_request.vcard)
- if k in seen:
- rpki.log.warn("Skipping duplicate Ghostbuster request %r" % ghostbuster_request)
- continue
- seen.add(k)
- for ca in parents[ghostbuster_request.parent_handle].cas:
- ca_detail = ca.active_ca_detail
- if ca_detail is not None:
- ghostbuster = ghostbusters.pop((ca_detail.ca_detail_id, ghostbuster_request.vcard), None)
- if ghostbuster is None:
- ghostbuster = rpki.rpkid.ghostbuster_obj(self.gctx, self.self_id, ca_detail.ca_detail_id, ghostbuster_request.vcard)
- rpki.log.debug("Created new Ghostbuster request for %r" % ghostbuster_request.parent_handle)
- else:
- rpki.log.debug("Found existing Ghostbuster request for %r" % ghostbuster_request.parent_handle)
- ghostbuster.update(publisher = publisher, fast = True)
- ca_details.add(ca_detail)
-
- orphans.extend(ghostbusters.itervalues())
- for ghostbuster in orphans:
- ca_details.add(ghostbuster.ca_detail)
- ghostbuster.revoke(publisher = publisher, fast = True)
-
- for ca_detail in ca_details:
- ca_detail.generate_crl(publisher = publisher)
- ca_detail.generate_manifest(publisher = publisher)
-
- self.gctx.sql.sweep()
-
- def publication_failed(e):
- rpki.log.traceback()
- rpki.log.warn("Couldn't publish Ghostbuster updates for %s, skipping: %s" % (self.self_handle, e))
- self.gctx.checkpoint()
- cb()
-
- self.gctx.checkpoint()
- publisher.call_pubd(cb, publication_failed)
-
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception, e:
- rpki.log.traceback()
- rpki.log.warn("Could not update Ghostbuster records for %s, skipping: %s" % (self.self_handle, e))
- cb()
-
- def ghostbuster_requests_failed(e):
- rpki.log.traceback()
- rpki.log.warn("Could not fetch Ghostbuster record requests for %s, skipping: %s" % (self.self_handle, e))
- cb()
-
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- self.gctx.irdb_query_ghostbuster_requests(self.self_handle, parents.iterkeys(),
- got_ghostbuster_requests, ghostbuster_requests_failed)
-
-
- def update_roas(self, cb):
- """
- Generate or update ROAs for this self.
- """
-
- def got_roa_requests(roa_requests):
-
- rpki.log.debug("Received response to query for ROA requests")
-
- self.gctx.checkpoint()
- if self.gctx.sql.dirty:
- rpki.log.warn("Unexpected dirty SQL cache, flushing")
- self.gctx.sql.sweep()
-
- roas = {}
- seen = set()
- orphans = []
- updates = []
- publisher = rpki.rpkid.publication_queue()
- ca_details = set()
-
- for roa in self.roas:
- k = (roa.asn, str(roa.ipv4), str(roa.ipv6))
- if k not in roas:
- roas[k] = roa
- elif (roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and
- (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active")):
- orphans.append(roas[k])
- roas[k] = roa
- else:
- orphans.append(roa)
-
- for roa_request in roa_requests:
- rpki.log.debug("++ roa_requests %s roas %s orphans %s updates %s publisher.size %s ca_details %s seen %s cache %s" % (
- len(roa_requests), len(roas), len(orphans), len(updates), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache)))
- k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
- if k in seen:
- rpki.log.warn("Skipping duplicate ROA request %r" % roa_request)
- else:
- seen.add(k)
- roa = roas.pop(k, None)
- if roa is None:
- roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
- rpki.log.debug("Couldn't find existing ROA, created %r" % roa)
- else:
- rpki.log.debug("Found existing %r" % roa)
- updates.append(roa)
-
- orphans.extend(roas.itervalues())
-
- roas.clear() # Release references we no longer need, to free up memory
- seen.clear() # Why does using "del" here raise SyntaxError?!?
- del roa_requests[:]
-
- def loop(iterator, roa):
- self.gctx.checkpoint()
- rpki.log.debug("++ updates %s orphans %s publisher.size %s ca_details %s cache %s" % (
- len(updates), len(orphans), publisher.size, len(ca_details), len(self.gctx.sql.cache)))
- try:
- roa.update(publisher = publisher, fast = True)
- ca_details.add(roa.ca_detail)
- self.gctx.sql.sweep()
- except (SystemExit, rpki.async.ExitNow):
- raise
- except rpki.exceptions.NoCoveringCertForROA:
- rpki.log.warn("No covering certificate for %r, skipping" % roa)
- except Exception, e:
- rpki.log.traceback()
- rpki.log.warn("Could not update %r, skipping: %s" % (roa, e))
- if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once:
- for ca_detail in ca_details:
- rpki.log.debug("Generating new CRL for %r" % ca_detail)
- ca_detail.generate_crl(publisher = publisher)
- rpki.log.debug("Generating new manifest for %r" % ca_detail)
- ca_detail.generate_manifest(publisher = publisher)
- rpki.log.debug("Sweeping")
- self.gctx.sql.sweep()
- rpki.log.debug("Done sweeping")
- self.gctx.checkpoint()
- rpki.log.debug("Starting publication")
- publisher.call_pubd(iterator, publication_failed)
- else:
- iterator()
-
- def publication_failed(e):
- rpki.log.traceback()
- rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e))
- self.gctx.checkpoint()
- cb()
-
- def done():
- for roa in orphans:
- try:
- ca_details.add(roa.ca_detail)
- roa.revoke(publisher = publisher, fast = True)
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception, e:
- rpki.log.traceback()
- rpki.log.warn("Could not revoke %r: %s" % (roa, e))
- self.gctx.sql.sweep()
- self.gctx.checkpoint()
- if publisher.size > 0:
- for ca_detail in ca_details:
- ca_detail.generate_crl(publisher = publisher)
- ca_detail.generate_manifest(publisher = publisher)
- self.gctx.sql.sweep()
- self.gctx.checkpoint()
- publisher.call_pubd(cb, publication_failed)
- else:
- cb()
-
- rpki.async.iterator(updates, loop, done, pop_list = True)
-
- def roa_requests_failed(e):
- rpki.log.traceback()
- rpki.log.warn("Could not fetch ROA requests for %s, skipping: %s" % (self.self_handle, e))
- cb()
-
- self.gctx.checkpoint()
- self.gctx.sql.sweep()
- rpki.log.debug("Issuing query for ROA requests")
- self.gctx.irdb_query_roa_requests(self.self_handle, got_roa_requests, roa_requests_failed)
+ for task in self.cron_tasks:
+ self.gctx.task_add(task)
+ completion.register(task)
class bsc_elt(data_elt):
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py
index 80b0b7cb..962bedaa 100644
--- a/rpkid/rpki/rpkid.py
+++ b/rpkid/rpki/rpkid.py
@@ -57,6 +57,7 @@ import rpki.relaxng
import rpki.log
import rpki.async
import rpki.daemonize
+import rpki.rpkid_tasks
class main(object):
"""
@@ -323,19 +324,18 @@ class main(object):
if force or self.cron_timeout is not None:
self.cron_timeout = rpki.sundial.now() + self.cron_keepalive
- def task_add(self, handler, cb = None, description = None):
+ def task_add(self, task):
"""
Add a task to the scheduler task queue, unless it's already queued.
"""
- t = task(self, handler, cb, description)
- rpki.log.debug("New task %r" % t)
+ rpki.log.debug("New task %r" % task)
rpki.log.debug("Task queue %r" % self.task_queue)
- if t not in self.task_queue:
- rpki.log.debug("Adding %r to task queue" % t)
- self.task_queue.append(t)
+ if task not in self.task_queue:
+ rpki.log.debug("Adding %r to task queue" % task)
+ self.task_queue.append(task)
return True
else:
- rpki.log.debug("Task %r was already in the task queue" % t)
+ rpki.log.debug("Task %r was already in the task queue" % task)
return False
def task_next(self):
@@ -343,20 +343,14 @@ class main(object):
Pull next task from the task queue and put it the deferred event
queue (we don't want to run it directly, as that could eventually
blow out our call stack).
-
- Not yet sure what to do here if task queue is empty. For now we
- just return, on the theory that we've nothing left to do until the
- next cron timer fires. This may be a bad assumption, revisit if
- the rest of the code doesn't fit well with this assumption.
"""
- rpki.log.debug("Looking for next task")
try:
self.task_current = self.task_queue.pop(0)
except IndexError:
self.task_current = None
else:
rpki.log.debug("Pulled %r from task queue" % self.task_current)
- rpki.async.defer(self.task_current.run)
+ rpki.async.defer(self.task_current)
def task_run(self):
"""
@@ -365,15 +359,6 @@ class main(object):
if self.task_current is None:
self.task_next()
- def cron_done(self, cb):
- """
- Completion handler for timer-driven cron.
- """
- self.sql.sweep()
- self.cron_timeout = None
- rpki.log.info("Finished cron run")
- cb()
-
def cron(self, cb = None):
"""
Periodic tasks.
@@ -385,9 +370,17 @@ class main(object):
rpki.log.debug("Starting cron run")
+ def done():
+ self.sql.sweep()
+ self.cron_timeout = None
+ rpki.log.info("Finished cron run started at %s" % now)
+ if cb is not None:
+ cb()
+
+ completion = rpki.rpkid_tasks.CompletionHandler(done)
for s in rpki.left_right.self_elt.sql_fetch_all(self):
- self.task_add(s.cron)
- self.task_add(self.cron_done, cb)
+ s.schedule_cron_tasks(completion)
+ nothing_queued = completion.count == 0
assert self.use_internal_cron or self.cron_timeout is None
@@ -407,6 +400,8 @@ class main(object):
elif self.use_internal_cron:
rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout)
+ if nothing_queued:
+ done()
def cronjob_handler(self, query, path, cb):
"""
@@ -424,44 +419,6 @@ class main(object):
rpki.log.debug("Starting externally triggered cron")
self.cron(done)
-class task(object):
- """
- Scheduler task object.
- """
-
- def __init__(self, gctx, handler, cb = None, description = None):
- self.gctx = gctx
- self.handler = handler
- self.cb = cb
- self.description = description
-
- def __cmp__(self, other):
- return cmp(self.handler, other.handler)
-
- def __hash__(self):
- return self.handler.__hash__()
-
- def __repr__(self):
- if self.description is None:
- return repr(self.handler)
- else:
- return "<%r: %s>" % (self.handler, self.description)
-
- def done(self):
- """
- Completion handler for task.
- """
- if self.cb is not None:
- self.cb()
- self.gctx.task_next()
-
- def run(self):
- """
- Run this task when called via the deferred event system.
- """
- rpki.log.debug("Running task %r" % self)
- self.handler(self.done)
-
class ca_obj(rpki.sql.sql_persistent):
"""
Internal CA object.
diff --git a/rpkid/rpki/rpkid_tasks.py b/rpkid/rpki/rpkid_tasks.py
index c1f2cd7b..62d0f474 100644
--- a/rpkid/rpki/rpkid_tasks.py
+++ b/rpkid/rpki/rpkid_tasks.py
@@ -19,6 +19,23 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
"""
+import rpki.log
+import rpki.rpkid
+import rpki.async
+import rpki.up_down
+import rpki.sundial
+import rpki.publication
+import rpki.exceptions
+
+## @var max_new_roas_at_once
+# Upper limit on the number of ROAs we'll create in a single
+# self_elt.update_roas() call. This is a bit of a kludge, and may be
+# replaced with something more clever or general later; for the moment
+# the goal is to avoid going totally compute bound when somebody
+# throws 50,000 new ROA requests at us in a single batch.
+
+max_new_roas_at_once = 200
+
class CompletionHandler(object):
"""
Track one or more scheduled rpkid tasks and execute a callback when
@@ -30,17 +47,26 @@ class CompletionHandler(object):
self.tasks = set()
def register(self, task):
+ rpki.log.debug("Completion handler %r registering task %r" % (self, task))
self.tasks.add(task)
- task.register_completion(self.complete)
+ task.register_completion(self.done)
- def complete(self, task):
+ def done(self, task):
try:
self.tasks.remove(task)
except KeyError:
rpki.log.warn("Completion handler %r called with unregistered task %r, blundering onwards" % (self, task))
+ else:
+ rpki.log.debug("Completion handler %r called with registered task %r" % (self, task))
if not self.tasks:
+ rpki.log.debug("Completion handler %r finished, calling %r" % (self, self.cb))
self.cb()
+ @property
+ def count(self):
+ return len(self.tasks)
+
+
class AbstractTask(object):
"""
Abstract base class for rpkid scheduler task objects. This just
@@ -57,6 +83,7 @@ class AbstractTask(object):
self.description = description
self.completions = []
self.continuation = None
+ self.clear()
def __repr__(self):
return rpki.log.log_repr(self, self.description)
@@ -65,8 +92,9 @@ class AbstractTask(object):
self.completions.append(completion)
def exit(self):
- for completion in self.completions:
- completion()
+ while self.completions:
+ self.completions.pop(0)(self)
+ self.clear()
self.self.gctx.task_next()
def postpone(self, continuation):
@@ -77,6 +105,7 @@ class AbstractTask(object):
def __call__(self):
if self.continuation is None:
rpki.log.debug("Running task %r" % self)
+ self.clear()
self.start()
else:
rpki.log.debug("Restarting task %r at " % (self, self.continuation))
@@ -90,6 +119,9 @@ class AbstractTask(object):
def start(self):
raise NotImplementedError
+ def clear(self):
+ pass
+
class PollParentTask(AbstractTask):
"""
@@ -97,6 +129,12 @@ class PollParentTask(AbstractTask):
parents, in turn.
"""
+ def clear(self):
+ self.parent_iterator = None
+ self.parent = None
+ self.ca_map = None
+ self.class_iterator = None
+
def start(self):
rpki.log.trace()
self.gctx.checkpoint()
@@ -144,7 +182,7 @@ class PollParentTask(AbstractTask):
def ca_loop(self, iterator, ca):
self.gctx.checkpoint()
- ca.delete(parent, iterator)
+ ca.delete(self.parent, iterator)
def ca_done(self):
self.gctx.checkpoint()
@@ -159,13 +197,20 @@ class UpdateChildrenTask(AbstractTask):
resources and in expiration date.
"""
+ def clear(self):
+ self.now = None
+ self.rsn = None
+ self.publisher = None
+ self.iterator = None
+ self.child = None
+ self.child_certs = None
+
def start(self):
rpki.log.trace()
self.gctx.checkpoint()
rpki.log.debug("Self %s[%d] updating children" % (self.self_handle, self.self_id))
-
self.now = rpki.sundial.now()
- self.rsn = now + rpki.sundial.timedelta(seconds = self.regen_margin)
+ self.rsn = self.now + rpki.sundial.timedelta(seconds = self.regen_margin)
self.publisher = rpki.rpkid.publication_queue()
rpki.async.iterator(self.children, self.loop, self.done)
@@ -201,7 +246,7 @@ class UpdateChildrenTask(AbstractTask):
ca_detail.generate_manifest(publisher = self.publisher)
elif old_resources != new_resources or (old_resources.valid_until < self.rsn and irdb_resources.valid_until > self.now):
- rpki.log.debug("Need to reissue child %s certificate SKI %s" % (child.child_handle, child_cert.cert.gSKI()))
+ rpki.log.debug("Need to reissue child %s certificate SKI %s" % (self.child.child_handle, child_cert.cert.gSKI()))
child_cert.reissue(
ca_detail = ca_detail,
resources = new_resources,
@@ -209,7 +254,7 @@ class UpdateChildrenTask(AbstractTask):
elif old_resources.valid_until < self.now:
rpki.log.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s"
- % (child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until))
+ % (self.child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until))
child_cert.sql_delete()
self.publisher.withdraw(cls = rpki.publication.certificate_elt, uri = child_cert.uri, obj = child_cert.cert, repository = ca.parent.repository)
ca_detail.generate_manifest(publisher = self.publisher)
@@ -218,16 +263,16 @@ class UpdateChildrenTask(AbstractTask):
raise
except Exception, e:
self.gctx.checkpoint()
- lose(e)
+ self.lose(e)
else:
self.gctx.checkpoint()
self.gctx.sql.sweep()
self.iterator()
- def done():
+ def done(self):
self.gctx.checkpoint()
self.gctx.sql.sweep()
- publisher.call_pubd(self.exit, self.publication_failed)
+ self.publisher.call_pubd(self.exit, self.publication_failed)
def publication_failed(self, e):
rpki.log.traceback()
@@ -241,6 +286,13 @@ class UpdateROAsTask(AbstractTask):
Generate or update ROAs for this self.
"""
+ def clear(self):
+ self.orphans = None
+ self.updates = None
+ self.publisher = None
+ self.ca_details = None
+ self.count = None
+
def start(self):
rpki.log.trace()
self.gctx.checkpoint()
@@ -320,7 +372,7 @@ class UpdateROAsTask(AbstractTask):
iterator()
def publish(self, done):
- if publisher.size > 0:
+ if self.publisher.size > 0:
for ca_detail in self.ca_details:
rpki.log.debug("Generating new CRL for %r" % ca_detail)
ca_detail.generate_crl(publisher = self.publisher)
@@ -341,10 +393,10 @@ class UpdateROAsTask(AbstractTask):
self.exit()
def done(self):
- for roa in orphans:
+ for roa in self.orphans:
try:
- ca_details.add(roa.ca_detail)
- roa.revoke(publisher = publisher, fast = True)
+ self.ca_details.add(roa.ca_detail)
+ roa.revoke(publisher = self.publisher, fast = True)
except (SystemExit, rpki.async.ExitNow):
raise
except Exception, e:
diff --git a/rpkid/tests/yamltest.py b/rpkid/tests/yamltest.py
index 3039f545..6d4a6381 100644
--- a/rpkid/tests/yamltest.py
+++ b/rpkid/tests/yamltest.py
@@ -608,7 +608,7 @@ try:
root_uri = "rsync://localhost:%d/rpki/" % db.root.pubd.rsync_port
root_sia = ((rpki.oids.name2oid["id-ad-caRepository"], ("uri", root_uri)),
- (rpki.oids.name2oid["id-ad-rpkiManifest"], ("uri", root_uri + "root.mnf")))
+ (rpki.oids.name2oid["id-ad-rpkiManifest"], ("uri", root_uri + "root.mft")))
root_cert = rpki.x509.X509.self_certify(
keypair = root_key,