00001 """
00002 Utilities for event-driven programming.
00003
00004 $Id: async.py 3191 2010-04-12 23:07:16Z sra $
00005
00006 Copyright (C) 2009-2010 Internet Systems Consortium ("ISC")
00007
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019 """
00020
00021 import asyncore, signal, traceback, gc, sys
00022 import rpki.log, rpki.sundial
00023
00024 ExitNow = asyncore.ExitNow
00025
class iterator(object):
    """
    Iteration construct for event-driven code.  Takes three
    arguments:

    - Some kind of iterable object

    - A callback to call on each item in the iteration

    - A callback to call after the iteration terminates (may be None,
      in which case termination is silent).

    The item callback receives two arguments: the callable iterator
    object and the current value of the iteration.  It should call the
    iterator (or arrange for the iterator to be called) when it is time
    to continue to the next item in the iteration.

    The termination callback receives no arguments.

    The optional unwind_stack argument controls what calling the
    iterator object does: when true (the default) the next step is
    queued with defer(); when false the next step runs immediately, on
    top of the caller's stack.

    Note that the constructor itself runs the first step of the
    iteration before returning.
    """

    def __init__(self, iterable, item_callback, done_callback, unwind_stack = True):
        self.item_callback = item_callback
        self.done_callback = done_callback
        # Record where this iterator was created; only used by __repr__().
        self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]
        self.unwind_stack = unwind_stack
        try:
            self.iterator = iter(iterable)
        except (ExitNow, SystemExit):
            raise
        except:
            # Log what we were asked to iterate over, then let the
            # original exception continue on its way.
            rpki.log.debug("Problem constructing iterator for %r" % (iterable,))
            raise
        self.doit()

    def __repr__(self):
        return ("<%s created at %s:%s %s at 0x%x>" %
                (self.__class__.__name__,
                 self.caller_file, self.caller_line, self.caller_function, id(self)))

    def __call__(self):
        """
        Continue to the next item, either via the deferred action queue
        or directly, depending on unwind_stack.
        """
        if self.unwind_stack:
            defer(self.doit)
        else:
            self.doit()

    def doit(self):
        """
        Implement the iterator protocol: attempt to call the item handler
        with the next iteration value, call the termination handler if the
        iterator signaled StopIteration.
        """
        # Only the fetch of the next value is guarded.  A StopIteration
        # escaping from one of the callbacks is an error in that callback
        # and must not be mistaken for the end of this iteration (which,
        # with unwind_stack false, would also run the termination handler
        # once per nested doit() frame).
        try:
            value = self.iterator.next()
        except StopIteration:
            if self.done_callback is not None:
                self.done_callback()
        else:
            self.item_callback(self, value)
00081
class timer(object):
    """
    Timer construct for event-driven code.  It can be used in either of two ways:

    - As a virtual class, in which case the subclass should provide a
      handler() method to receive the wakeup event when the timer expires; or

    - By setting an explicit handler callback, either via the
      constructor or the set_handler() method.

    Subclassing is probably more Pythonic, but setting an explicit
    handler turns out to be very convenient when combined with bound
    methods to other objects.

    All timers share the class-level queue, which is kept sorted by
    expiration time.  Queue membership is always decided by object
    identity: two distinct timers that happen to expire at the same
    moment are still two separate queue entries.
    """

    # When true, log timer creation, setting, cancellation and deletion.
    gc_debug = False

    # When true, log each timer as runq() runs it.
    run_debug = False

    # Timers currently set, earliest expiration first.
    queue = []

    def __init__(self, handler = None, errback = None):
        if handler is not None:
            self.set_handler(handler)
        if errback is not None:
            self.set_errback(errback)
        self.when = None
        if self.gc_debug:
            self.trace("Creating %r" % self)

    def trace(self, msg):
        """
        Debug logging.
        """
        if self.gc_debug:
            bt = traceback.extract_stack(limit = 3)
            rpki.log.debug("%s from %s:%d" % (msg, bt[0][0], bt[0][1]))

    def _in_queue(self):
        """
        Identity-based queue membership test.  We cannot use the "in"
        operator here: it compares with ==, which __cmp__() defines in
        terms of expiration time, so it would confuse this timer with
        any other timer set for the same moment.
        """
        return any(t is self for t in self.queue)

    def set(self, when):
        """
        Set a timer.  Argument can be a datetime, to specify an absolute
        time, or a timedelta, to specify an offset time.
        """
        if self.gc_debug:
            self.trace("Setting %r to %r" % (self, when))
        if isinstance(when, rpki.sundial.timedelta):
            self.when = rpki.sundial.now() + when
        else:
            self.when = when
        assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when)
        if not self._in_queue():
            self.queue.append(self)
        # Always re-sort: the timer may already have been queued with a
        # different expiration time.
        self.queue.sort(key = lambda t: t.when)

    def __cmp__(self, other):
        """
        Order timers by expiration time.  Retained for callers that
        compare or sort timers directly; the queue handling in this
        class does not depend on it.
        """
        return cmp(self.when, other.when)

    if gc_debug:
        def __del__(self):
            rpki.log.debug("Deleting %r" % self)

    def cancel(self):
        """
        Cancel a timer, if it was set.  Does nothing if it was not.
        """
        if self.gc_debug:
            self.trace("Canceling %r" % self)
        # Remove by identity, so that we never remove some other timer
        # which merely expires at the same time as this one.
        for index, t in enumerate(self.queue):
            if t is self:
                del self.queue[index]
                break

    def is_set(self):
        """Test whether this timer is currently set."""
        return self._in_queue()

    def handler(self):
        """
        Handle a timer that has expired.  This must either be overridden by
        a subclass or set dynamically by set_handler().
        """
        raise NotImplementedError

    def set_handler(self, handler):
        """
        Set timer's expiration handler.  This is an alternative to
        subclassing the timer class, and may be easier to use when
        integrating timers into other classes (eg, the handler can be a
        bound method to an object in a class representing a network
        connection).
        """
        self.handler = handler

    def errback(self, e):
        """
        Error callback.  May be overridden, or set with set_errback().
        """
        rpki.log.error("Unhandled exception from timer: %s" % e)
        rpki.log.traceback()

    def set_errback(self, errback):
        """Set a timer's errback.  Like set_handler(), for errbacks."""
        self.errback = errback

    @classmethod
    def runq(cls):
        """
        Run the timer queue: for each timer whose call time has passed,
        pull the timer off the queue and call its handler() method.
        Exceptions from a handler (other than ExitNow and SystemExit,
        which propagate) are passed to that timer's errback().
        """
        while cls.queue and rpki.sundial.now() >= cls.queue[0].when:
            t = cls.queue.pop(0)
            if cls.run_debug:
                rpki.log.debug("Running %r" % t)
            try:
                t.handler()
            except (ExitNow, SystemExit):
                raise
            except Exception:
                # sys.exc_info() rather than an "except ..., e" binding,
                # so this parses under any Python version.
                t.errback(sys.exc_info()[1])

    def __repr__(self):
        return "<%s %r %r at 0x%x>" % (self.__class__.__name__, self.when, self.handler, id(self))

    @classmethod
    def seconds_until_wakeup(cls):
        """
        Calculate delay until next timer expires, or None if no timers are
        set and we should wait indefinitely.  Rounds up to avoid spinning
        in select() or poll().  We could calculate fractional seconds in
        the right units instead, but select() and poll() don't even take
        the same units (argh!), and we're not doing anything that
        hair-triggered, so rounding up is simplest.
        """
        if not cls.queue:
            return None
        now = rpki.sundial.now()
        if now >= cls.queue[0].when:
            return 0
        delay = cls.queue[0].when - now
        seconds = delay.convert_to_seconds()
        if delay.microseconds:
            seconds += 1
        return seconds

    @classmethod
    def clear(cls):
        """
        Cancel every timer on the queue.  We could just throw away the
        queue content, but this way we can notify subclasses that provide
        their own cancel() method.
        """
        while cls.queue:
            cls.queue.pop(0).cancel()
00242
00243
00244
00245
00246
00247
# Actions waiting for run_deferred(), oldest first.  Each entry is a
# callable which will be invoked with no arguments.
deferred_queue = []

def defer(thunk):
    """
    Queue thunk to be called, with no arguments, on the next pass
    through the event loop.
    """
    deferred_queue.append(thunk)
00255
def run_deferred():
    """
    Drain the deferred action queue, oldest action first, including
    any actions queued while the drain is in progress.

    ExitNow and SystemExit propagate to the caller; any other
    exception raised by an action is logged and the drain continues.
    """
    while deferred_queue:
        action = deferred_queue.pop(0)
        try:
            action()
        except (ExitNow, SystemExit):
            raise
        except Exception:
            rpki.log.error("Unhandled exception from deferred action: %s" % sys.exc_info()[1])
            rpki.log.traceback()
00268
def _raiseExitNow(signum, frame):
    """
    Signal handler installed by event_loop(): raise ExitNow, ignoring
    both handler arguments.
    """
    raise ExitNow()
00272
def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
    """
    Stand-in for asyncore.loop() which also services the deferred
    action queue, the timer queue, and signals.

    While running, each signal in catch_signals is handled by
    _raiseExitNow(); the previous handlers are put back on the way
    out.  The loop runs for as long as there are sockets, deferred
    actions or timers to service.  ExitNow ends the loop quietly,
    SystemExit propagates, and any other exception is logged and the
    loop is restarted.
    """
    finished = False
    while not finished:
        saved_handlers = {}
        try:
            # Install our handlers inside the try block, so that the
            # finally clause restores whatever we managed to replace.
            for signum in catch_signals:
                saved_handlers[signum] = signal.signal(signum, _raiseExitNow)
            while asyncore.socket_map or deferred_queue or timer.queue:
                run_deferred()
                asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map)
                run_deferred()
                timer.runq()
                if timer.gc_debug:
                    gc.collect()
                    if gc.garbage:
                        for leftover in gc.garbage:
                            rpki.log.debug("GC-cycle %r" % leftover)
                        del gc.garbage[:]
        except ExitNow:
            finished = True
        except SystemExit:
            raise
        except Exception:
            rpki.log.error("event_loop() exited with exception %r, this is not supposed to happen, restarting" % sys.exc_info()[1])
        else:
            # Nothing left to service.
            finished = True
        finally:
            for signum, previous in saved_handlers.items():
                signal.signal(signum, previous)
00304
00305 class sync_wrapper(object):
00306 """
00307 Synchronous wrapper around asynchronous functions. Running in
00308 asynchronous mode at all times makes sense for event-driven daemons,
00309 but is kind of tedious for simple scripts, hence this wrapper.
00310
00311 The wrapped function should take at least two arguments: a callback
00312 function and an errback function. If any arguments are passed to
00313 the wrapper, they will be passed as additional arguments to the
00314 wrapped function.
00315 """
00316
00317 res = None
00318 err = None
00319
00320 def __init__(self, func):
00321 self.func = func
00322
00323 def cb(self, res = None):
00324 """
00325 Wrapped code has requested normal termination. Store result, and
00326 exit the event loop.
00327 """
00328 self.res = res
00329 raise ExitNow
00330
00331 def eb(self, err):
00332 """
00333 Wrapped code raised an exception. Store exception data, then exit
00334 the event loop.
00335 """
00336 exc_info = sys.exc_info()
00337 self.err = exc_info if exc_info[1] is err else err
00338 raise ExitNow
00339
00340 def __call__(self, *args, **kwargs):
00341
00342 def thunk():
00343 """
00344 Deferred action to call the wrapped code once event system is
00345 running.
00346 """
00347 try:
00348 self.func(self.cb, self.eb, *args, **kwargs)
00349 except ExitNow:
00350 raise
00351 except Exception, e:
00352 self.eb(e)
00353
00354 defer(thunk)
00355 event_loop()
00356 if self.err is not None:
00357 if isinstance(self.err, tuple):
00358 raise self.err[0], self.err[1], self.err[2]
00359 else:
00360 raise self.err
00361 else:
00362 return self.res
00363
def exit_event_loop():
    """
    Force event_loop() to return, by raising ExitNow.
    """
    raise ExitNow()
00367
class gc_summary(object):
    """
    Periodically log a census of the objects tracked by the garbage
    collector, as an aid to tracking down memory bloat.
    """

    def __init__(self, interval, threshold = 0):
        # A bare integer interval is taken to be a number of seconds.
        if isinstance(interval, (int, long)):
            interval = rpki.sundial.timedelta(seconds = interval)
        self.interval = interval
        self.threshold = threshold
        self.timer = timer(handler = self.handler)
        self.timer.set(self.interval)

    def handler(self):
        """
        Timer handler: run a collection, log how many objects of each
        type (and tuples of each content signature) exist, skipping
        counts that do not exceed the threshold, then re-arm the timer.
        """
        rpki.log.debug("gc_summary: Running gc.collect()")
        gc.collect()
        rpki.log.debug("gc_summary: Summarizing (threshold %d)" % self.threshold)
        by_type = {}
        by_signature = {}
        for obj in gc.get_objects():
            type_name = type(obj).__name__
            by_type[type_name] = by_type.get(type_name, 0) + 1
            if isinstance(obj, tuple):
                # Tuples are additionally classified by the types of
                # their members.
                signature = ", ".join(type(member).__name__ for member in obj)
                by_signature[signature] = by_signature.get(signature, 0) + 1
        rpki.log.debug("gc_summary: Sorting result")
        count_of = lambda pair: pair[1]
        by_type = sorted(by_type.items(), key = count_of, reverse = True)
        by_signature = sorted(by_signature.items(), key = count_of, reverse = True)
        reports = (
            ("gc_summary: Object type counts in descending order",
             "gc_summary: %8d %s", by_type),
            ("gc_summary: Tuple content type signature counts in descending order",
             "gc_summary: %8d (%s)", by_signature))
        for heading, line_format, report in reports:
            rpki.log.debug(heading)
            for label, count in report:
                if count > self.threshold:
                    rpki.log.debug(line_format % (count, label))
        rpki.log.debug("gc_summary: Scheduling next cycle")
        self.timer.set(self.interval)