00001 """
00002 Utilities for event-driven programming.
00003
00004 $Id: async.py 2571 2009-07-04 20:13:22Z sra $
00005
00006 Copyright (C) 2009 Internet Systems Consortium ("ISC")
00007
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019 """
00020
00021 import asyncore, signal, traceback
00022 import rpki.log, rpki.sundial
00023
# Module-level alias for asyncore's ExitNow exception, so the rest of
# this module can raise and catch it under a short local name.
ExitNow = asyncore.ExitNow
00025
class iterator(object):
    """
    Iteration construct for event-driven code.  Takes three
    arguments:

    - Some kind of iterable object

    - A callback to call on each item in the iteration

    - A callback to call after the iteration terminates (may be None,
      in which case nothing is called at termination).

    The item callback receives two arguments: the callable iterator
    object and the current value of the iteration.  It should call the
    iterator (or arrange for the iterator to be called) when it is time
    to continue to the next item in the iteration.

    The termination callback receives no arguments.

    Constructing the object immediately starts the iteration: the item
    callback is invoked for the first item (or the termination callback
    if the iterable is empty) before the constructor returns.
    """

    def __init__(self, iterable, item_callback, done_callback):
        self.item_callback = item_callback
        self.done_callback = done_callback
        # Record the frame that called this constructor, purely so that
        # __repr__() can say where the iterator was created.  With
        # limit = 2 the extracted stack is [caller, this frame].
        self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]

        try:
            self.iterator = iter(iterable)
        except (ExitNow, SystemExit):
            raise
        except:
            # Log what we were handed, then let the original error propagate.
            rpki.log.debug("Problem constructing iterator for %s" % repr(iterable))
            raise
        self()

    def __repr__(self):
        return "<asynciterator created at %s:%d %s at 0x%x>" % (self.caller_file, self.caller_line, self.caller_function, id(self))

    def __call__(self, *args):
        """
        Advance the iteration by one step: hand the next item to the item
        callback, or call the termination callback (if any) once the
        underlying iterator is exhausted.

        Must be called without arguments; anything else is logged along
        with a stack trace and then trips the assertion below.
        """
        if args != ():
            rpki.log.warn("Arguments passed to %r: %r" % (self, args))
            for x in traceback.format_stack():
                rpki.log.warn(x.strip())
        assert args == ()
        # Only the advance itself is guarded: a StopIteration escaping from
        # the item callback is an error in that callback and must not be
        # mistaken for the end of this iteration.
        try:
            item = next(self.iterator)
        except StopIteration:
            if self.done_callback is not None:
                self.done_callback()
            return
        self.item_callback(self, item)

    def ignore(self, ignored):
        """
        Continue the iteration, discarding the single argument.  Lets the
        iterator be used where a one-argument callback is expected.
        """
        self()
00076
00077 class timer(object):
00078 """
00079 Timer construct for event-driven code. It can be used in either of two ways:
00080
00081 - As a virtual class, in which case the subclass should provide a
00082 handler() method to receive the wakup event when the timer expires; or
00083
00084 - By setting an explicit handler callback, either via the
00085 constructor or the set_handler() method.
00086
00087 Subclassing is probably more Pythonic, but setting an explict
00088 handler turns out to be very convenient when combined with bound
00089 methods to other objects.
00090 """
00091
00092
00093
00094
00095 queue = []
00096
00097 def __init__(self, handler = None, errback = None):
00098 if handler is not None:
00099 self.set_handler(handler)
00100 if errback is not None:
00101 self.set_errback(errback)
00102
00103 def set(self, when):
00104 """
00105 Set a timer. Argument can be a datetime, to specify an absolute
00106 time, a timedelta, to specify an offset time, or None, to indicate
00107 that the timer should expire immediately, which can be useful in
00108 avoiding an excessively deep call stack.
00109 """
00110 if when is None:
00111 self.when = rpki.sundial.now()
00112 elif isinstance(when, rpki.sundial.timedelta):
00113 self.when = rpki.sundial.now() + when
00114 else:
00115 self.when = when
00116 assert isinstance(self.when, rpki.sundial.datetime)
00117 if self not in self.queue:
00118 self.queue.append(self)
00119 self.queue.sort()
00120
00121 def __cmp__(self, other):
00122 return cmp(self.when, other.when)
00123
00124 def cancel(self):
00125 """
00126 Cancel a timer, if it was set.
00127 """
00128 try:
00129 self.queue.remove(self)
00130 except ValueError:
00131 pass
00132
00133 def is_set(self):
00134 """Test whether this timer is currently set."""
00135 return self in self.queue
00136
00137 def handler(self):
00138 """
00139 Handle a timer that has expired. This must either be overriden by
00140 a subclass or set dynamically by set_handler().
00141 """
00142 raise NotImplementedError
00143
00144 def set_handler(self, handler):
00145 """
00146 Set timer's expiration handler. This is an alternative to
00147 subclassing the timer class, and may be easier to use when
00148 integrating timers into other classes (eg, the handler can be a
00149 bound method to an object in a class representing a network
00150 connection).
00151 """
00152 self.handler = handler
00153
00154 def errback(self, e):
00155 """
00156 Error callback. May be overridden, or set with set_errback().
00157 """
00158 rpki.log.error("Unhandled exception from timer: %s" % e)
00159 rpki.log.traceback()
00160
00161 def set_errback(self, errback):
00162 """Set a timer's errback. Like set_handler(), for errbacks."""
00163 self.errback = errback
00164
00165 @classmethod
00166 def runq(cls):
00167 """
00168 Run the timer queue: for each timer whose call time has passed,
00169 pull the timer off the queue and call its handler() method.
00170 """
00171 while cls.queue and rpki.sundial.now() >= cls.queue[0].when:
00172 t = cls.queue.pop(0)
00173 try:
00174 t.handler()
00175 except (ExitNow, SystemExit):
00176 raise
00177 except Exception, e:
00178 t.errback(e)
00179
00180 def __repr__(self):
00181 return "<%s %r %r>" % (self.__class__.__name__, self.when, self.handler)
00182
00183 @classmethod
00184 def seconds_until_wakeup(cls):
00185 """
00186 Calculate delay until next timer expires, or None if no timers are
00187 set and we should wait indefinitely. Rounds up to avoid spinning
00188 in select() or poll(). We could calculate fractional seconds in
00189 the right units instead, but select() and poll() don't even take
00190 the same units (argh!), and we're not doing anything that
00191 hair-triggered, so rounding up is simplest.
00192 """
00193 if not cls.queue:
00194 return None
00195 now = rpki.sundial.now()
00196 if now >= cls.queue[0].when:
00197 return 0
00198 else:
00199 delay = cls.queue[0].when - now
00200 seconds = delay.convert_to_seconds()
00201 if delay.microseconds:
00202 seconds += 1
00203 return seconds
00204
00205 @classmethod
00206 def clear(cls):
00207 """
00208 Cancel every timer on the queue. We could just throw away the
00209 queue content, but this way we can notify subclasses that provide
00210 their own cancel() method.
00211 """
00212 while cls.queue:
00213 cls.queue.pop(0).cancel()
00214
def _raiseExitNow(signum, frame):
    """
    Signal handler for event_loop().

    Takes the usual (signal number, stack frame) handler arguments,
    ignores both, and raises ExitNow.
    """
    raise ExitNow()
00218
def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
    """
    Replacement for asyncore.loop(), adding timer and signal support.

    Runs until there are neither asyncore sockets nor pending timers
    left, or until ExitNow is raised (which is swallowed here).  While
    the loop runs, every signal listed in catch_signals is routed to a
    handler that raises ExitNow; the previous handlers are put back on
    the way out, however the loop ends.
    """
    old_signal_handlers = {}
    try:
        for sig in catch_signals:
            old_signal_handlers[sig] = signal.signal(sig, _raiseExitNow)
        while asyncore.socket_map or timer.queue:
            # Sleep in poll() no longer than the next timer allows, then
            # run whatever timers have come due.
            asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map)
            timer.runq()
    except ExitNow:
        pass
    finally:
        for sig, old_handler in old_signal_handlers.items():
            # signal.signal() reports None for a previous handler that was
            # not installed from Python; None is not an acceptable handler
            # value, so there is nothing we can restore in that case.
            if old_handler is not None:
                signal.signal(sig, old_handler)
00235
class sync_wrapper(object):
    """
    Synchronous wrapper around asynchronous functions.  Running in
    asynchronous mode at all times makes sense for event-driven daemons,
    but is kind of tedious for simple scripts, hence this wrapper.

    The wrapped function should take at least two arguments: a callback
    function and an errback function.  If any arguments are passed to
    the wrapper, they will be passed as additional arguments to the
    wrapped function.

    Calling the wrapper runs event_loop() until the wrapped function
    invokes its callback or errback, then returns the callback's value
    or raises the errback's exception.
    """

    # Outcome of the most recent call: value handed to cb(), or
    # exception handed to eb().
    res = None
    err = None

    def __init__(self, func):
        self.func = func

    def cb(self, res = None):
        """
        Success callback: record the result and break out of the event loop.
        """
        self.res = res
        raise ExitNow

    def eb(self, err):
        """
        Error callback: record the exception and break out of the event loop.
        """
        self.err = err
        raise ExitNow

    def __call__(self, *args, **kwargs):
        # Forget the outcome of any earlier call through this wrapper, so
        # that a stale error or result cannot leak into this one.
        self.res = None
        self.err = None
        # Start the wrapped function from a timer that expires at once, so
        # that it runs inside the event loop; the timer's errback catches
        # exceptions raised by the function itself.
        timer(lambda: self.func(self.cb, self.eb, *args, **kwargs), self.eb).set(None)
        event_loop()
        if self.err is not None:
            raise self.err
        return self.res
00268
def exit_event_loop():
    """
    Force exit from event_loop().

    Works by raising ExitNow, so it never returns to its caller.
    """
    raise ExitNow()