00001 """
00002 Utilities for event-driven programming.
00003
00004 $Id: async.py 2926 2010-01-04 22:28:34Z sra $
00005
00006 Copyright (C) 2009 Internet Systems Consortium ("ISC")
00007
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019 """
00020
00021 import asyncore, signal, traceback, gc
00022 import rpki.log, rpki.sundial
00023
00024 ExitNow = asyncore.ExitNow
00025
00026 class iterator(object):
00027 """
00028 Iteration construct for event-driven code. Takes three
00029 arguments:
00030
00031 - Some kind of iterable object
00032
00033 - A callback to call on each item in the iteration
00034
00035 - A callback to call after the iteration terminates.
00036
00037 The item callback receives two arguments: the callable iterator
00038 object and the current value of the iteration. It should call the
00039 iterator (or arrange for the iterator to be called) when it is time
00040 to continue to the next item in the iteration.
00041
00042 The termination callback receives no arguments.
00043 """
00044
00045 def __init__(self, iterable, item_callback, done_callback, unwind_stack = True):
00046 self.item_callback = item_callback
00047 self.done_callback = done_callback
00048 self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]
00049 self.unwind_stack = unwind_stack
00050 try:
00051 self.iterator = iter(iterable)
00052 except (ExitNow, SystemExit):
00053 raise
00054 except:
00055 rpki.log.debug("Problem constructing iterator for %r" % (iterable,))
00056 raise
00057 self.doit()
00058
00059 def __repr__(self):
00060 return ("<%s created at %s:%s %s at 0x%x>" %
00061 (self.__class__.__name__,
00062 self.caller_file, self.caller_line, self.caller_function, id(self)))
00063
00064 def __call__(self):
00065 if self.unwind_stack:
00066 defer(self.doit)
00067 else:
00068 self.doit()
00069
00070 def doit(self):
00071 try:
00072 self.item_callback(self, self.iterator.next())
00073 except StopIteration:
00074 if self.done_callback is not None:
00075 self.done_callback()
00076
00077 def ignore(self, ignored):
00078 self()
00079
00080 class timer(object):
00081 """
00082 Timer construct for event-driven code. It can be used in either of two ways:
00083
00084 - As a virtual class, in which case the subclass should provide a
00085 handler() method to receive the wakup event when the timer expires; or
00086
00087 - By setting an explicit handler callback, either via the
00088 constructor or the set_handler() method.
00089
00090 Subclassing is probably more Pythonic, but setting an explict
00091 handler turns out to be very convenient when combined with bound
00092 methods to other objects.
00093 """
00094
00095
00096
00097 gc_debug = False
00098
00099
00100
00101 run_debug = False
00102
00103
00104
00105 queue = []
00106
00107 def __init__(self, handler = None, errback = None):
00108 if handler is not None:
00109 self.set_handler(handler)
00110 if errback is not None:
00111 self.set_errback(errback)
00112 self.when = None
00113 self.trace("Creating %r" % self)
00114
00115 def trace(self, msg):
00116 """
00117 Debug logging.
00118 """
00119 if self.gc_debug:
00120 bt = traceback.extract_stack(limit = 3)
00121 rpki.log.debug("%s from %s:%d" % (msg, bt[0][0], bt[0][1]))
00122
00123 def set(self, when):
00124 """
00125 Set a timer. Argument can be a datetime, to specify an absolute
00126 time, or a timedelta, to specify an offset time.
00127 """
00128 self.trace("Setting %r to %r" % (self, when))
00129 if isinstance(when, rpki.sundial.timedelta):
00130 self.when = rpki.sundial.now() + when
00131 else:
00132 self.when = when
00133 assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when)
00134 if self not in self.queue:
00135 self.queue.append(self)
00136 self.queue.sort()
00137
00138 def __cmp__(self, other):
00139 return cmp(self.when, other.when)
00140
00141 if gc_debug:
00142 def __del__(self):
00143 rpki.log.debug("Deleting %r" % self)
00144
00145 def cancel(self):
00146 """
00147 Cancel a timer, if it was set.
00148 """
00149 self.trace("Canceling %r" % self)
00150 try:
00151 self.queue.remove(self)
00152 except ValueError:
00153 pass
00154
00155 def is_set(self):
00156 """Test whether this timer is currently set."""
00157 return self in self.queue
00158
00159 def handler(self):
00160 """
00161 Handle a timer that has expired. This must either be overriden by
00162 a subclass or set dynamically by set_handler().
00163 """
00164 raise NotImplementedError
00165
00166 def set_handler(self, handler):
00167 """
00168 Set timer's expiration handler. This is an alternative to
00169 subclassing the timer class, and may be easier to use when
00170 integrating timers into other classes (eg, the handler can be a
00171 bound method to an object in a class representing a network
00172 connection).
00173 """
00174 self.handler = handler
00175
00176 def errback(self, e):
00177 """
00178 Error callback. May be overridden, or set with set_errback().
00179 """
00180 rpki.log.error("Unhandled exception from timer: %s" % e)
00181 rpki.log.traceback()
00182
00183 def set_errback(self, errback):
00184 """Set a timer's errback. Like set_handler(), for errbacks."""
00185 self.errback = errback
00186
00187 @classmethod
00188 def runq(cls):
00189 """
00190 Run the timer queue: for each timer whose call time has passed,
00191 pull the timer off the queue and call its handler() method.
00192 """
00193 while cls.queue and rpki.sundial.now() >= cls.queue[0].when:
00194 t = cls.queue.pop(0)
00195 if cls.run_debug:
00196 rpki.log.debug("Running %r" % t)
00197 try:
00198 t.handler()
00199 except (ExitNow, SystemExit):
00200 raise
00201 except Exception, e:
00202 t.errback(e)
00203
00204 def __repr__(self):
00205 return "<%s %r %r at 0x%x>" % (self.__class__.__name__, self.when, self.handler, id(self))
00206
00207 @classmethod
00208 def seconds_until_wakeup(cls):
00209 """
00210 Calculate delay until next timer expires, or None if no timers are
00211 set and we should wait indefinitely. Rounds up to avoid spinning
00212 in select() or poll(). We could calculate fractional seconds in
00213 the right units instead, but select() and poll() don't even take
00214 the same units (argh!), and we're not doing anything that
00215 hair-triggered, so rounding up is simplest.
00216 """
00217 if not cls.queue:
00218 return None
00219 now = rpki.sundial.now()
00220 if now >= cls.queue[0].when:
00221 return 0
00222 delay = cls.queue[0].when - now
00223 seconds = delay.convert_to_seconds()
00224 if delay.microseconds:
00225 seconds += 1
00226 return seconds
00227
00228 @classmethod
00229 def clear(cls):
00230 """
00231 Cancel every timer on the queue. We could just throw away the
00232 queue content, but this way we can notify subclasses that provide
00233 their own cancel() method.
00234 """
00235 while cls.queue:
00236 cls.queue.pop(0).cancel()
00237
00238
00239
00240
00241
00242
00243 deferred_queue = []
00244
00245 def defer(thunk):
00246 """
00247 Defer an action until the next pass through the event loop.
00248 """
00249 deferred_queue.append(thunk)
00250
00251 def run_deferred():
00252 """
00253 Run deferred actions.
00254 """
00255 while deferred_queue:
00256 try:
00257 deferred_queue.pop(0)()
00258 except (ExitNow, SystemExit):
00259 raise
00260 except Exception, e:
00261 rpki.log.error("Unhandled exception from deferred action: %s" % e)
00262 rpki.log.traceback()
00263
00264 def _raiseExitNow(signum, frame):
00265 """Signal handler for event_loop()."""
00266 raise ExitNow
00267
00268 def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
00269 """
00270 Replacement for asyncore.loop(), adding timer and signal support.
00271 """
00272 while True:
00273 old_signal_handlers = {}
00274 try:
00275 for sig in catch_signals:
00276 old_signal_handlers[sig] = signal.signal(sig, _raiseExitNow)
00277 while asyncore.socket_map or deferred_queue or timer.queue:
00278 run_deferred()
00279 asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map)
00280 run_deferred()
00281 timer.runq()
00282 if timer.gc_debug:
00283 gc.collect()
00284 if gc.garbage:
00285 for i in gc.garbage:
00286 rpki.log.debug("GC-cycle %r" % i)
00287 del gc.garbage[:]
00288 except ExitNow:
00289 break
00290 except SystemExit:
00291 raise
00292 except Exception, e:
00293 rpki.log.error("event_loop() exited with exception %r, this is not supposed to happen, restarting" % e)
00294 else:
00295 break
00296 finally:
00297 for sig in old_signal_handlers:
00298 signal.signal(sig, old_signal_handlers[sig])
00299
00300 class sync_wrapper(object):
00301 """
00302 Synchronous wrapper around asynchronous functions. Running in
00303 asynchronous mode at all times makes sense for event-driven daemons,
00304 but is kind of tedious for simple scripts, hence this wrapper.
00305
00306 The wrapped function should take at least two arguments: a callback
00307 function and an errback function. If any arguments are passed to
00308 the wrapper, they will be passed as additional arguments to the
00309 wrapped function.
00310 """
00311
00312 res = None
00313 err = None
00314
00315 def __init__(self, func):
00316 self.func = func
00317
00318 def cb(self, res = None):
00319 self.res = res
00320 raise ExitNow
00321
00322 def eb(self, err):
00323 self.err = err
00324 raise ExitNow
00325
00326 def __call__(self, *args, **kwargs):
00327
00328 def thunk():
00329 try:
00330 self.func(self.cb, self.eb, *args, **kwargs)
00331 except ExitNow:
00332 raise
00333 except Exception, e:
00334 self.eb(e)
00335
00336 defer(thunk)
00337 event_loop()
00338 if self.err is not None:
00339 raise self.err
00340 else:
00341 return self.res
00342
00343 def exit_event_loop():
00344 """Force exit from event_loop()."""
00345 raise ExitNow
00346
00347 class gc_summary(object):
00348 """
00349 Periodic summary of GC state, for tracking down memory bloat.
00350 """
00351
00352 def __init__(self, interval):
00353 if isinstance(interval, (int, long)):
00354 interval = rpki.sundial.timedelta(seconds = interval)
00355 self.interval = interval
00356 self.timer = timer(handler = self.handler)
00357 self.timer.set(self.interval)
00358
00359 def handler(self):
00360 rpki.log.debug("gc_summary: Running gc.collect()")
00361 gc.collect()
00362 rpki.log.debug("gc_summary: Summarizing")
00363 total = {}
00364 for g in gc.get_objects():
00365 t = type(g).__name__
00366 if t in total:
00367 total[t] += 1
00368 else:
00369 total[t] = 1
00370 rpki.log.debug("gc_summary: Sorting result")
00371 total = total.items()
00372 total.sort(reverse = True, key = lambda x: x[1])
00373 rpki.log.debug("gc_summary: Object type counts in descending order")
00374 for name, count in total:
00375 rpki.log.debug("gc_summary: %8d %s" % (count, name))
00376 rpki.log.debug("gc_summary: Scheduling next cycle")
00377 self.timer.set(self.interval)