RPKI Engine 1.0

async.py (3793)

Go to the documentation of this file.
00001 """
00002 Utilities for event-driven programming.
00003 
00004 $Id: async.py 3793 2011-04-27 04:34:52Z sra $
00005 
00006 Copyright (C) 2009--2010  Internet Systems Consortium ("ISC")
00007 
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011 
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019 """
00020 
00021 import asyncore, signal, traceback, gc, sys
00022 import rpki.log, rpki.sundial
00023 
00024 ExitNow = asyncore.ExitNow
00025 
00026 class iterator(object):
00027   """
00028   Iteration construct for event-driven code.  Takes three
00029   arguments:
00030 
00031   - Some kind of iterable object
00032 
00033   - A callback to call on each item in the iteration
00034 
00035   - A callback to call after the iteration terminates.
00036 
00037   The item callback receives two arguments: the callable iterator
00038   object and the current value of the iteration.  It should call the
00039   iterator (or arrange for the iterator to be called) when it is time
00040   to continue to the next item in the iteration.
00041 
00042   The termination callback receives no arguments.
00043   """
00044 
00045   def __init__(self, iterable, item_callback, done_callback, unwind_stack = True):
00046     self.item_callback = item_callback
00047     self.done_callback = done_callback
00048     self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]
00049     self.unwind_stack = unwind_stack
00050     try:
00051       self.iterator = iter(iterable)
00052     except (ExitNow, SystemExit):
00053       raise
00054     except:
00055       rpki.log.debug("Problem constructing iterator for %r" % (iterable,))
00056       raise
00057     self.doit()
00058 
00059   def __repr__(self):
00060     return ("<%s created at %s:%s %s at 0x%x>" %
00061             (self.__class__.__name__,
00062              self.caller_file, self.caller_line, self.caller_function, id(self)))
00063 
00064   def __call__(self):
00065     if self.unwind_stack:
00066       defer(self.doit)
00067     else:
00068       self.doit()
00069 
00070   def doit(self):
00071     """
00072     Implement the iterator protocol: attempt to call the item handler
00073     with the next iteration value, call the termination handler if the
00074     iterator signaled StopIteration.
00075     """
00076     try:
00077       self.item_callback(self, self.iterator.next())
00078     except StopIteration:
00079       if self.done_callback is not None:
00080         self.done_callback()
00081 
00082 class timer(object):
00083   """
00084   Timer construct for event-driven code.  It can be used in either of two ways:
00085 
00086   - As a virtual class, in which case the subclass should provide a
00087     handler() method to receive the wakup event when the timer expires; or
00088 
00089   - By setting an explicit handler callback, either via the
00090     constructor or the set_handler() method.
00091 
00092   Subclassing is probably more Pythonic, but setting an explict
00093   handler turns out to be very convenient when combined with bound
00094   methods to other objects.
00095   """
00096 
00097   ## @var gc_debug
00098   # Verbose chatter about timers states and garbage collection.
00099   gc_debug = False
00100 
00101   ## @var run_debug
00102   # Verbose chatter about timers being run.
00103   run_debug = False
00104 
00105   ## @var queue
00106   # Timer queue, shared by all timer instances (there can be only one queue).
00107   queue = []
00108 
00109   def __init__(self, handler = None, errback = None):
00110     if handler is not None:
00111       self.set_handler(handler)
00112     if errback is not None:
00113       self.set_errback(errback)
00114     self.when = None
00115     if self.gc_debug:
00116       self.trace("Creating %r" % self)
00117 
00118   def trace(self, msg):
00119     """
00120     Debug logging.
00121     """
00122     if self.gc_debug:
00123       bt = traceback.extract_stack(limit = 3)
00124       rpki.log.debug("%s from %s:%d" % (msg, bt[0][0], bt[0][1]))
00125 
00126   def set(self, when):
00127     """
00128     Set a timer.  Argument can be a datetime, to specify an absolute
00129     time, or a timedelta, to specify an offset time.
00130     """
00131     if self.gc_debug:
00132       self.trace("Setting %r to %r" % (self, when))
00133     if isinstance(when, rpki.sundial.timedelta):
00134       self.when = rpki.sundial.now() + when
00135     else:
00136       self.when = when
00137     assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when)
00138     if self not in self.queue:
00139       self.queue.append(self)
00140     self.queue.sort(key = lambda x: x.when)
00141 
00142   def __cmp__(self, other):
00143     return cmp(id(self), id(other))
00144 
00145   if gc_debug:
00146     def __del__(self):
00147       rpki.log.debug("Deleting %r" % self)
00148 
00149   def cancel(self):
00150     """
00151     Cancel a timer, if it was set.
00152     """
00153     if self.gc_debug:
00154       self.trace("Canceling %r" % self)
00155     try:
00156       while True:
00157         self.queue.remove(self)
00158     except ValueError:
00159       pass
00160 
00161   def is_set(self):
00162     """
00163     Test whether this timer is currently set.
00164     """
00165     return self in self.queue
00166 
00167   def handler(self):
00168     """
00169     Handle a timer that has expired.  This must either be overriden by
00170     a subclass or set dynamically by set_handler().
00171     """
00172     raise NotImplementedError
00173 
00174   def set_handler(self, handler):
00175     """
00176     Set timer's expiration handler.  This is an alternative to
00177     subclassing the timer class, and may be easier to use when
00178     integrating timers into other classes (eg, the handler can be a
00179     bound method to an object in a class representing a network
00180     connection).
00181     """
00182     self.handler = handler
00183 
00184   def errback(self, e):
00185     """
00186     Error callback.  May be overridden, or set with set_errback().
00187     """
00188     rpki.log.error("Unhandled exception from timer: %s" % e)
00189     rpki.log.traceback()
00190 
00191   def set_errback(self, errback):
00192     """
00193     Set a timer's errback.  Like set_handler(), for errbacks.
00194     """
00195     self.errback = errback
00196 
00197   @classmethod
00198   def runq(cls):
00199     """
00200     Run the timer queue: for each timer whose call time has passed,
00201     pull the timer off the queue and call its handler() method.
00202     """
00203     while cls.queue and rpki.sundial.now() >= cls.queue[0].when:
00204       t = cls.queue.pop(0)
00205       if cls.run_debug:
00206         rpki.log.debug("Running %r" % t)
00207       try:
00208         t.handler()
00209       except (ExitNow, SystemExit):
00210         raise
00211       except Exception, e:
00212         t.errback(e)
00213 
00214   def __repr__(self):
00215     return rpki.log.log_repr(self, self.when, repr(self.handler))
00216 
00217   @classmethod
00218   def seconds_until_wakeup(cls):
00219     """
00220     Calculate delay until next timer expires, or None if no timers are
00221     set and we should wait indefinitely.  Rounds up to avoid spinning
00222     in select() or poll().  We could calculate fractional seconds in
00223     the right units instead, but select() and poll() don't even take
00224     the same units (argh!), and we're not doing anything that
00225     hair-triggered, so rounding up is simplest.
00226     """
00227     if not cls.queue:
00228       return None
00229     now = rpki.sundial.now()
00230     if now >= cls.queue[0].when:
00231       return 0
00232     delay = cls.queue[0].when - now
00233     seconds = delay.convert_to_seconds()
00234     if delay.microseconds:
00235       seconds += 1
00236     return seconds
00237 
00238   @classmethod
00239   def clear(cls):
00240     """
00241     Cancel every timer on the queue.  We could just throw away the
00242     queue content, but this way we can notify subclasses that provide
00243     their own cancel() method.
00244     """
00245     while cls.queue:
00246       cls.queue.pop(0).cancel()
00247 
00248 ## @var deferred_queue
00249 # List to hold deferred actions.  We used to do this with the timer
00250 # queue, but that appears to confuse the garbage collector, and is
00251 # overengineering for simple deferred actions in any case.
00252 
00253 deferred_queue = []
00254 
00255 def defer(thunk):
00256   """
00257   Defer an action until the next pass through the event loop.
00258   """
00259   deferred_queue.append(thunk)
00260 
00261 def run_deferred():
00262   """
00263   Run deferred actions.
00264   """
00265   while deferred_queue:
00266     try:
00267       deferred_queue.pop(0)()
00268     except (ExitNow, SystemExit):
00269       raise
00270     except Exception, e:
00271       rpki.log.error("Unhandled exception from deferred action: %s" % e)
00272       rpki.log.traceback()
00273 
00274 def _raiseExitNow(signum, frame):
00275   """
00276   Signal handler for event_loop().
00277   """
00278   raise ExitNow
00279 
00280 def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
00281   """
00282   Replacement for asyncore.loop(), adding timer and signal support.
00283   """
00284   while True:
00285     old_signal_handlers = {}
00286     try:
00287       for sig in catch_signals:
00288         old_signal_handlers[sig] = signal.signal(sig, _raiseExitNow)
00289       while asyncore.socket_map or deferred_queue or timer.queue:
00290         run_deferred()
00291         asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map)
00292         run_deferred()
00293         timer.runq()
00294         if timer.gc_debug:
00295           gc.collect()
00296           if gc.garbage:
00297             for i in gc.garbage:
00298               rpki.log.debug("GC-cycle %r" % i)
00299             del gc.garbage[:]
00300     except ExitNow:
00301       break
00302     except SystemExit:
00303       raise
00304     except Exception, e:
00305       rpki.log.error("event_loop() exited with exception %r, this is not supposed to happen, restarting" % e)
00306     else:
00307       break
00308     finally:
00309       for sig in old_signal_handlers:
00310         signal.signal(sig, old_signal_handlers[sig])
00311 
00312 class sync_wrapper(object):
00313   """
00314   Synchronous wrapper around asynchronous functions.  Running in
00315   asynchronous mode at all times makes sense for event-driven daemons,
00316   but is kind of tedious for simple scripts, hence this wrapper.
00317 
00318   The wrapped function should take at least two arguments: a callback
00319   function and an errback function.  If any arguments are passed to
00320   the wrapper, they will be passed as additional arguments to the
00321   wrapped function.
00322   """
00323 
00324   res = None
00325   err = None
00326 
00327   def __init__(self, func):
00328     self.func = func
00329 
00330   def cb(self, res = None):
00331     """
00332     Wrapped code has requested normal termination.  Store result, and
00333     exit the event loop.
00334     """
00335     self.res = res
00336     raise ExitNow
00337 
00338   def eb(self, err):
00339     """
00340     Wrapped code raised an exception.  Store exception data, then exit
00341     the event loop.
00342     """
00343     exc_info = sys.exc_info()
00344     self.err = exc_info if exc_info[1] is err else err
00345     raise ExitNow
00346 
00347   def __call__(self, *args, **kwargs):
00348 
00349     def thunk():
00350       """
00351       Deferred action to call the wrapped code once event system is
00352       running.
00353       """
00354       try:
00355         self.func(self.cb, self.eb, *args, **kwargs)
00356       except ExitNow:
00357         raise
00358       except Exception, e:
00359         self.eb(e)
00360       
00361     defer(thunk)
00362     event_loop()
00363     if self.err is not None:
00364       if isinstance(self.err, tuple):
00365         raise self.err[0], self.err[1], self.err[2]
00366       else:
00367         raise self.err
00368     else:
00369       return self.res
00370 
00371 def exit_event_loop():
00372   """
00373   Force exit from event_loop().
00374   """
00375   raise ExitNow
00376 
00377 class gc_summary(object):
00378   """
00379   Periodic summary of GC state, for tracking down memory bloat.
00380   """
00381 
00382   def __init__(self, interval, threshold = 0):
00383     if isinstance(interval, (int, long)):
00384       interval = rpki.sundial.timedelta(seconds = interval)
00385     self.interval = interval
00386     self.threshold = threshold
00387     self.timer = timer(handler = self.handler)
00388     self.timer.set(self.interval)
00389 
00390   def handler(self):
00391     """
00392     Collect and log GC state for this period, reset timer.
00393     """
00394     rpki.log.debug("gc_summary: Running gc.collect()")
00395     gc.collect()
00396     rpki.log.debug("gc_summary: Summarizing (threshold %d)" % self.threshold)
00397     total = {}
00398     tuples = {}
00399     for g in gc.get_objects():
00400       k = type(g).__name__
00401       total[k] = total.get(k, 0) + 1
00402       if isinstance(g, tuple):
00403         k = ", ".join(type(x).__name__ for x in g)
00404         tuples[k] = tuples.get(k, 0) + 1
00405     rpki.log.debug("gc_summary: Sorting result")
00406     total = total.items()
00407     total.sort(reverse = True, key = lambda x: x[1])
00408     tuples = tuples.items()
00409     tuples.sort(reverse = True, key = lambda x: x[1])
00410     rpki.log.debug("gc_summary: Object type counts in descending order")
00411     for name, count in total:
00412       if count > self.threshold:
00413         rpki.log.debug("gc_summary: %8d %s" % (count, name))
00414     rpki.log.debug("gc_summary: Tuple content type signature counts in descending order")
00415     for types, count in tuples:
00416       if count > self.threshold:
00417         rpki.log.debug("gc_summary: %8d (%s)" % (count, types))
00418     rpki.log.debug("gc_summary: Scheduling next cycle")
00419     self.timer.set(self.interval)
 All Classes Namespaces Files Functions Variables