diff options
Diffstat (limited to 'rpkid/rpki/async.py')
-rw-r--r-- | rpkid/rpki/async.py | 196 |
1 files changed, 89 insertions, 107 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py index 5eaa34f9..aee7770f 100644 --- a/rpkid/rpki/async.py +++ b/rpkid/rpki/async.py @@ -3,7 +3,7 @@ Utilities for event-driven programming. $Id$ -Copyright (C) 2009--2011 Internet Systems Consortium ("ISC") +Copyright (C) 2009--2012 Internet Systems Consortium ("ISC") Permission to use, copy, modify, and distribute this software for any purpose with or without fee is hereby granted, provided that the above @@ -18,8 +18,13 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ -import asyncore, signal, traceback, gc, sys -import rpki.log, rpki.sundial +import asyncore +import signal +import traceback +import gc +import sys +import rpki.log +import rpki.sundial ExitNow = asyncore.ExitNow @@ -40,15 +45,24 @@ class iterator(object): to continue to the next item in the iteration. The termination callback receives no arguments. + + Special case for memory constrained cases: if keyword argument + pop_list is True, iterable must be a list, which is modified in + place, popping items off of it until it's empty. """ - def __init__(self, iterable, item_callback, done_callback, unwind_stack = True): + def __init__(self, iterable, item_callback, done_callback, unwind_stack = True, pop_list = False): + assert not pop_list or isinstance(iterable, list), "iterable must be a list when using pop_list" self.item_callback = item_callback - self.done_callback = done_callback + self.done_callback = done_callback if done_callback is not None else lambda: None self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3] self.unwind_stack = unwind_stack + self.pop_list = pop_list try: - self.iterator = iter(iterable) + if self.pop_list: + self.iterator = iterable + else: + self.iterator = iter(iterable) except (ExitNow, SystemExit): raise except Exception: @@ -57,13 +71,14 @@ class iterator(object): self.doit() def __repr__(self): - return ("<%s created at %s:%s %s at 0x%x>" % - (self.__class__.__name__, - self.caller_file, self.caller_line, self.caller_function, id(self))) + return rpki.log.log_repr(self, + "created at %s:%s" % (self.caller_file, + self.caller_line), + self.caller_function) def __call__(self): if self.unwind_stack: - defer(self.doit) + event_defer(self.doit) else: self.doit() @@ -73,25 +88,25 @@ class iterator(object): with the next iteration value, call the termination handler if the iterator signaled StopIteration. """ - try: - self.item_callback(self, self.iterator.next()) - except StopIteration: - if self.done_callback is not None: - self.done_callback() -class timer(object): - """ - Timer construct for event-driven code. It can be used in either of two ways: + try: + if self.pop_list: + val = self.iterator.pop(0) + else: + val = self.iterator.next() + except (IndexError, StopIteration): + self.done_callback() + else: + self.item_callback(self, val) - - As a virtual class, in which case the subclass should provide a - handler() method to receive the wakup event when the timer expires; or +## @var timer_queue +# Timer queue. - - By setting an explicit handler callback, either via the - constructor or the set_handler() method. +timer_queue = [] - Subclassing is probably more Pythonic, but setting an explict - handler turns out to be very convenient when combined with bound - methods to other objects. +class timer(object): + """ + Timer construct for event-driven code. """ ## @var gc_debug @@ -102,15 +117,9 @@ class timer(object): # Verbose chatter about timers being run. run_debug = False - ## @var queue - # Timer queue, shared by all timer instances (there can be only one queue). - queue = [] - def __init__(self, handler = None, errback = None): - if handler is not None: - self.set_handler(handler) - if errback is not None: - self.set_errback(errback) + self.set_handler(handler) + self.set_errback(errback) self.when = None if self.gc_debug: self.trace("Creating %r" % self) @@ -135,9 +144,9 @@ class timer(object): else: self.when = when assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when) - if self not in self.queue: - self.queue.append(self) - self.queue.sort(key = lambda x: x.when) + if self not in timer_queue: + timer_queue.append(self) + timer_queue.sort(key = lambda x: x.when) def __cmp__(self, other): return cmp(id(self), id(other)) @@ -154,7 +163,7 @@ class timer(object): self.trace("Canceling %r" % self) try: while True: - self.queue.remove(self) + timer_queue.remove(self) except ValueError: pass @@ -162,14 +171,7 @@ class timer(object): """ Test whether this timer is currently set. """ - return self in self.queue - - def handler(self): - """ - Handle a timer that has expired. This must either be overriden by - a subclass or set dynamically by set_handler(). - """ - raise NotImplementedError + return self in timer_queue def set_handler(self, handler): """ @@ -181,13 +183,6 @@ class timer(object): """ self.handler = handler - def errback(self, e): - """ - Error callback. May be overridden, or set with set_errback(). - """ - rpki.log.error("Unhandled exception from timer: %s" % e) - rpki.log.traceback() - def set_errback(self, errback): """ Set a timer's errback. Like set_handler(), for errbacks. @@ -199,17 +194,29 @@ class timer(object): """ Run the timer queue: for each timer whose call time has passed, pull the timer off the queue and call its handler() method. + + Comparisions are made against time at which this function was + called, so that even if new events keep getting scheduled, we'll + return to the I/O loop reasonably quickly. """ - while cls.queue and rpki.sundial.now() >= cls.queue[0].when: - t = cls.queue.pop(0) + now = rpki.sundial.now() + while timer_queue and now >= timer_queue[0].when: + t = timer_queue.pop(0) if cls.run_debug: rpki.log.debug("Running %r" % t) try: - t.handler() + if t.handler is not None: + t.handler() + else: + rpki.log.warn("Timer %r expired with no handler set" % t) except (ExitNow, SystemExit): raise except Exception, e: - t.errback(e) + if t.errback is not None: + t.errback(e) + else: + rpki.log.error("Unhandled exception from timer %r: %s" % (t, e)) + rpki.log.traceback() def __repr__(self): return rpki.log.log_repr(self, self.when, repr(self.handler)) @@ -224,12 +231,12 @@ class timer(object): the same units (argh!), and we're not doing anything that hair-triggered, so rounding up is simplest. """ - if not cls.queue: + if not timer_queue: return None now = rpki.sundial.now() - if now >= cls.queue[0].when: + if now >= timer_queue[0].when: return 0 - delay = cls.queue[0].when - now + delay = timer_queue[0].when - now seconds = delay.convert_to_seconds() if delay.microseconds: seconds += 1 @@ -242,40 +249,32 @@ class timer(object): queue content, but this way we can notify subclasses that provide their own cancel() method. """ - while cls.queue: - cls.queue.pop(0).cancel() - -## @var deferred_queue -# List to hold deferred actions. We used to do this with the timer -# queue, but that appears to confuse the garbage collector, and is -# overengineering for simple deferred actions in any case. - -deferred_queue = [] + while timer_queue: + timer_queue.pop(0).cancel() -def defer(thunk): +def _raiseExitNow(signum, frame): """ - Defer an action until the next pass through the event loop. + Signal handler for event_loop(). """ - deferred_queue.append(thunk) + raise ExitNow -def run_deferred(): +def exit_event_loop(): """ - Run deferred actions. + Force exit from event_loop(). """ - while deferred_queue: - try: - deferred_queue.pop(0)() - except (ExitNow, SystemExit): - raise - except Exception, e: - rpki.log.error("Unhandled exception from deferred action %s: %s" % (e.__class__.__name__, e)) - rpki.log.traceback() + raise ExitNow -def _raiseExitNow(signum, frame): +def event_defer(handler, delay = rpki.sundial.timedelta(seconds = 0)): """ - Signal handler for event_loop(). + Use a near-term (default: zero interval) timer to schedule an event + to run after letting the I/O system have a turn. """ - raise ExitNow + timer(handler).set(delay) + +## @var debug_event_timing +# Enable insanely verbose logging of event timing + +debug_event_timing = False def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)): """ @@ -289,10 +288,11 @@ def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)): old = signal.signal(sig, _raiseExitNow) if save_sigs: old_signal_handlers[sig] = old - while asyncore.socket_map or deferred_queue or timer.queue: - run_deferred() - asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map) - run_deferred() + while asyncore.socket_map or timer_queue: + t = timer.seconds_until_wakeup() + if debug_event_timing: + rpki.log.debug("Dismissing to asyncore.poll(), t = %s, q = %r" % (t, timer_queue)) + asyncore.poll(t, asyncore.socket_map) timer.runq() if timer.gc_debug: gc.collect() @@ -359,10 +359,6 @@ class sync_wrapper(object): def __call__(self, *args, **kwargs): def thunk(): - """ - Deferred action to call the wrapped code once event system is - running. - """ try: self.func(self.cb, self.eb, *args, **kwargs) except ExitNow: @@ -370,7 +366,7 @@ class sync_wrapper(object): except Exception, e: self.eb(e) - defer(thunk) + event_defer(thunk) event_loop() if self.err is None: return self.res @@ -379,20 +375,6 @@ class sync_wrapper(object): else: raise self.err -def exit_event_loop(): - """ - Force exit from event_loop(). - """ - raise ExitNow - -def event_yield(handler, delay = rpki.sundial.timedelta(seconds = 2)): - """ - Use a near-term timer to schedule an event after letting the timer - and I/O systems run. - """ - t = timer(handler) - t.set(delay) - class gc_summary(object): """ Periodic summary of GC state, for tracking down memory bloat. |