aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/async.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2012-08-19 16:12:26 +0000
committerRob Austein <sra@hactrn.net>2012-08-19 16:12:26 +0000
commit01bc52850438c23b31347440825dedfcd125bfbb (patch)
tree22628b29003006f0aed50fb43b37d21d81d7465c /rpkid/rpki/async.py
parentb2eb70ea0d75d8e969b29502a264704599e790d7 (diff)
Simplify core I/O loop, back to what I originally intended before
accidently digging myself into a hole with a circular reference that confused Python's garbage collector. See #275. This version is much too noisy, and could use some cleanup, but basic code seems to work, and I want to test it on multiple machines in parallel, so checking it in now. svn path=/branches/tk274/; revision=4642
Diffstat (limited to 'rpkid/rpki/async.py')
-rw-r--r--rpkid/rpki/async.py94
1 files changed, 37 insertions, 57 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py
index c2a02e4f..7563f589 100644
--- a/rpkid/rpki/async.py
+++ b/rpkid/rpki/async.py
@@ -94,6 +94,12 @@ class iterator(object):
if self.done_callback is not None:
self.done_callback()
+
+## @var queue
+# Timer queue.
+
+timer_queue = []
+
class timer(object):
"""
Timer construct for event-driven code. It can be used in either of two ways:
@@ -117,10 +123,6 @@ class timer(object):
# Verbose chatter about timers being run.
run_debug = False
- ## @var queue
- # Timer queue, shared by all timer instances (there can be only one queue).
- queue = []
-
def __init__(self, handler = None, errback = None):
if handler is not None:
self.set_handler(handler)
@@ -150,9 +152,9 @@ class timer(object):
else:
self.when = when
assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when)
- if self not in self.queue:
- self.queue.append(self)
- self.queue.sort(key = lambda x: x.when)
+ if self not in timer_queue:
+ timer_queue.append(self)
+ timer_queue.sort(key = lambda x: x.when)
def __cmp__(self, other):
return cmp(id(self), id(other))
@@ -169,7 +171,7 @@ class timer(object):
self.trace("Canceling %r" % self)
try:
while True:
- self.queue.remove(self)
+ timer_queue.remove(self)
except ValueError:
pass
@@ -177,7 +179,7 @@ class timer(object):
"""
Test whether this timer is currently set.
"""
- return self in self.queue
+ return self in timer_queue
def handler(self):
"""
@@ -215,8 +217,8 @@ class timer(object):
Run the timer queue: for each timer whose call time has passed,
pull the timer off the queue and call its handler() method.
"""
- while cls.queue and rpki.sundial.now() >= cls.queue[0].when:
- t = cls.queue.pop(0)
+ if timer_queue and rpki.sundial.now() >= timer_queue[0].when:
+ t = timer_queue.pop(0)
if cls.run_debug:
rpki.log.debug("Running %r" % t)
try:
@@ -239,12 +241,12 @@ class timer(object):
the same units (argh!), and we're not doing anything that
hair-triggered, so rounding up is simplest.
"""
- if not cls.queue:
+ if not timer_queue:
return None
now = rpki.sundial.now()
- if now >= cls.queue[0].when:
+ if now >= timer_queue[0].when:
return 0
- delay = cls.queue[0].when - now
+ delay = timer_queue[0].when - now
seconds = delay.convert_to_seconds()
if delay.microseconds:
seconds += 1
@@ -257,40 +259,32 @@ class timer(object):
queue content, but this way we can notify subclasses that provide
their own cancel() method.
"""
- while cls.queue:
- cls.queue.pop(0).cancel()
-
-## @var deferred_queue
-# List to hold deferred actions. We used to do this with the timer
-# queue, but that appears to confuse the garbage collector, and is
-# overengineering for simple deferred actions in any case.
+ while timer_queue:
+ timer_queue.pop(0).cancel()
-deferred_queue = []
-
-def defer(thunk):
+def _raiseExitNow(signum, frame):
"""
- Defer an action until the next pass through the event loop.
+ Signal handler for event_loop().
"""
- deferred_queue.append(thunk)
+ raise ExitNow
-def run_deferred():
+def exit_event_loop():
"""
- Run deferred actions.
+ Force exit from event_loop().
"""
- while deferred_queue:
- try:
- deferred_queue.pop(0)()
- except (ExitNow, SystemExit):
- raise
- except Exception, e:
- rpki.log.error("Unhandled exception from deferred action %s: %s" % (e.__class__.__name__, e))
- rpki.log.traceback()
+ raise ExitNow
-def _raiseExitNow(signum, frame):
+def event_yield(handler, delay = rpki.sundial.timedelta(seconds = 0)):
"""
- Signal handler for event_loop().
+ Use a near-term timer to schedule an event after letting the timer
+ and I/O systems run.
"""
- raise ExitNow
+ t = timer(handler)
+ t.set(delay)
+
+# Backwards compatability -- clean this up if the change works
+
+defer = event_yield
def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
"""
@@ -304,10 +298,10 @@ def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
old = signal.signal(sig, _raiseExitNow)
if save_sigs:
old_signal_handlers[sig] = old
- while asyncore.socket_map or deferred_queue or timer.queue:
- run_deferred()
- asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map)
- run_deferred()
+ while asyncore.socket_map or timer_queue:
+ t = timer.seconds_until_wakeup()
+ rpki.log.debug("Dismissing to asyncore.poll(), t = %s, q = %r" % (t, timer_queue))
+ asyncore.poll(t, asyncore.socket_map)
timer.runq()
if timer.gc_debug:
gc.collect()
@@ -394,20 +388,6 @@ class sync_wrapper(object):
else:
raise self.err
-def exit_event_loop():
- """
- Force exit from event_loop().
- """
- raise ExitNow
-
-def event_yield(handler, delay = rpki.sundial.timedelta(seconds = 2)):
- """
- Use a near-term timer to schedule an event after letting the timer
- and I/O systems run.
- """
- t = timer(handler)
- t.set(delay)
-
class gc_summary(object):
"""
Periodic summary of GC state, for tracking down memory bloat.