aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2012-08-20 22:00:07 +0000
committerRob Austein <sra@hactrn.net>2012-08-20 22:00:07 +0000
commit633b5b6c1c57af37eb2e4106d777a2af094b7b61 (patch)
treecba3cf712054487ea3c98ccbe14c48e99c9aa4e7
parente42948305301058ab7f9298e715ae14272068fb0 (diff)
Rewrite voluntary scheduler yield code to be in terms of elapsed time
rather than number of objects processed or published. In theory, the new code should be reusable in other task classes. svn path=/branches/tk274/; revision=4650
-rw-r--r--rpkid/rpki/rpkid.py21
-rw-r--r--rpkid/rpki/rpkid_tasks.py40
2 files changed, 35 insertions, 26 deletions
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py
index 4ed232a3..c827386b 100644
--- a/rpkid/rpki/rpkid.py
+++ b/rpkid/rpki/rpkid.py
@@ -1906,15 +1906,22 @@ class publication_queue(object):
return self._add( uri, obj, repository, handler, cls.make_withdraw)
def call_pubd(self, cb, eb):
- def loop(iterator, rid):
- rpki.log.debug("Calling pubd[%r]" % self.repositories[rid])
- self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers)
- def done():
- rpki.log.debug("Publication complete")
- self.clear()
+ if self.empty():
cb()
- rpki.async.iterator(self.repositories, loop, done)
+ else:
+ def loop(iterator, rid):
+ rpki.log.debug("Calling pubd[%r]" % self.repositories[rid])
+ self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers)
+ def done():
+ rpki.log.debug("Publication complete")
+ self.clear()
+ cb()
+ rpki.async.iterator(self.repositories, loop, done)
@property
def size(self):
return sum(len(self.msgs[rid]) for rid in self.repositories)
+
+ def empty(self):
+ assert (not self.msgs) == (self.size == 0)
+ return not self.msgs
diff --git a/rpkid/rpki/rpkid_tasks.py b/rpkid/rpki/rpkid_tasks.py
index 4f9e09f6..e23fd0b0 100644
--- a/rpkid/rpki/rpkid_tasks.py
+++ b/rpkid/rpki/rpkid_tasks.py
@@ -27,15 +27,6 @@ import rpki.sundial
import rpki.publication
import rpki.exceptions
-## @var max_new_roas_at_once
-# Upper limit on the number of ROAs we'll create in a single
-# self_elt.update_roas() call. This is a bit of a kludge, and may be
-# replaced with something more clever or general later; for the moment
-# the goal is to avoid going totally compute bound when somebody
-# throws 50,000 new ROA requests at us in a single batch.
-
-max_new_roas_at_once = 200
-
class CompletionHandler(object):
"""
Track one or more scheduled rpkid tasks and execute a callback when
@@ -86,11 +77,18 @@ class AbstractTask(object):
class. Rewrite, rewrite, remove this comment when done, OK!
"""
+ ## @var timeslice
+ # How long before a task really should consider yielding the CPU to
+ # let something else run.
+
+ timeslice = rpki.sundial.timedelta(seconds = 15)
+
def __init__(self, s, description = None):
self.self = s
self.description = description
self.completions = []
self.continuation = None
+ self.due_date = None
self.clear()
def __repr__(self):
@@ -103,14 +101,17 @@ class AbstractTask(object):
while self.completions:
self.completions.pop(0)(self)
self.clear()
+ self.due_date = None
self.self.gctx.task_next()
def postpone(self, continuation):
self.continuation = continuation
+ self.due_date = None
self.self.gctx.task_add(self)
self.self.gctx.task_next()
def __call__(self):
+ self.due_date = rpki.sundial.now() + self.timeslice
if self.continuation is None:
rpki.log.debug("Running task %r" % self)
self.clear()
@@ -121,6 +122,10 @@ class AbstractTask(object):
self.continuation = None
continuation()
+ @property
+ def overdue(self):
+ return rpki.sundial.now() > self.due_date
+
def __getattr__(self, name):
return getattr(self.self, name)
@@ -280,10 +285,7 @@ class UpdateChildrenTask(AbstractTask):
def done(self):
self.gctx.checkpoint()
self.gctx.sql.sweep()
- if self.publisher.size > 0:
- self.publisher.call_pubd(self.exit, self.publication_failed)
- else:
- self.exit()
+ self.publisher.call_pubd(self.exit, self.publication_failed)
def publication_failed(self, e):
rpki.log.traceback()
@@ -340,8 +342,8 @@ class UpdateROAsTask(AbstractTask):
self.orphans.append(roa)
for roa_request in roa_requests:
- rpki.log.debug("++ roa_requests %s roas %s orphans %s updates %s publisher.size %s ca_details %s seen %s cache %s" % (
- len(roa_requests), len(roas), len(self.orphans), len(self.updates), self.publisher.size,
+ rpki.log.debug("++ roa_requests %s roas %s orphans %s updates %s ca_details %s seen %s cache %s" % (
+ len(roa_requests), len(roas), len(self.orphans), len(self.updates),
len(self.ca_details), len(seen), len(self.gctx.sql.cache)))
k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
if k in seen:
@@ -363,8 +365,8 @@ class UpdateROAsTask(AbstractTask):
def loop(self, iterator, roa):
self.gctx.checkpoint()
- rpki.log.debug("++ updates %s orphans %s publisher.size %s ca_details %s cache %s" % (
- len(self.updates), len(self.orphans), self.publisher.size, len(self.ca_details), len(self.gctx.sql.cache)))
+ rpki.log.debug("++ updates %s orphans %s ca_details %s cache %s" % (
+ len(self.updates), len(self.orphans), len(self.ca_details), len(self.gctx.sql.cache)))
try:
roa.update(publisher = self.publisher, fast = True)
self.ca_details.add(roa.ca_detail)
@@ -377,13 +379,13 @@ class UpdateROAsTask(AbstractTask):
rpki.log.traceback()
rpki.log.warn("Could not update %r, skipping: %s" % (roa, e))
self.count += 1
- if max_new_roas_at_once is not None and self.count % max_new_roas_at_once == 0:
+ if self.overdue:
self.publish(lambda: self.postpone(iterator))
else:
iterator()
def publish(self, done):
- if self.publisher.size > 0:
+ if not self.publisher.empty():
for ca_detail in self.ca_details:
rpki.log.debug("Generating new CRL for %r" % ca_detail)
ca_detail.generate_crl(publisher = self.publisher)