diff options
-rw-r--r-- | rpkid/rpki/async.py | 27 | ||||
-rw-r--r-- | rpkid/rpki/https.py | 27 |
2 files changed, 37 insertions, 17 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py index 7721634b..fce6498c 100644 --- a/rpkid/rpki/async.py +++ b/rpkid/rpki/async.py @@ -46,7 +46,7 @@ class iterator(object): self.item_callback = item_callback self.done_callback = done_callback self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3] - self.timer = timer(handler = self.doit) if unwind_stack else None + self.unwind_stack = unwind_stack try: self.iterator = iter(iterable) except (ExitNow, SystemExit): @@ -62,16 +62,15 @@ class iterator(object): self.caller_file, self.caller_line, self.caller_function, id(self))) def __call__(self): - if self.timer is None: - self.doit() + if self.unwind_stack: + defer(self.doit) else: - self.timer.set(None) + self.doit() def doit(self): try: self.item_callback(self, self.iterator.next()) except StopIteration: - self.timer = None if self.done_callback is not None: self.done_callback() @@ -126,6 +125,7 @@ class timer(object): """ self.trace("Setting %r to %r" % (self, when)) if when is None: + rpki.log.warn("Obsolete timer usage: convert this to use rpki.async.defer() instead: %r" % self) self.when = rpki.sundial.now() elif isinstance(when, rpki.sundial.timedelta): self.when = rpki.sundial.now() + when @@ -235,6 +235,19 @@ class timer(object): while cls.queue: cls.queue.pop(0).cancel() +## @var _deferred +# List to hold deferred actions. We used to do this with the timer +# queue, but that appears to confuse the garbage collector, and is +# overengineering for simple deferred actions in any case. + +_deferred = [] + +def defer(thunk): + """ + Defer an action until the next pass through the event loop. + """ + _deferred.append(thunk) + def _raiseExitNow(signum, frame): """Signal handler for event_loop().""" raise ExitNow @@ -249,8 +262,10 @@ def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)): old_signal_handlers[sig] = signal.signal(sig, _raiseExitNow) if timer.debug: import gc - while asyncore.socket_map or timer.queue: + while asyncore.socket_map or _deferred or timer.queue: asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map) + while _deferred: + _deferred.pop(0)() timer.runq() if timer.debug: gc.collect() diff --git a/rpkid/rpki/https.py b/rpkid/rpki/https.py index 091ca752..ed1f8546 100644 --- a/rpkid/rpki/https.py +++ b/rpkid/rpki/https.py @@ -645,16 +645,21 @@ class http_queue(object): self.queue.extend(requests) def restart(self): - if self.client is None: - client = http_client(self, self.hostport, cert = self.cert, key = self.key, ta = self.ta) - self.log("Attaching client %r" % client) - self.client = client - self.client.start() - elif self.client.state == "idle": - self.log("Sending request to existing client %r" % self.client) - self.send_request() - else: - self.log("Client %r exists in state %r" % (self.client, self.client.state)) + try: + if self.client is None: + client = http_client(self, self.hostport, cert = self.cert, key = self.key, ta = self.ta) + self.log("Attaching client %r" % client) + self.client = client + self.client.start() + elif self.client.state == "idle": + self.log("Sending request to existing client %r" % self.client) + self.send_request() + else: + self.log("Client %r exists in state %r" % (self.client, self.client.state)) + except (rpki.async.ExitNow, SystemExit): + raise + except Exception, e: + self.return_result(e) def send_request(self): if self.queue: @@ -737,7 +742,7 @@ def client(msg, client_key, client_cert, server_ta, url, callback, errback): if debug_http: rpki.log.debug("Scheduling connection startup for %r" % request) - rpki.async.timer(client_queues[hostport].restart, errback).set(None) + rpki.async.defer(client_queues[hostport].restart) def server(handlers, server_key, server_cert, port, host ="", client_ta = (), dynamic_https_trust_anchor = None): """ |