00001 """
00002 Utilities for event-driven programming.
00003
00004 $Id: async.py 3449 2010-09-16 21:30:30Z sra $
00005
00006 Copyright (C) 2009--2010 Internet Systems Consortium ("ISC")
00007
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019 """
00020
00021 import asyncore, signal, traceback, gc, sys
00022 import rpki.log, rpki.sundial
00023
00024 ExitNow = asyncore.ExitNow
00025
00026 class iterator(object):
00027 """
00028 Iteration construct for event-driven code. Takes three
00029 arguments:
00030
00031 - Some kind of iterable object
00032
00033 - A callback to call on each item in the iteration
00034
00035 - A callback to call after the iteration terminates.
00036
00037 The item callback receives two arguments: the callable iterator
00038 object and the current value of the iteration. It should call the
00039 iterator (or arrange for the iterator to be called) when it is time
00040 to continue to the next item in the iteration.
00041
00042 The termination callback receives no arguments.
00043 """
00044
00045 def __init__(self, iterable, item_callback, done_callback, unwind_stack = True):
00046 self.item_callback = item_callback
00047 self.done_callback = done_callback
00048 self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]
00049 self.unwind_stack = unwind_stack
00050 try:
00051 self.iterator = iter(iterable)
00052 except (ExitNow, SystemExit):
00053 raise
00054 except:
00055 rpki.log.debug("Problem constructing iterator for %r" % (iterable,))
00056 raise
00057 self.doit()
00058
00059 def __repr__(self):
00060 return ("<%s created at %s:%s %s at 0x%x>" %
00061 (self.__class__.__name__,
00062 self.caller_file, self.caller_line, self.caller_function, id(self)))
00063
00064 def __call__(self):
00065 if self.unwind_stack:
00066 defer(self.doit)
00067 else:
00068 self.doit()
00069
00070 def doit(self):
00071 """
00072 Implement the iterator protocol: attempt to call the item handler
00073 with the next iteration value, call the termination handler if the
00074 iterator signaled StopIteration.
00075 """
00076 try:
00077 self.item_callback(self, self.iterator.next())
00078 except StopIteration:
00079 if self.done_callback is not None:
00080 self.done_callback()
00081
00082 class timer(object):
00083 """
00084 Timer construct for event-driven code. It can be used in either of two ways:
00085
00086 - As a virtual class, in which case the subclass should provide a
00087 handler() method to receive the wakup event when the timer expires; or
00088
00089 - By setting an explicit handler callback, either via the
00090 constructor or the set_handler() method.
00091
00092 Subclassing is probably more Pythonic, but setting an explict
00093 handler turns out to be very convenient when combined with bound
00094 methods to other objects.
00095 """
00096
00097
00098
00099 gc_debug = False
00100
00101
00102
00103 run_debug = False
00104
00105
00106
00107 queue = []
00108
00109 def __init__(self, handler = None, errback = None):
00110 if handler is not None:
00111 self.set_handler(handler)
00112 if errback is not None:
00113 self.set_errback(errback)
00114 self.when = None
00115 if self.gc_debug:
00116 self.trace("Creating %r" % self)
00117
00118 def trace(self, msg):
00119 """
00120 Debug logging.
00121 """
00122 if self.gc_debug:
00123 bt = traceback.extract_stack(limit = 3)
00124 rpki.log.debug("%s from %s:%d" % (msg, bt[0][0], bt[0][1]))
00125
00126 def set(self, when):
00127 """
00128 Set a timer. Argument can be a datetime, to specify an absolute
00129 time, or a timedelta, to specify an offset time.
00130 """
00131 if self.gc_debug:
00132 self.trace("Setting %r to %r" % (self, when))
00133 if isinstance(when, rpki.sundial.timedelta):
00134 self.when = rpki.sundial.now() + when
00135 else:
00136 self.when = when
00137 assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when)
00138 if self not in self.queue:
00139 self.queue.append(self)
00140 self.queue.sort(key = lambda x: x.when)
00141
00142 def __cmp__(self, other):
00143 return cmp(id(self), id(other))
00144
00145 if gc_debug:
00146 def __del__(self):
00147 rpki.log.debug("Deleting %r" % self)
00148
00149 def cancel(self):
00150 """
00151 Cancel a timer, if it was set.
00152 """
00153 if self.gc_debug:
00154 self.trace("Canceling %r" % self)
00155 try:
00156 while True:
00157 self.queue.remove(self)
00158 except ValueError:
00159 pass
00160
00161 def is_set(self):
00162 """Test whether this timer is currently set."""
00163 return self in self.queue
00164
00165 def handler(self):
00166 """
00167 Handle a timer that has expired. This must either be overriden by
00168 a subclass or set dynamically by set_handler().
00169 """
00170 raise NotImplementedError
00171
00172 def set_handler(self, handler):
00173 """
00174 Set timer's expiration handler. This is an alternative to
00175 subclassing the timer class, and may be easier to use when
00176 integrating timers into other classes (eg, the handler can be a
00177 bound method to an object in a class representing a network
00178 connection).
00179 """
00180 self.handler = handler
00181
00182 def errback(self, e):
00183 """
00184 Error callback. May be overridden, or set with set_errback().
00185 """
00186 rpki.log.error("Unhandled exception from timer: %s" % e)
00187 rpki.log.traceback()
00188
00189 def set_errback(self, errback):
00190 """Set a timer's errback. Like set_handler(), for errbacks."""
00191 self.errback = errback
00192
00193 @classmethod
00194 def runq(cls):
00195 """
00196 Run the timer queue: for each timer whose call time has passed,
00197 pull the timer off the queue and call its handler() method.
00198 """
00199 while cls.queue and rpki.sundial.now() >= cls.queue[0].when:
00200 t = cls.queue.pop(0)
00201 if cls.run_debug:
00202 rpki.log.debug("Running %r" % t)
00203 try:
00204 t.handler()
00205 except (ExitNow, SystemExit):
00206 raise
00207 except Exception, e:
00208 t.errback(e)
00209
00210 def __repr__(self):
00211 return "<%s %r %r at 0x%x>" % (self.__class__.__name__, self.when, self.handler, id(self))
00212
00213 @classmethod
00214 def seconds_until_wakeup(cls):
00215 """
00216 Calculate delay until next timer expires, or None if no timers are
00217 set and we should wait indefinitely. Rounds up to avoid spinning
00218 in select() or poll(). We could calculate fractional seconds in
00219 the right units instead, but select() and poll() don't even take
00220 the same units (argh!), and we're not doing anything that
00221 hair-triggered, so rounding up is simplest.
00222 """
00223 if not cls.queue:
00224 return None
00225 now = rpki.sundial.now()
00226 if now >= cls.queue[0].when:
00227 return 0
00228 delay = cls.queue[0].when - now
00229 seconds = delay.convert_to_seconds()
00230 if delay.microseconds:
00231 seconds += 1
00232 return seconds
00233
00234 @classmethod
00235 def clear(cls):
00236 """
00237 Cancel every timer on the queue. We could just throw away the
00238 queue content, but this way we can notify subclasses that provide
00239 their own cancel() method.
00240 """
00241 while cls.queue:
00242 cls.queue.pop(0).cancel()
00243
00244
00245
00246
00247
00248
00249 deferred_queue = []
00250
00251 def defer(thunk):
00252 """
00253 Defer an action until the next pass through the event loop.
00254 """
00255 deferred_queue.append(thunk)
00256
00257 def run_deferred():
00258 """
00259 Run deferred actions.
00260 """
00261 while deferred_queue:
00262 try:
00263 deferred_queue.pop(0)()
00264 except (ExitNow, SystemExit):
00265 raise
00266 except Exception, e:
00267 rpki.log.error("Unhandled exception from deferred action: %s" % e)
00268 rpki.log.traceback()
00269
00270 def _raiseExitNow(signum, frame):
00271 """Signal handler for event_loop()."""
00272 raise ExitNow
00273
00274 def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
00275 """
00276 Replacement for asyncore.loop(), adding timer and signal support.
00277 """
00278 while True:
00279 old_signal_handlers = {}
00280 try:
00281 for sig in catch_signals:
00282 old_signal_handlers[sig] = signal.signal(sig, _raiseExitNow)
00283 while asyncore.socket_map or deferred_queue or timer.queue:
00284 run_deferred()
00285 asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map)
00286 run_deferred()
00287 timer.runq()
00288 if timer.gc_debug:
00289 gc.collect()
00290 if gc.garbage:
00291 for i in gc.garbage:
00292 rpki.log.debug("GC-cycle %r" % i)
00293 del gc.garbage[:]
00294 except ExitNow:
00295 break
00296 except SystemExit:
00297 raise
00298 except Exception, e:
00299 rpki.log.error("event_loop() exited with exception %r, this is not supposed to happen, restarting" % e)
00300 else:
00301 break
00302 finally:
00303 for sig in old_signal_handlers:
00304 signal.signal(sig, old_signal_handlers[sig])
00305
00306 class sync_wrapper(object):
00307 """
00308 Synchronous wrapper around asynchronous functions. Running in
00309 asynchronous mode at all times makes sense for event-driven daemons,
00310 but is kind of tedious for simple scripts, hence this wrapper.
00311
00312 The wrapped function should take at least two arguments: a callback
00313 function and an errback function. If any arguments are passed to
00314 the wrapper, they will be passed as additional arguments to the
00315 wrapped function.
00316 """
00317
00318 res = None
00319 err = None
00320
00321 def __init__(self, func):
00322 self.func = func
00323
00324 def cb(self, res = None):
00325 """
00326 Wrapped code has requested normal termination. Store result, and
00327 exit the event loop.
00328 """
00329 self.res = res
00330 raise ExitNow
00331
00332 def eb(self, err):
00333 """
00334 Wrapped code raised an exception. Store exception data, then exit
00335 the event loop.
00336 """
00337 exc_info = sys.exc_info()
00338 self.err = exc_info if exc_info[1] is err else err
00339 raise ExitNow
00340
00341 def __call__(self, *args, **kwargs):
00342
00343 def thunk():
00344 """
00345 Deferred action to call the wrapped code once event system is
00346 running.
00347 """
00348 try:
00349 self.func(self.cb, self.eb, *args, **kwargs)
00350 except ExitNow:
00351 raise
00352 except Exception, e:
00353 self.eb(e)
00354
00355 defer(thunk)
00356 event_loop()
00357 if self.err is not None:
00358 if isinstance(self.err, tuple):
00359 raise self.err[0], self.err[1], self.err[2]
00360 else:
00361 raise self.err
00362 else:
00363 return self.res
00364
00365 def exit_event_loop():
00366 """Force exit from event_loop()."""
00367 raise ExitNow
00368
00369 class gc_summary(object):
00370 """
00371 Periodic summary of GC state, for tracking down memory bloat.
00372 """
00373
00374 def __init__(self, interval, threshold = 0):
00375 if isinstance(interval, (int, long)):
00376 interval = rpki.sundial.timedelta(seconds = interval)
00377 self.interval = interval
00378 self.threshold = threshold
00379 self.timer = timer(handler = self.handler)
00380 self.timer.set(self.interval)
00381
00382 def handler(self):
00383 """
00384 Collect and log GC state for this period, reset timer.
00385 """
00386 rpki.log.debug("gc_summary: Running gc.collect()")
00387 gc.collect()
00388 rpki.log.debug("gc_summary: Summarizing (threshold %d)" % self.threshold)
00389 total = {}
00390 tuples = {}
00391 for g in gc.get_objects():
00392 k = type(g).__name__
00393 total[k] = total.get(k, 0) + 1
00394 if isinstance(g, tuple):
00395 k = ", ".join(type(x).__name__ for x in g)
00396 tuples[k] = tuples.get(k, 0) + 1
00397 rpki.log.debug("gc_summary: Sorting result")
00398 total = total.items()
00399 total.sort(reverse = True, key = lambda x: x[1])
00400 tuples = tuples.items()
00401 tuples.sort(reverse = True, key = lambda x: x[1])
00402 rpki.log.debug("gc_summary: Object type counts in descending order")
00403 for name, count in total:
00404 if count > self.threshold:
00405 rpki.log.debug("gc_summary: %8d %s" % (count, name))
00406 rpki.log.debug("gc_summary: Tuple content type signature counts in descending order")
00407 for types, count in tuples:
00408 if count > self.threshold:
00409 rpki.log.debug("gc_summary: %8d (%s)" % (count, types))
00410 rpki.log.debug("gc_summary: Scheduling next cycle")
00411 self.timer.set(self.interval)