diff options
author | Rob Austein <sra@hactrn.net> | 2012-08-19 16:12:26 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2012-08-19 16:12:26 +0000 |
commit | 01bc52850438c23b31347440825dedfcd125bfbb (patch) | |
tree | 22628b29003006f0aed50fb43b37d21d81d7465c /rpkid/rpki/async.py | |
parent | b2eb70ea0d75d8e969b29502a264704599e790d7 (diff) |
Simplify core I/O loop, back to what I originally intended before
accidently digging myself into a hole with a circular reference that
confused Python's garbage collector. See #275.
This version is much too noisy, and could use some cleanup, but basic
code seems to work, and I want to test it on multiple machines in
parallel, so checking it in now.
svn path=/branches/tk274/; revision=4642
Diffstat (limited to 'rpkid/rpki/async.py')
-rw-r--r-- | rpkid/rpki/async.py | 94 |
1 files changed, 37 insertions, 57 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py index c2a02e4f..7563f589 100644 --- a/rpkid/rpki/async.py +++ b/rpkid/rpki/async.py @@ -94,6 +94,12 @@ class iterator(object): if self.done_callback is not None: self.done_callback() + +## @var queue +# Timer queue. + +timer_queue = [] + class timer(object): """ Timer construct for event-driven code. It can be used in either of two ways: @@ -117,10 +123,6 @@ class timer(object): # Verbose chatter about timers being run. run_debug = False - ## @var queue - # Timer queue, shared by all timer instances (there can be only one queue). - queue = [] - def __init__(self, handler = None, errback = None): if handler is not None: self.set_handler(handler) @@ -150,9 +152,9 @@ class timer(object): else: self.when = when assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when) - if self not in self.queue: - self.queue.append(self) - self.queue.sort(key = lambda x: x.when) + if self not in timer_queue: + timer_queue.append(self) + timer_queue.sort(key = lambda x: x.when) def __cmp__(self, other): return cmp(id(self), id(other)) @@ -169,7 +171,7 @@ class timer(object): self.trace("Canceling %r" % self) try: while True: - self.queue.remove(self) + timer_queue.remove(self) except ValueError: pass @@ -177,7 +179,7 @@ class timer(object): """ Test whether this timer is currently set. """ - return self in self.queue + return self in timer_queue def handler(self): """ @@ -215,8 +217,8 @@ class timer(object): Run the timer queue: for each timer whose call time has passed, pull the timer off the queue and call its handler() method. """ - while cls.queue and rpki.sundial.now() >= cls.queue[0].when: - t = cls.queue.pop(0) + if timer_queue and rpki.sundial.now() >= timer_queue[0].when: + t = timer_queue.pop(0) if cls.run_debug: rpki.log.debug("Running %r" % t) try: @@ -239,12 +241,12 @@ class timer(object): the same units (argh!), and we're not doing anything that hair-triggered, so rounding up is simplest. """ - if not cls.queue: + if not timer_queue: return None now = rpki.sundial.now() - if now >= cls.queue[0].when: + if now >= timer_queue[0].when: return 0 - delay = cls.queue[0].when - now + delay = timer_queue[0].when - now seconds = delay.convert_to_seconds() if delay.microseconds: seconds += 1 @@ -257,40 +259,32 @@ class timer(object): queue content, but this way we can notify subclasses that provide their own cancel() method. """ - while cls.queue: - cls.queue.pop(0).cancel() - -## @var deferred_queue -# List to hold deferred actions. We used to do this with the timer -# queue, but that appears to confuse the garbage collector, and is -# overengineering for simple deferred actions in any case. + while timer_queue: + timer_queue.pop(0).cancel() -deferred_queue = [] - -def defer(thunk): +def _raiseExitNow(signum, frame): """ - Defer an action until the next pass through the event loop. + Signal handler for event_loop(). """ - deferred_queue.append(thunk) + raise ExitNow -def run_deferred(): +def exit_event_loop(): """ - Run deferred actions. + Force exit from event_loop(). """ - while deferred_queue: - try: - deferred_queue.pop(0)() - except (ExitNow, SystemExit): - raise - except Exception, e: - rpki.log.error("Unhandled exception from deferred action %s: %s" % (e.__class__.__name__, e)) - rpki.log.traceback() + raise ExitNow -def _raiseExitNow(signum, frame): +def event_yield(handler, delay = rpki.sundial.timedelta(seconds = 0)): """ - Signal handler for event_loop(). + Use a near-term timer to schedule an event after letting the timer + and I/O systems run. """ - raise ExitNow + t = timer(handler) + t.set(delay) + +# Backwards compatability -- clean this up if the change works + +defer = event_yield def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)): """ @@ -304,10 +298,10 @@ def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)): old = signal.signal(sig, _raiseExitNow) if save_sigs: old_signal_handlers[sig] = old - while asyncore.socket_map or deferred_queue or timer.queue: - run_deferred() - asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map) - run_deferred() + while asyncore.socket_map or timer_queue: + t = timer.seconds_until_wakeup() + rpki.log.debug("Dismissing to asyncore.poll(), t = %s, q = %r" % (t, timer_queue)) + asyncore.poll(t, asyncore.socket_map) timer.runq() if timer.gc_debug: gc.collect() @@ -394,20 +388,6 @@ class sync_wrapper(object): else: raise self.err -def exit_event_loop(): - """ - Force exit from event_loop(). - """ - raise ExitNow - -def event_yield(handler, delay = rpki.sundial.timedelta(seconds = 2)): - """ - Use a near-term timer to schedule an event after letting the timer - and I/O systems run. - """ - t = timer(handler) - t.set(delay) - class gc_summary(object): """ Periodic summary of GC state, for tracking down memory bloat. |