aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/async.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpkid/rpki/async.py')
-rw-r--r--rpkid/rpki/async.py196
1 files changed, 89 insertions, 107 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py
index 5eaa34f9..aee7770f 100644
--- a/rpkid/rpki/async.py
+++ b/rpkid/rpki/async.py
@@ -3,7 +3,7 @@ Utilities for event-driven programming.
$Id$
-Copyright (C) 2009--2011 Internet Systems Consortium ("ISC")
+Copyright (C) 2009--2012 Internet Systems Consortium ("ISC")
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -18,8 +18,13 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
"""
-import asyncore, signal, traceback, gc, sys
-import rpki.log, rpki.sundial
+import asyncore
+import signal
+import traceback
+import gc
+import sys
+import rpki.log
+import rpki.sundial
ExitNow = asyncore.ExitNow
@@ -40,15 +45,24 @@ class iterator(object):
to continue to the next item in the iteration.
The termination callback receives no arguments.
+
+ Special case for memory constrained cases: if keyword argument
+ pop_list is True, iterable must be a list, which is modified in
+ place, popping items off of it until it's empty.
"""
- def __init__(self, iterable, item_callback, done_callback, unwind_stack = True):
+ def __init__(self, iterable, item_callback, done_callback, unwind_stack = True, pop_list = False):
+ assert not pop_list or isinstance(iterable, list), "iterable must be a list when using pop_list"
self.item_callback = item_callback
- self.done_callback = done_callback
+ self.done_callback = done_callback if done_callback is not None else lambda: None
self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]
self.unwind_stack = unwind_stack
+ self.pop_list = pop_list
try:
- self.iterator = iter(iterable)
+ if self.pop_list:
+ self.iterator = iterable
+ else:
+ self.iterator = iter(iterable)
except (ExitNow, SystemExit):
raise
except Exception:
@@ -57,13 +71,14 @@ class iterator(object):
self.doit()
def __repr__(self):
- return ("<%s created at %s:%s %s at 0x%x>" %
- (self.__class__.__name__,
- self.caller_file, self.caller_line, self.caller_function, id(self)))
+ return rpki.log.log_repr(self,
+ "created at %s:%s" % (self.caller_file,
+ self.caller_line),
+ self.caller_function)
def __call__(self):
if self.unwind_stack:
- defer(self.doit)
+ event_defer(self.doit)
else:
self.doit()
@@ -73,25 +88,25 @@ class iterator(object):
with the next iteration value, call the termination handler if the
iterator signaled StopIteration.
"""
- try:
- self.item_callback(self, self.iterator.next())
- except StopIteration:
- if self.done_callback is not None:
- self.done_callback()
-class timer(object):
- """
- Timer construct for event-driven code. It can be used in either of two ways:
+ try:
+ if self.pop_list:
+ val = self.iterator.pop(0)
+ else:
+ val = self.iterator.next()
+ except (IndexError, StopIteration):
+ self.done_callback()
+ else:
+ self.item_callback(self, val)
- - As a virtual class, in which case the subclass should provide a
- handler() method to receive the wakup event when the timer expires; or
+## @var timer_queue
+# Timer queue.
- - By setting an explicit handler callback, either via the
- constructor or the set_handler() method.
+timer_queue = []
- Subclassing is probably more Pythonic, but setting an explict
- handler turns out to be very convenient when combined with bound
- methods to other objects.
+class timer(object):
+ """
+ Timer construct for event-driven code.
"""
## @var gc_debug
@@ -102,15 +117,9 @@ class timer(object):
# Verbose chatter about timers being run.
run_debug = False
- ## @var queue
- # Timer queue, shared by all timer instances (there can be only one queue).
- queue = []
-
def __init__(self, handler = None, errback = None):
- if handler is not None:
- self.set_handler(handler)
- if errback is not None:
- self.set_errback(errback)
+ self.set_handler(handler)
+ self.set_errback(errback)
self.when = None
if self.gc_debug:
self.trace("Creating %r" % self)
@@ -135,9 +144,9 @@ class timer(object):
else:
self.when = when
assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when)
- if self not in self.queue:
- self.queue.append(self)
- self.queue.sort(key = lambda x: x.when)
+ if self not in timer_queue:
+ timer_queue.append(self)
+ timer_queue.sort(key = lambda x: x.when)
def __cmp__(self, other):
return cmp(id(self), id(other))
@@ -154,7 +163,7 @@ class timer(object):
self.trace("Canceling %r" % self)
try:
while True:
- self.queue.remove(self)
+ timer_queue.remove(self)
except ValueError:
pass
@@ -162,14 +171,7 @@ class timer(object):
"""
Test whether this timer is currently set.
"""
- return self in self.queue
-
- def handler(self):
- """
- Handle a timer that has expired. This must either be overriden by
- a subclass or set dynamically by set_handler().
- """
- raise NotImplementedError
+ return self in timer_queue
def set_handler(self, handler):
"""
@@ -181,13 +183,6 @@ class timer(object):
"""
self.handler = handler
- def errback(self, e):
- """
- Error callback. May be overridden, or set with set_errback().
- """
- rpki.log.error("Unhandled exception from timer: %s" % e)
- rpki.log.traceback()
-
def set_errback(self, errback):
"""
Set a timer's errback. Like set_handler(), for errbacks.
@@ -199,17 +194,29 @@ class timer(object):
"""
Run the timer queue: for each timer whose call time has passed,
pull the timer off the queue and call its handler() method.
+
+ Comparisions are made against time at which this function was
+ called, so that even if new events keep getting scheduled, we'll
+ return to the I/O loop reasonably quickly.
"""
- while cls.queue and rpki.sundial.now() >= cls.queue[0].when:
- t = cls.queue.pop(0)
+ now = rpki.sundial.now()
+ while timer_queue and now >= timer_queue[0].when:
+ t = timer_queue.pop(0)
if cls.run_debug:
rpki.log.debug("Running %r" % t)
try:
- t.handler()
+ if t.handler is not None:
+ t.handler()
+ else:
+ rpki.log.warn("Timer %r expired with no handler set" % t)
except (ExitNow, SystemExit):
raise
except Exception, e:
- t.errback(e)
+ if t.errback is not None:
+ t.errback(e)
+ else:
+ rpki.log.error("Unhandled exception from timer %r: %s" % (t, e))
+ rpki.log.traceback()
def __repr__(self):
return rpki.log.log_repr(self, self.when, repr(self.handler))
@@ -224,12 +231,12 @@ class timer(object):
the same units (argh!), and we're not doing anything that
hair-triggered, so rounding up is simplest.
"""
- if not cls.queue:
+ if not timer_queue:
return None
now = rpki.sundial.now()
- if now >= cls.queue[0].when:
+ if now >= timer_queue[0].when:
return 0
- delay = cls.queue[0].when - now
+ delay = timer_queue[0].when - now
seconds = delay.convert_to_seconds()
if delay.microseconds:
seconds += 1
@@ -242,40 +249,32 @@ class timer(object):
queue content, but this way we can notify subclasses that provide
their own cancel() method.
"""
- while cls.queue:
- cls.queue.pop(0).cancel()
-
-## @var deferred_queue
-# List to hold deferred actions. We used to do this with the timer
-# queue, but that appears to confuse the garbage collector, and is
-# overengineering for simple deferred actions in any case.
-
-deferred_queue = []
+ while timer_queue:
+ timer_queue.pop(0).cancel()
-def defer(thunk):
+def _raiseExitNow(signum, frame):
"""
- Defer an action until the next pass through the event loop.
+ Signal handler for event_loop().
"""
- deferred_queue.append(thunk)
+ raise ExitNow
-def run_deferred():
+def exit_event_loop():
"""
- Run deferred actions.
+ Force exit from event_loop().
"""
- while deferred_queue:
- try:
- deferred_queue.pop(0)()
- except (ExitNow, SystemExit):
- raise
- except Exception, e:
- rpki.log.error("Unhandled exception from deferred action %s: %s" % (e.__class__.__name__, e))
- rpki.log.traceback()
+ raise ExitNow
-def _raiseExitNow(signum, frame):
+def event_defer(handler, delay = rpki.sundial.timedelta(seconds = 0)):
"""
- Signal handler for event_loop().
+ Use a near-term (default: zero interval) timer to schedule an event
+ to run after letting the I/O system have a turn.
"""
- raise ExitNow
+ timer(handler).set(delay)
+
+## @var debug_event_timing
+# Enable insanely verbose logging of event timing
+
+debug_event_timing = False
def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
"""
@@ -289,10 +288,11 @@ def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
old = signal.signal(sig, _raiseExitNow)
if save_sigs:
old_signal_handlers[sig] = old
- while asyncore.socket_map or deferred_queue or timer.queue:
- run_deferred()
- asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map)
- run_deferred()
+ while asyncore.socket_map or timer_queue:
+ t = timer.seconds_until_wakeup()
+ if debug_event_timing:
+ rpki.log.debug("Dismissing to asyncore.poll(), t = %s, q = %r" % (t, timer_queue))
+ asyncore.poll(t, asyncore.socket_map)
timer.runq()
if timer.gc_debug:
gc.collect()
@@ -359,10 +359,6 @@ class sync_wrapper(object):
def __call__(self, *args, **kwargs):
def thunk():
- """
- Deferred action to call the wrapped code once event system is
- running.
- """
try:
self.func(self.cb, self.eb, *args, **kwargs)
except ExitNow:
@@ -370,7 +366,7 @@ class sync_wrapper(object):
except Exception, e:
self.eb(e)
- defer(thunk)
+ event_defer(thunk)
event_loop()
if self.err is None:
return self.res
@@ -379,20 +375,6 @@ class sync_wrapper(object):
else:
raise self.err
-def exit_event_loop():
- """
- Force exit from event_loop().
- """
- raise ExitNow
-
-def event_yield(handler, delay = rpki.sundial.timedelta(seconds = 2)):
- """
- Use a near-term timer to schedule an event after letting the timer
- and I/O systems run.
- """
- t = timer(handler)
- t.set(delay)
-
class gc_summary(object):
"""
Periodic summary of GC state, for tracking down memory bloat.