aboutsummaryrefslogtreecommitdiff
path: root/rpki/async.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2015-10-22 02:02:23 +0000
committerRob Austein <sra@hactrn.net>2015-10-22 02:02:23 +0000
commit32d43381dfc0370acb774951f9cdd0cdb1ab7f1b (patch)
treecfb37241df8d869ed08a24b3021a4a599cb98bdb /rpki/async.py
parent1ca142b706eab552f9e2c575ef15d1862516393b (diff)
First cut at replacing rpkid's HTTP and I/O system with Tornado. Not
quite working perfectly yet (cron is a bit wonky) but manages to produce an initial set of ROAs without thowing any exceptions, and code is already much cleaner than the old callback-based horror. svn path=/branches/tk705/; revision=6139
Diffstat (limited to 'rpki/async.py')
-rw-r--r--rpki/async.py382
1 files changed, 0 insertions, 382 deletions
diff --git a/rpki/async.py b/rpki/async.py
deleted file mode 100644
index 74143bd1..00000000
--- a/rpki/async.py
+++ /dev/null
@@ -1,382 +0,0 @@
-# $Id$
-#
-# Copyright (C) 2009--2012 Internet Systems Consortium ("ISC")
-#
-# Permission to use, copy, modify, and distribute this software for any
-# purpose with or without fee is hereby granted, provided that the above
-# copyright notice and this permission notice appear in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
-# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
-# AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
-# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
-# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
-# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
-# PERFORMANCE OF THIS SOFTWARE.
-
-"""
-Utilities for event-driven programming.
-"""
-
-import gc
-import sys
-import signal
-import logging
-import asyncore
-import traceback
-import rpki.log
-import rpki.sundial
-
-logger = logging.getLogger(__name__)
-
-ExitNow = asyncore.ExitNow
-
-class iterator(object):
- """
- Iteration construct for event-driven code. Takes three
- arguments:
-
- - Some kind of iterable object
-
- - A callback to call on each item in the iteration
-
- - A callback to call after the iteration terminates.
-
- The item callback receives two arguments: the callable iterator
- object and the current value of the iteration. It should call the
- iterator (or arrange for the iterator to be called) when it is time
- to continue to the next item in the iteration.
-
- The termination callback receives no arguments.
-
- Special case for memory constrained cases: if keyword argument
- pop_list is True, iterable must be a list, which is modified in
- place, popping items off of it until it's empty.
- """
-
- def __init__(self, iterable, item_callback, done_callback, unwind_stack = True, pop_list = False):
- assert not pop_list or isinstance(iterable, list), "iterable must be a list when using pop_list"
- self.item_callback = item_callback
- self.done_callback = done_callback if done_callback is not None else lambda: None
- self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]
- self.unwind_stack = unwind_stack
- self.pop_list = pop_list
- try:
- if self.pop_list:
- self.iterator = iterable
- else:
- self.iterator = iter(iterable)
- except (ExitNow, SystemExit):
- raise
- except Exception:
- logger.debug("Problem constructing iterator for %s", repr(iterable))
- raise
- self.doit()
-
- def __repr__(self):
- return rpki.log.log_repr(self,
- "created at %s:%s" % (self.caller_file,
- self.caller_line),
- self.caller_function)
-
- def __call__(self):
- if self.unwind_stack:
- event_defer(self.doit)
- else:
- self.doit()
-
- def doit(self):
- """
- Implement the iterator protocol: attempt to call the item handler
- with the next iteration value, call the termination handler if the
- iterator signaled StopIteration.
- """
-
- try:
- if self.pop_list:
- val = self.iterator.pop(0)
- else:
- val = self.iterator.next()
- except (IndexError, StopIteration):
- self.done_callback()
- else:
- self.item_callback(self, val)
-
-## @var timer_queue
-# Timer queue.
-
-timer_queue = []
-
-class timer(object):
- """
- Timer construct for event-driven code.
- """
-
- ## @var gc_debug
- # Verbose chatter about timers states and garbage collection.
- gc_debug = False
-
- ## @var run_debug
- # Verbose chatter about timers being run.
- run_debug = False
-
- def __init__(self, handler = None, errback = None):
- self.set_handler(handler)
- self.set_errback(errback)
- self.when = None
- if self.gc_debug:
- self.trace("Creating %r" % self)
-
- def trace(self, msg):
- """
- Debug logging.
- """
-
- if self.gc_debug:
- bt = traceback.extract_stack(limit = 3)
- logger.debug("%s from %s:%d", msg, bt[0][0], bt[0][1])
-
- def set(self, when):
- """
- Set a timer. Argument can be a datetime, to specify an absolute
- time, or a timedelta, to specify an offset time.
- """
-
- if self.gc_debug:
- self.trace("Setting %r to %r" % (self, when))
- if isinstance(when, rpki.sundial.timedelta):
- self.when = rpki.sundial.now() + when
- else:
- self.when = when
- assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when)
- if self not in timer_queue:
- timer_queue.append(self)
- timer_queue.sort(key = lambda x: x.when)
-
- def __cmp__(self, other):
- return cmp(id(self), id(other))
-
- if gc_debug:
- def __del__(self):
- logger.debug("Deleting %r", self)
-
- def cancel(self):
- """
- Cancel a timer, if it was set.
- """
-
- if self.gc_debug:
- self.trace("Canceling %r" % self)
- try:
- while True:
- timer_queue.remove(self)
- except ValueError:
- pass
-
- def is_set(self):
- """
- Test whether this timer is currently set.
- """
-
- return self in timer_queue
-
- def set_handler(self, handler):
- """
- Set timer's expiration handler. This is an alternative to
- subclassing the timer class, and may be easier to use when
- integrating timers into other classes (eg, the handler can be a
- bound method to an object in a class representing a network
- connection).
- """
-
- self.handler = handler
-
- def set_errback(self, errback):
- """
- Set a timer's errback. Like set_handler(), for errbacks.
- """
-
- self.errback = errback
-
- @classmethod
- def runq(cls):
- """
- Run the timer queue: for each timer whose call time has passed,
- pull the timer off the queue and call its handler() method.
-
- Comparisions are made against time at which this function was
- called, so that even if new events keep getting scheduled, we'll
- return to the I/O loop reasonably quickly.
- """
-
- now = rpki.sundial.now()
- while timer_queue and now >= timer_queue[0].when:
- t = timer_queue.pop(0)
- if cls.run_debug:
- logger.debug("Running %r", t)
- try:
- if t.handler is not None:
- t.handler()
- else:
- logger.warning("Timer %r expired with no handler set", t)
- except (ExitNow, SystemExit):
- raise
- except Exception, e:
- if t.errback is not None:
- t.errback(e)
- else:
- logger.exception("Unhandled exception from timer %r", t)
-
- def __repr__(self):
- return rpki.log.log_repr(self, self.when, repr(self.handler))
-
- @classmethod
- def seconds_until_wakeup(cls):
- """
- Calculate delay until next timer expires, or None if no timers are
- set and we should wait indefinitely. Rounds up to avoid spinning
- in select() or poll(). We could calculate fractional seconds in
- the right units instead, but select() and poll() don't even take
- the same units (argh!), and we're not doing anything that
- hair-triggered, so rounding up is simplest.
- """
-
- if not timer_queue:
- return None
- now = rpki.sundial.now()
- if now >= timer_queue[0].when:
- return 0
- delay = timer_queue[0].when - now
- seconds = delay.convert_to_seconds()
- if delay.microseconds:
- seconds += 1
- return seconds
-
- @classmethod
- def clear(cls):
- """
- Cancel every timer on the queue. We could just throw away the
- queue content, but this way we can notify subclasses that provide
- their own cancel() method.
- """
-
- while timer_queue:
- timer_queue.pop(0).cancel()
-
-def _raiseExitNow(signum, frame):
- """
- Signal handler for event_loop().
- """
-
- raise ExitNow
-
-def exit_event_loop():
- """
- Force exit from event_loop().
- """
-
- raise ExitNow
-
-def event_defer(handler, delay = rpki.sundial.timedelta(seconds = 0)):
- """
- Use a near-term (default: zero interval) timer to schedule an event
- to run after letting the I/O system have a turn.
- """
-
- timer(handler).set(delay)
-
-## @var debug_event_timing
-# Enable insanely verbose logging of event timing
-
-debug_event_timing = False
-
-def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
- """
- Replacement for asyncore.loop(), adding timer and signal support.
- """
-
- old_signal_handlers = {}
- while True:
- save_sigs = len(old_signal_handlers) == 0
- try:
- for sig in catch_signals:
- old = signal.signal(sig, _raiseExitNow)
- if save_sigs:
- old_signal_handlers[sig] = old
- while asyncore.socket_map or timer_queue:
- t = timer.seconds_until_wakeup()
- if debug_event_timing:
- logger.debug("Dismissing to asyncore.poll(), t = %s, q = %r", t, timer_queue)
- asyncore.poll(t, asyncore.socket_map)
- timer.runq()
- if timer.gc_debug:
- gc.collect()
- if gc.garbage:
- for i in gc.garbage:
- logger.debug("GC-cycle %r", i)
- del gc.garbage[:]
- except ExitNow:
- break
- except SystemExit:
- raise
- except ValueError, e:
- if str(e) == "filedescriptor out of range in select()":
- logger.error("Something is badly wrong, select() thinks we gave it a bad file descriptor.")
- logger.error("Content of asyncore.socket_map:")
- for fd in sorted(asyncore.socket_map.iterkeys()):
- logger.error(" fd %s obj %r", fd, asyncore.socket_map[fd])
- logger.error("Not safe to continue due to risk of spin loop on select(). Exiting.")
- sys.exit(1)
- logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting")
- except Exception, e:
- logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting")
- else:
- break
- finally:
- for sig in old_signal_handlers:
- signal.signal(sig, old_signal_handlers[sig])
-
-class gc_summary(object):
- """
- Periodic summary of GC state, for tracking down memory bloat.
- """
-
- def __init__(self, interval, threshold = 0):
- if isinstance(interval, (int, long)):
- interval = rpki.sundial.timedelta(seconds = interval)
- self.interval = interval
- self.threshold = threshold
- self.timer = timer(handler = self.handler)
- self.timer.set(self.interval)
-
- def handler(self):
- """
- Collect and log GC state for this period, reset timer.
- """
-
- logger.debug("gc_summary: Running gc.collect()")
- gc.collect()
- logger.debug("gc_summary: Summarizing (threshold %d)", self.threshold)
- total = {}
- tuples = {}
- for g in gc.get_objects():
- k = type(g).__name__
- total[k] = total.get(k, 0) + 1
- if isinstance(g, tuple):
- k = ", ".join(type(x).__name__ for x in g)
- tuples[k] = tuples.get(k, 0) + 1
- logger.debug("gc_summary: Sorting result")
- total = total.items()
- total.sort(reverse = True, key = lambda x: x[1])
- tuples = tuples.items()
- tuples.sort(reverse = True, key = lambda x: x[1])
- logger.debug("gc_summary: Object type counts in descending order")
- for name, count in total:
- if count > self.threshold:
- logger.debug("gc_summary: %8d %s", count, name)
- logger.debug("gc_summary: Tuple content type signature counts in descending order")
- for types, count in tuples:
- if count > self.threshold:
- logger.debug("gc_summary: %8d (%s)", count, types)
- logger.debug("gc_summary: Scheduling next cycle")
- self.timer.set(self.interval)