aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rpkid/rpki/async.py27
-rw-r--r--rpkid/rpki/https.py27
2 files changed, 37 insertions, 17 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py
index 7721634b..fce6498c 100644
--- a/rpkid/rpki/async.py
+++ b/rpkid/rpki/async.py
@@ -46,7 +46,7 @@ class iterator(object):
self.item_callback = item_callback
self.done_callback = done_callback
self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]
- self.timer = timer(handler = self.doit) if unwind_stack else None
+ self.unwind_stack = unwind_stack
try:
self.iterator = iter(iterable)
except (ExitNow, SystemExit):
@@ -62,16 +62,15 @@ class iterator(object):
self.caller_file, self.caller_line, self.caller_function, id(self)))
def __call__(self):
- if self.timer is None:
- self.doit()
+ if self.unwind_stack:
+ defer(self.doit)
else:
- self.timer.set(None)
+ self.doit()
def doit(self):
try:
self.item_callback(self, self.iterator.next())
except StopIteration:
- self.timer = None
if self.done_callback is not None:
self.done_callback()
@@ -126,6 +125,7 @@ class timer(object):
"""
self.trace("Setting %r to %r" % (self, when))
if when is None:
+ rpki.log.warn("Obsolete timer usage: convert this to use rpki.async.defer() instead: %r" % self)
self.when = rpki.sundial.now()
elif isinstance(when, rpki.sundial.timedelta):
self.when = rpki.sundial.now() + when
@@ -235,6 +235,19 @@ class timer(object):
while cls.queue:
cls.queue.pop(0).cancel()
+## @var _deferred
+# List to hold deferred actions. We used to do this with the timer
+# queue, but that appears to confuse the garbage collector, and is
+# overengineering for simple deferred actions in any case.
+
+_deferred = []
+
+def defer(thunk):
+ """
+ Defer an action until the next pass through the event loop.
+ """
+ _deferred.append(thunk)
+
def _raiseExitNow(signum, frame):
"""Signal handler for event_loop()."""
raise ExitNow
@@ -249,8 +262,10 @@ def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
old_signal_handlers[sig] = signal.signal(sig, _raiseExitNow)
if timer.debug:
import gc
- while asyncore.socket_map or timer.queue:
+ while asyncore.socket_map or _deferred or timer.queue:
asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map)
+ while _deferred:
+ _deferred.pop(0)()
timer.runq()
if timer.debug:
gc.collect()
diff --git a/rpkid/rpki/https.py b/rpkid/rpki/https.py
index 091ca752..ed1f8546 100644
--- a/rpkid/rpki/https.py
+++ b/rpkid/rpki/https.py
@@ -645,16 +645,21 @@ class http_queue(object):
self.queue.extend(requests)
def restart(self):
- if self.client is None:
- client = http_client(self, self.hostport, cert = self.cert, key = self.key, ta = self.ta)
- self.log("Attaching client %r" % client)
- self.client = client
- self.client.start()
- elif self.client.state == "idle":
- self.log("Sending request to existing client %r" % self.client)
- self.send_request()
- else:
- self.log("Client %r exists in state %r" % (self.client, self.client.state))
+ try:
+ if self.client is None:
+ client = http_client(self, self.hostport, cert = self.cert, key = self.key, ta = self.ta)
+ self.log("Attaching client %r" % client)
+ self.client = client
+ self.client.start()
+ elif self.client.state == "idle":
+ self.log("Sending request to existing client %r" % self.client)
+ self.send_request()
+ else:
+ self.log("Client %r exists in state %r" % (self.client, self.client.state))
+ except (rpki.async.ExitNow, SystemExit):
+ raise
+ except Exception, e:
+ self.return_result(e)
def send_request(self):
if self.queue:
@@ -737,7 +742,7 @@ def client(msg, client_key, client_cert, server_ta, url, callback, errback):
if debug_http:
rpki.log.debug("Scheduling connection startup for %r" % request)
- rpki.async.timer(client_queues[hostport].restart, errback).set(None)
+ rpki.async.defer(client_queues[hostport].restart)
def server(handlers, server_key, server_cert, port, host ="", client_ta = (), dynamic_https_trust_anchor = None):
"""