diff options
author | Rob Austein <sra@hactrn.net> | 2010-10-08 17:26:25 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2010-10-08 17:26:25 +0000 |
commit | 7a5797231b6174ec28a4635a7f129d7acb74fc2f (patch) | |
tree | 3fbb21b51034e0edca53782e2f078305076ea7ba /rpkid.without_tls/rpki/async.py | |
parent | 4269912a916a7bead0f3f71d017448b7d6644fe0 (diff) |
Clean up after TLS flag day
svn path=/rpkid.with_tls; revision=3471
Diffstat (limited to 'rpkid.without_tls/rpki/async.py')
-rw-r--r-- | rpkid.without_tls/rpki/async.py | 411 |
1 files changed, 0 insertions, 411 deletions
diff --git a/rpkid.without_tls/rpki/async.py b/rpkid.without_tls/rpki/async.py deleted file mode 100644 index 5bff4d45..00000000 --- a/rpkid.without_tls/rpki/async.py +++ /dev/null @@ -1,411 +0,0 @@ -""" -Utilities for event-driven programming. - -$Id$ - -Copyright (C) 2009--2010 Internet Systems Consortium ("ISC") - -Permission to use, copy, modify, and distribute this software for any -purpose with or without fee is hereby granted, provided that the above -copyright notice and this permission notice appear in all copies. - -THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH -REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY -AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, -INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM -LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE -OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR -PERFORMANCE OF THIS SOFTWARE. -""" - -import asyncore, signal, traceback, gc, sys -import rpki.log, rpki.sundial - -ExitNow = asyncore.ExitNow - -class iterator(object): - """ - Iteration construct for event-driven code. Takes three - arguments: - - - Some kind of iterable object - - - A callback to call on each item in the iteration - - - A callback to call after the iteration terminates. - - The item callback receives two arguments: the callable iterator - object and the current value of the iteration. It should call the - iterator (or arrange for the iterator to be called) when it is time - to continue to the next item in the iteration. - - The termination callback receives no arguments. - """ - - def __init__(self, iterable, item_callback, done_callback, unwind_stack = True): - self.item_callback = item_callback - self.done_callback = done_callback - self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3] - self.unwind_stack = unwind_stack - try: - self.iterator = iter(iterable) - except (ExitNow, SystemExit): - raise - except: - rpki.log.debug("Problem constructing iterator for %r" % (iterable,)) - raise - self.doit() - - def __repr__(self): - return ("<%s created at %s:%s %s at 0x%x>" % - (self.__class__.__name__, - self.caller_file, self.caller_line, self.caller_function, id(self))) - - def __call__(self): - if self.unwind_stack: - defer(self.doit) - else: - self.doit() - - def doit(self): - """ - Implement the iterator protocol: attempt to call the item handler - with the next iteration value, call the termination handler if the - iterator signaled StopIteration. - """ - try: - self.item_callback(self, self.iterator.next()) - except StopIteration: - if self.done_callback is not None: - self.done_callback() - -class timer(object): - """ - Timer construct for event-driven code. It can be used in either of two ways: - - - As a virtual class, in which case the subclass should provide a - handler() method to receive the wakup event when the timer expires; or - - - By setting an explicit handler callback, either via the - constructor or the set_handler() method. - - Subclassing is probably more Pythonic, but setting an explict - handler turns out to be very convenient when combined with bound - methods to other objects. - """ - - ## @var gc_debug - # Verbose chatter about timers states and garbage collection. - gc_debug = False - - ## @var run_debug - # Verbose chatter about timers being run. - run_debug = False - - ## @var queue - # Timer queue, shared by all timer instances (there can be only one queue). - queue = [] - - def __init__(self, handler = None, errback = None): - if handler is not None: - self.set_handler(handler) - if errback is not None: - self.set_errback(errback) - self.when = None - if self.gc_debug: - self.trace("Creating %r" % self) - - def trace(self, msg): - """ - Debug logging. - """ - if self.gc_debug: - bt = traceback.extract_stack(limit = 3) - rpki.log.debug("%s from %s:%d" % (msg, bt[0][0], bt[0][1])) - - def set(self, when): - """ - Set a timer. Argument can be a datetime, to specify an absolute - time, or a timedelta, to specify an offset time. - """ - if self.gc_debug: - self.trace("Setting %r to %r" % (self, when)) - if isinstance(when, rpki.sundial.timedelta): - self.when = rpki.sundial.now() + when - else: - self.when = when - assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when) - if self not in self.queue: - self.queue.append(self) - self.queue.sort(key = lambda x: x.when) - - def __cmp__(self, other): - return cmp(id(self), id(other)) - - if gc_debug: - def __del__(self): - rpki.log.debug("Deleting %r" % self) - - def cancel(self): - """ - Cancel a timer, if it was set. - """ - if self.gc_debug: - self.trace("Canceling %r" % self) - try: - while True: - self.queue.remove(self) - except ValueError: - pass - - def is_set(self): - """Test whether this timer is currently set.""" - return self in self.queue - - def handler(self): - """ - Handle a timer that has expired. This must either be overriden by - a subclass or set dynamically by set_handler(). - """ - raise NotImplementedError - - def set_handler(self, handler): - """ - Set timer's expiration handler. This is an alternative to - subclassing the timer class, and may be easier to use when - integrating timers into other classes (eg, the handler can be a - bound method to an object in a class representing a network - connection). - """ - self.handler = handler - - def errback(self, e): - """ - Error callback. May be overridden, or set with set_errback(). - """ - rpki.log.error("Unhandled exception from timer: %s" % e) - rpki.log.traceback() - - def set_errback(self, errback): - """Set a timer's errback. Like set_handler(), for errbacks.""" - self.errback = errback - - @classmethod - def runq(cls): - """ - Run the timer queue: for each timer whose call time has passed, - pull the timer off the queue and call its handler() method. - """ - while cls.queue and rpki.sundial.now() >= cls.queue[0].when: - t = cls.queue.pop(0) - if cls.run_debug: - rpki.log.debug("Running %r" % t) - try: - t.handler() - except (ExitNow, SystemExit): - raise - except Exception, e: - t.errback(e) - - def __repr__(self): - return "<%s %r %r at 0x%x>" % (self.__class__.__name__, self.when, self.handler, id(self)) - - @classmethod - def seconds_until_wakeup(cls): - """ - Calculate delay until next timer expires, or None if no timers are - set and we should wait indefinitely. Rounds up to avoid spinning - in select() or poll(). We could calculate fractional seconds in - the right units instead, but select() and poll() don't even take - the same units (argh!), and we're not doing anything that - hair-triggered, so rounding up is simplest. - """ - if not cls.queue: - return None - now = rpki.sundial.now() - if now >= cls.queue[0].when: - return 0 - delay = cls.queue[0].when - now - seconds = delay.convert_to_seconds() - if delay.microseconds: - seconds += 1 - return seconds - - @classmethod - def clear(cls): - """ - Cancel every timer on the queue. We could just throw away the - queue content, but this way we can notify subclasses that provide - their own cancel() method. - """ - while cls.queue: - cls.queue.pop(0).cancel() - -## @var deferred_queue -# List to hold deferred actions. We used to do this with the timer -# queue, but that appears to confuse the garbage collector, and is -# overengineering for simple deferred actions in any case. - -deferred_queue = [] - -def defer(thunk): - """ - Defer an action until the next pass through the event loop. - """ - deferred_queue.append(thunk) - -def run_deferred(): - """ - Run deferred actions. - """ - while deferred_queue: - try: - deferred_queue.pop(0)() - except (ExitNow, SystemExit): - raise - except Exception, e: - rpki.log.error("Unhandled exception from deferred action: %s" % e) - rpki.log.traceback() - -def _raiseExitNow(signum, frame): - """Signal handler for event_loop().""" - raise ExitNow - -def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)): - """ - Replacement for asyncore.loop(), adding timer and signal support. - """ - while True: - old_signal_handlers = {} - try: - for sig in catch_signals: - old_signal_handlers[sig] = signal.signal(sig, _raiseExitNow) - while asyncore.socket_map or deferred_queue or timer.queue: - run_deferred() - asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map) - run_deferred() - timer.runq() - if timer.gc_debug: - gc.collect() - if gc.garbage: - for i in gc.garbage: - rpki.log.debug("GC-cycle %r" % i) - del gc.garbage[:] - except ExitNow: - break - except SystemExit: - raise - except Exception, e: - rpki.log.error("event_loop() exited with exception %r, this is not supposed to happen, restarting" % e) - else: - break - finally: - for sig in old_signal_handlers: - signal.signal(sig, old_signal_handlers[sig]) - -class sync_wrapper(object): - """ - Synchronous wrapper around asynchronous functions. Running in - asynchronous mode at all times makes sense for event-driven daemons, - but is kind of tedious for simple scripts, hence this wrapper. - - The wrapped function should take at least two arguments: a callback - function and an errback function. If any arguments are passed to - the wrapper, they will be passed as additional arguments to the - wrapped function. - """ - - res = None - err = None - - def __init__(self, func): - self.func = func - - def cb(self, res = None): - """ - Wrapped code has requested normal termination. Store result, and - exit the event loop. - """ - self.res = res - raise ExitNow - - def eb(self, err): - """ - Wrapped code raised an exception. Store exception data, then exit - the event loop. - """ - exc_info = sys.exc_info() - self.err = exc_info if exc_info[1] is err else err - raise ExitNow - - def __call__(self, *args, **kwargs): - - def thunk(): - """ - Deferred action to call the wrapped code once event system is - running. - """ - try: - self.func(self.cb, self.eb, *args, **kwargs) - except ExitNow: - raise - except Exception, e: - self.eb(e) - - defer(thunk) - event_loop() - if self.err is not None: - if isinstance(self.err, tuple): - raise self.err[0], self.err[1], self.err[2] - else: - raise self.err - else: - return self.res - -def exit_event_loop(): - """Force exit from event_loop().""" - raise ExitNow - -class gc_summary(object): - """ - Periodic summary of GC state, for tracking down memory bloat. - """ - - def __init__(self, interval, threshold = 0): - if isinstance(interval, (int, long)): - interval = rpki.sundial.timedelta(seconds = interval) - self.interval = interval - self.threshold = threshold - self.timer = timer(handler = self.handler) - self.timer.set(self.interval) - - def handler(self): - """ - Collect and log GC state for this period, reset timer. - """ - rpki.log.debug("gc_summary: Running gc.collect()") - gc.collect() - rpki.log.debug("gc_summary: Summarizing (threshold %d)" % self.threshold) - total = {} - tuples = {} - for g in gc.get_objects(): - k = type(g).__name__ - total[k] = total.get(k, 0) + 1 - if isinstance(g, tuple): - k = ", ".join(type(x).__name__ for x in g) - tuples[k] = tuples.get(k, 0) + 1 - rpki.log.debug("gc_summary: Sorting result") - total = total.items() - total.sort(reverse = True, key = lambda x: x[1]) - tuples = tuples.items() - tuples.sort(reverse = True, key = lambda x: x[1]) - rpki.log.debug("gc_summary: Object type counts in descending order") - for name, count in total: - if count > self.threshold: - rpki.log.debug("gc_summary: %8d %s" % (count, name)) - rpki.log.debug("gc_summary: Tuple content type signature counts in descending order") - for types, count in tuples: - if count > self.threshold: - rpki.log.debug("gc_summary: %8d (%s)" % (count, types)) - rpki.log.debug("gc_summary: Scheduling next cycle") - self.timer.set(self.interval) |