diff options
author | Rob Austein <sra@hactrn.net> | 2012-08-20 22:00:07 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2012-08-20 22:00:07 +0000 |
commit | 633b5b6c1c57af37eb2e4106d777a2af094b7b61 (patch) | |
tree | cba3cf712054487ea3c98ccbe14c48e99c9aa4e7 | |
parent | e42948305301058ab7f9298e715ae14272068fb0 (diff) |
Rewrite voluntary scheduler yield code to be in terms of elapsed time
rather than number of objects processed or published. In theory, the
new code should be reusable in other task classes.
svn path=/branches/tk274/; revision=4650
-rw-r--r-- | rpkid/rpki/rpkid.py | 21 | ||||
-rw-r--r-- | rpkid/rpki/rpkid_tasks.py | 40 |
2 files changed, 35 insertions, 26 deletions
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py index 4ed232a3..c827386b 100644 --- a/rpkid/rpki/rpkid.py +++ b/rpkid/rpki/rpkid.py @@ -1906,15 +1906,22 @@ class publication_queue(object): return self._add( uri, obj, repository, handler, cls.make_withdraw) def call_pubd(self, cb, eb): - def loop(iterator, rid): - rpki.log.debug("Calling pubd[%r]" % self.repositories[rid]) - self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers) - def done(): - rpki.log.debug("Publication complete") - self.clear() + if self.empty(): cb() - rpki.async.iterator(self.repositories, loop, done) + else: + def loop(iterator, rid): + rpki.log.debug("Calling pubd[%r]" % self.repositories[rid]) + self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers) + def done(): + rpki.log.debug("Publication complete") + self.clear() + cb() + rpki.async.iterator(self.repositories, loop, done) @property def size(self): return sum(len(self.msgs[rid]) for rid in self.repositories) + + def empty(self): + assert (not self.msgs) == (self.size == 0) + return not self.msgs diff --git a/rpkid/rpki/rpkid_tasks.py b/rpkid/rpki/rpkid_tasks.py index 4f9e09f6..e23fd0b0 100644 --- a/rpkid/rpki/rpkid_tasks.py +++ b/rpkid/rpki/rpkid_tasks.py @@ -27,15 +27,6 @@ import rpki.sundial import rpki.publication import rpki.exceptions -## @var max_new_roas_at_once -# Upper limit on the number of ROAs we'll create in a single -# self_elt.update_roas() call. This is a bit of a kludge, and may be -# replaced with something more clever or general later; for the moment -# the goal is to avoid going totally compute bound when somebody -# throws 50,000 new ROA requests at us in a single batch. - -max_new_roas_at_once = 200 - class CompletionHandler(object): """ Track one or more scheduled rpkid tasks and execute a callback when @@ -86,11 +77,18 @@ class AbstractTask(object): class. Rewrite, rewrite, remove this comment when done, OK! """ + ## @var timeslice + # How long before a task really should consider yielding the CPU to + # let something else run. + + timeslice = rpki.sundial.timedelta(seconds = 15) + def __init__(self, s, description = None): self.self = s self.description = description self.completions = [] self.continuation = None + self.due_date = None self.clear() def __repr__(self): @@ -103,14 +101,17 @@ class AbstractTask(object): while self.completions: self.completions.pop(0)(self) self.clear() + self.due_date = None self.self.gctx.task_next() def postpone(self, continuation): self.continuation = continuation + self.due_date = None self.self.gctx.task_add(self) self.self.gctx.task_next() def __call__(self): + self.due_date = rpki.sundial.now() + self.timeslice if self.continuation is None: rpki.log.debug("Running task %r" % self) self.clear() @@ -121,6 +122,10 @@ class AbstractTask(object): self.continuation = None continuation() + @property + def overdue(self): + return rpki.sundial.now() > self.due_date + def __getattr__(self, name): return getattr(self.self, name) @@ -280,10 +285,7 @@ class UpdateChildrenTask(AbstractTask): def done(self): self.gctx.checkpoint() self.gctx.sql.sweep() - if self.publisher.size > 0: - self.publisher.call_pubd(self.exit, self.publication_failed) - else: - self.exit() + self.publisher.call_pubd(self.exit, self.publication_failed) def publication_failed(self, e): rpki.log.traceback() @@ -340,8 +342,8 @@ class UpdateROAsTask(AbstractTask): self.orphans.append(roa) for roa_request in roa_requests: - rpki.log.debug("++ roa_requests %s roas %s orphans %s updates %s publisher.size %s ca_details %s seen %s cache %s" % ( - len(roa_requests), len(roas), len(self.orphans), len(self.updates), self.publisher.size, + rpki.log.debug("++ roa_requests %s roas %s orphans %s updates %s ca_details %s seen %s cache %s" % ( + len(roa_requests), len(roas), len(self.orphans), len(self.updates), len(self.ca_details), len(seen), len(self.gctx.sql.cache))) k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6)) if k in seen: @@ -363,8 +365,8 @@ class UpdateROAsTask(AbstractTask): def loop(self, iterator, roa): self.gctx.checkpoint() - rpki.log.debug("++ updates %s orphans %s publisher.size %s ca_details %s cache %s" % ( - len(self.updates), len(self.orphans), self.publisher.size, len(self.ca_details), len(self.gctx.sql.cache))) + rpki.log.debug("++ updates %s orphans %s ca_details %s cache %s" % ( + len(self.updates), len(self.orphans), len(self.ca_details), len(self.gctx.sql.cache))) try: roa.update(publisher = self.publisher, fast = True) self.ca_details.add(roa.ca_detail) @@ -377,13 +379,13 @@ class UpdateROAsTask(AbstractTask): rpki.log.traceback() rpki.log.warn("Could not update %r, skipping: %s" % (roa, e)) self.count += 1 - if max_new_roas_at_once is not None and self.count % max_new_roas_at_once == 0: + if self.overdue: self.publish(lambda: self.postpone(iterator)) else: iterator() def publish(self, done): - if self.publisher.size > 0: + if not self.publisher.empty(): for ca_detail in self.ca_details: rpki.log.debug("Generating new CRL for %r" % ca_detail) ca_detail.generate_crl(publisher = self.publisher) |