aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/rpkid_tasks.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2012-08-19 01:09:29 +0000
committerRob Austein <sra@hactrn.net>2012-08-19 01:09:29 +0000
commit26c65b2bdfb408a1bdeb3557f6e460d78813f1d6 (patch)
tree743135e60ca412d205aa0952d835a48edfd04f02 /rpkid/rpki/rpkid_tasks.py
parentccc2eb0174eb708888249966e7c10306315fb1a5 (diff)
Refactor rpkid high-level task system to use classes rather than
closures, to make it easier for long-running tasks to yield the CPU periodically. As a side effect, this moves a lot of dense code out of rpki.left_right.self_elt methods and into separate task-specific classes. See #275. svn path=/branches/tk274/; revision=4640
Diffstat (limited to 'rpkid/rpki/rpkid_tasks.py')
-rw-r--r--rpkid/rpki/rpkid_tasks.py84
1 files changed, 68 insertions, 16 deletions
diff --git a/rpkid/rpki/rpkid_tasks.py b/rpkid/rpki/rpkid_tasks.py
index c1f2cd7b..62d0f474 100644
--- a/rpkid/rpki/rpkid_tasks.py
+++ b/rpkid/rpki/rpkid_tasks.py
@@ -19,6 +19,23 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
"""
+import rpki.log
+import rpki.rpkid
+import rpki.async
+import rpki.up_down
+import rpki.sundial
+import rpki.publication
+import rpki.exceptions
+
+## @var max_new_roas_at_once
+# Upper limit on the number of ROAs we'll create in a single
+# self_elt.update_roas() call. This is a bit of a kludge, and may be
+# replaced with something more clever or general later; for the moment
+# the goal is to avoid going totally compute bound when somebody
+# throws 50,000 new ROA requests at us in a single batch.
+
+max_new_roas_at_once = 200
+
class CompletionHandler(object):
"""
Track one or more scheduled rpkid tasks and execute a callback when
@@ -30,17 +47,26 @@ class CompletionHandler(object):
self.tasks = set()
def register(self, task):
+ rpki.log.debug("Completion handler %r registering task %r" % (self, task))
self.tasks.add(task)
- task.register_completion(self.complete)
+ task.register_completion(self.done)
- def complete(self, task):
+ def done(self, task):
try:
self.tasks.remove(task)
except KeyError:
rpki.log.warn("Completion handler %r called with unregistered task %r, blundering onwards" % (self, task))
+ else:
+ rpki.log.debug("Completion handler %r called with registered task %r" % (self, task))
if not self.tasks:
+ rpki.log.debug("Completion handler %r finished, calling %r" % (self, self.cb))
self.cb()
+ @property
+ def count(self):
+ return len(self.tasks)
+
+
class AbstractTask(object):
"""
Abstract base class for rpkid scheduler task objects. This just
@@ -57,6 +83,7 @@ class AbstractTask(object):
self.description = description
self.completions = []
self.continuation = None
+ self.clear()
def __repr__(self):
return rpki.log.log_repr(self, self.description)
@@ -65,8 +92,9 @@ class AbstractTask(object):
self.completions.append(completion)
def exit(self):
- for completion in self.completions:
- completion()
+ while self.completions:
+ self.completions.pop(0)(self)
+ self.clear()
self.self.gctx.task_next()
def postpone(self, continuation):
@@ -77,6 +105,7 @@ class AbstractTask(object):
def __call__(self):
if self.continuation is None:
rpki.log.debug("Running task %r" % self)
+ self.clear()
self.start()
else:
rpki.log.debug("Restarting task %r at " % (self, self.continuation))
@@ -90,6 +119,9 @@ class AbstractTask(object):
def start(self):
raise NotImplementedError
+ def clear(self):
+ pass
+
class PollParentTask(AbstractTask):
"""
@@ -97,6 +129,12 @@ class PollParentTask(AbstractTask):
parents, in turn.
"""
+ def clear(self):
+ self.parent_iterator = None
+ self.parent = None
+ self.ca_map = None
+ self.class_iterator = None
+
def start(self):
rpki.log.trace()
self.gctx.checkpoint()
@@ -144,7 +182,7 @@ class PollParentTask(AbstractTask):
def ca_loop(self, iterator, ca):
self.gctx.checkpoint()
- ca.delete(parent, iterator)
+ ca.delete(self.parent, iterator)
def ca_done(self):
self.gctx.checkpoint()
@@ -159,13 +197,20 @@ class UpdateChildrenTask(AbstractTask):
resources and in expiration date.
"""
+ def clear(self):
+ self.now = None
+ self.rsn = None
+ self.publisher = None
+ self.iterator = None
+ self.child = None
+ self.child_certs = None
+
def start(self):
rpki.log.trace()
self.gctx.checkpoint()
rpki.log.debug("Self %s[%d] updating children" % (self.self_handle, self.self_id))
-
self.now = rpki.sundial.now()
- self.rsn = now + rpki.sundial.timedelta(seconds = self.regen_margin)
+ self.rsn = self.now + rpki.sundial.timedelta(seconds = self.regen_margin)
self.publisher = rpki.rpkid.publication_queue()
rpki.async.iterator(self.children, self.loop, self.done)
@@ -201,7 +246,7 @@ class UpdateChildrenTask(AbstractTask):
ca_detail.generate_manifest(publisher = self.publisher)
elif old_resources != new_resources or (old_resources.valid_until < self.rsn and irdb_resources.valid_until > self.now):
- rpki.log.debug("Need to reissue child %s certificate SKI %s" % (child.child_handle, child_cert.cert.gSKI()))
+ rpki.log.debug("Need to reissue child %s certificate SKI %s" % (self.child.child_handle, child_cert.cert.gSKI()))
child_cert.reissue(
ca_detail = ca_detail,
resources = new_resources,
@@ -209,7 +254,7 @@ class UpdateChildrenTask(AbstractTask):
elif old_resources.valid_until < self.now:
rpki.log.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s"
- % (child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until))
+ % (self.child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until))
child_cert.sql_delete()
self.publisher.withdraw(cls = rpki.publication.certificate_elt, uri = child_cert.uri, obj = child_cert.cert, repository = ca.parent.repository)
ca_detail.generate_manifest(publisher = self.publisher)
@@ -218,16 +263,16 @@ class UpdateChildrenTask(AbstractTask):
raise
except Exception, e:
self.gctx.checkpoint()
- lose(e)
+ self.lose(e)
else:
self.gctx.checkpoint()
self.gctx.sql.sweep()
self.iterator()
- def done():
+ def done(self):
self.gctx.checkpoint()
self.gctx.sql.sweep()
- publisher.call_pubd(self.exit, self.publication_failed)
+ self.publisher.call_pubd(self.exit, self.publication_failed)
def publication_failed(self, e):
rpki.log.traceback()
@@ -241,6 +286,13 @@ class UpdateROAsTask(AbstractTask):
Generate or update ROAs for this self.
"""
+ def clear(self):
+ self.orphans = None
+ self.updates = None
+ self.publisher = None
+ self.ca_details = None
+ self.count = None
+
def start(self):
rpki.log.trace()
self.gctx.checkpoint()
@@ -320,7 +372,7 @@ class UpdateROAsTask(AbstractTask):
iterator()
def publish(self, done):
- if publisher.size > 0:
+ if self.publisher.size > 0:
for ca_detail in self.ca_details:
rpki.log.debug("Generating new CRL for %r" % ca_detail)
ca_detail.generate_crl(publisher = self.publisher)
@@ -341,10 +393,10 @@ class UpdateROAsTask(AbstractTask):
self.exit()
def done(self):
- for roa in orphans:
+ for roa in self.orphans:
try:
- ca_details.add(roa.ca_detail)
- roa.revoke(publisher = publisher, fast = True)
+ self.ca_details.add(roa.ca_detail)
+ roa.revoke(publisher = self.publisher, fast = True)
except (SystemExit, rpki.async.ExitNow):
raise
except Exception, e: