RPKI Engine  1.0
async.py (4014)
Go to the documentation of this file.
00001 """
00002 Utilities for event-driven programming.
00003 
00004 $Id: async.py 4014 2011-10-05 16:30:24Z sra $
00005 
00006 Copyright (C) 2009--2011  Internet Systems Consortium ("ISC")
00007 
00008 Permission to use, copy, modify, and distribute this software for any
00009 purpose with or without fee is hereby granted, provided that the above
00010 copyright notice and this permission notice appear in all copies.
00011 
00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
00014 AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
00018 PERFORMANCE OF THIS SOFTWARE.
00019 """
00020 
00021 import asyncore, signal, traceback, gc, sys
00022 import rpki.log, rpki.sundial
00023 
00024 ExitNow = asyncore.ExitNow
00025 
class iterator(object):
  """
  Iteration construct for event-driven code.  Takes three
  arguments:

  - Some kind of iterable object

  - A callback to call on each item in the iteration

  - A callback to call after the iteration terminates (may be None,
    in which case nothing is called at termination).

  The item callback receives two arguments: the callable iterator
  object and the current value of the iteration.  It should call the
  iterator (or arrange for the iterator to be called) when it is time
  to continue to the next item in the iteration.

  The termination callback receives no arguments.

  When unwind_stack is true (the default), calling the iterator object
  hands the next step to defer() instead of running it directly; when
  false, the next step runs immediately inside the call.

  Note that constructing the object runs the first step immediately.
  """

  def __init__(self, iterable, item_callback, done_callback, unwind_stack = True):
    self.item_callback = item_callback
    self.done_callback = done_callback
    # Record where this iterator was created; only used by __repr__().
    self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]
    self.unwind_stack = unwind_stack
    try:
      self.iterator = iter(iterable)
    except (ExitNow, SystemExit):
      raise
    except Exception:
      rpki.log.debug("Problem constructing iterator for %r" % (iterable,))
      raise
    self.doit()

  def __repr__(self):
    return ("<%s created at %s:%s %s at 0x%x>" %
            (self.__class__.__name__,
             self.caller_file, self.caller_line, self.caller_function, id(self)))

  def __call__(self):
    """
    Continue to the next item, either via defer() or directly,
    depending on unwind_stack.
    """
    if self.unwind_stack:
      defer(self.doit)
    else:
      self.doit()

  def doit(self):
    """
    Implement the iterator protocol: attempt to call the item handler
    with the next iteration value, call the termination handler if the
    iterator signaled StopIteration.
    """
    # Only the fetch is guarded: a StopIteration escaping from the item
    # callback itself is a bug in that callback and must not be mistaken
    # for the end of the iteration.
    try:
      value = next(self.iterator)
    except StopIteration:
      if self.done_callback is not None:
        self.done_callback()
    else:
      self.item_callback(self, value)
00081 
class timer(object):
  """
  Timer construct for event-driven code.  It can be used in either of two ways:

  - As a virtual class, in which case the subclass should provide a
    handler() method to receive the wakeup event when the timer expires; or

  - By setting an explicit handler callback, either via the
    constructor or the set_handler() method.

  Subclassing is probably more Pythonic, but setting an explicit
  handler turns out to be very convenient when combined with bound
  methods to other objects.
  """

  ## @var gc_debug
  # Verbose chatter about timers states and garbage collection.
  gc_debug = False

  ## @var run_debug
  # Verbose chatter about timers being run.
  run_debug = False

  ## @var queue
  # Timer queue, shared by all timer instances (there can be only one queue).
  queue = []

  def __init__(self, handler = None, errback = None):
    if handler is not None:
      self.set_handler(handler)
    if errback is not None:
      self.set_errback(errback)
    # None until set() is called; __repr__() reads this, so it must
    # exist before the debug trace below.
    self.when = None
    if self.gc_debug:
      self.trace("Creating %r" % self)

  def trace(self, msg):
    """
    Debug logging.  Does nothing unless gc_debug is set.  The logged
    location is taken from the oldest of the three innermost stack
    frames.
    """
    if self.gc_debug:
      bt = traceback.extract_stack(limit = 3)
      rpki.log.debug("%s from %s:%d" % (msg, bt[0][0], bt[0][1]))

  def set(self, when):
    """
    Set a timer.  Argument can be a datetime, to specify an absolute
    time, or a timedelta, to specify an offset time.
    """
    if self.gc_debug:
      self.trace("Setting %r to %r" % (self, when))
    if isinstance(when, rpki.sundial.timedelta):
      when = rpki.sundial.now() + when
    # Validate before touching self.when, so that a bad argument cannot
    # leave a (possibly already queued) timer holding a value that the
    # queue sort below would choke on later.
    assert isinstance(when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, when)
    self.when = when
    if self not in self.queue:
      self.queue.append(self)
    # Keep the queue ordered by expiration time, soonest first.
    self.queue.sort(key = lambda x: x.when)

  def __cmp__(self, other):
    return cmp(id(self), id(other))

  # Only defined when gc_debug is true at class definition time.
  if gc_debug:
    def __del__(self):
      rpki.log.debug("Deleting %r" % self)

  def cancel(self):
    """
    Cancel a timer, if it was set.
    """
    if self.gc_debug:
      self.trace("Canceling %r" % self)
    # Remove every occurrence; list.remove() raises ValueError once
    # there is nothing left to remove, which ends the loop.
    try:
      while True:
        self.queue.remove(self)
    except ValueError:
      pass

  def is_set(self):
    """
    Test whether this timer is currently set.
    """
    return self in self.queue

  def handler(self):
    """
    Handle a timer that has expired.  This must either be overridden by
    a subclass or set dynamically by set_handler().
    """
    raise NotImplementedError

  def set_handler(self, handler):
    """
    Set timer's expiration handler.  This is an alternative to
    subclassing the timer class, and may be easier to use when
    integrating timers into other classes (eg, the handler can be a
    bound method to an object in a class representing a network
    connection).
    """
    self.handler = handler

  def errback(self, e):
    """
    Error callback.  May be overridden, or set with set_errback().
    """
    rpki.log.error("Unhandled exception from timer: %s" % e)
    rpki.log.traceback()

  def set_errback(self, errback):
    """
    Set a timer's errback.  Like set_handler(), for errbacks.
    """
    self.errback = errback

  @classmethod
  def runq(cls):
    """
    Run the timer queue: for each timer whose call time has passed,
    pull the timer off the queue and call its handler() method.
    ExitNow and SystemExit propagate; any other exception raised by a
    handler is passed to that timer's errback().
    """
    while cls.queue and rpki.sundial.now() >= cls.queue[0].when:
      t = cls.queue.pop(0)
      if cls.run_debug:
        rpki.log.debug("Running %r" % t)
      try:
        t.handler()
      except (ExitNow, SystemExit):
        raise
      except Exception as e:
        t.errback(e)

  def __repr__(self):
    return rpki.log.log_repr(self, self.when, repr(self.handler))

  @classmethod
  def seconds_until_wakeup(cls):
    """
    Calculate delay until next timer expires, or None if no timers are
    set and we should wait indefinitely.  Rounds up to avoid spinning
    in select() or poll().  We could calculate fractional seconds in
    the right units instead, but select() and poll() don't even take
    the same units (argh!), and we're not doing anything that
    hair-triggered, so rounding up is simplest.
    """
    if not cls.queue:
      return None
    now = rpki.sundial.now()
    if now >= cls.queue[0].when:
      return 0
    delay = cls.queue[0].when - now
    seconds = delay.convert_to_seconds()
    if delay.microseconds:
      seconds += 1
    return seconds

  @classmethod
  def clear(cls):
    """
    Cancel every timer on the queue.  We could just throw away the
    queue content, but this way we can notify subclasses that provide
    their own cancel() method.
    """
    while cls.queue:
      cls.queue.pop(0).cancel()
00247 
## @var deferred_queue
# Pending deferred actions, oldest first.  These used to live on the
# timer queue, but that appeared to confuse the garbage collector, and
# a timer is more machinery than a simple deferred action needs anyway.

deferred_queue = []

def defer(thunk):
  """
  Queue thunk, a callable taking no arguments, to be run on the next
  pass through the event loop.
  """
  deferred_queue.append(thunk)
00260 
def run_deferred():
  """
  Drain the deferred-action queue, running each action in the order
  it was queued.  ExitNow and SystemExit propagate to the caller; any
  other exception is logged and the remaining actions still run.
  """
  while deferred_queue:
    try:
      action = deferred_queue.pop(0)
      action()
    except (ExitNow, SystemExit):
      raise
    except Exception as e:
      rpki.log.error("Unhandled exception from deferred action: %s" % e)
      rpki.log.traceback()
00273 
def _raiseExitNow(signum, frame):
  """
  Signal handler installed by event_loop(): turns the signal into an
  ExitNow exception.  Both arguments are ignored.
  """
  raise ExitNow()
00279 
def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
  """
  Replacement for asyncore.loop(), adding timer and signal support.

  Runs until there are no sockets, deferred actions or timers left,
  or until ExitNow is raised.  Each signal in catch_signals is routed
  to _raiseExitNow() while the loop runs; the handlers that were in
  place beforehand are put back on the way out.  Any unexpected
  exception is logged and the loop is restarted, except for select()
  rejecting a file descriptor, which is treated as fatal.
  """
  saved_handlers = {}
  while True:
    # Only the first pass sees the caller's original handlers; on a
    # restart the "previous" handler is whatever the finally clause
    # below just restored, so we don't record it again.
    first_pass = not saved_handlers
    try:
      # NOTE(review): signal.signal() raises ValueError when called
      # outside the main thread; that would be caught below and treated
      # as a restartable error -- confirm this is never run off the
      # main thread.
      for signum in catch_signals:
        previous = signal.signal(signum, _raiseExitNow)
        if first_pass:
          saved_handlers[signum] = previous
      while asyncore.socket_map or deferred_queue or timer.queue:
        run_deferred()
        asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map)
        run_deferred()
        timer.runq()
        if timer.gc_debug:
          gc.collect()
          if gc.garbage:
            for leftover in gc.garbage:
              rpki.log.debug("GC-cycle %r" % leftover)
            del gc.garbage[:]
    except ExitNow:
      break
    except SystemExit:
      raise
    except Exception as e:
      if isinstance(e, ValueError) and str(e) == "filedescriptor out of range in select()":
        rpki.log.error("Something is badly wrong, select() thinks we gave it a bad file descriptor.")
        rpki.log.error("Content of asyncore.socket_map:")
        for fd in sorted(asyncore.socket_map):
          rpki.log.error("  fd %s obj %r" % (fd, asyncore.socket_map[fd]))
        rpki.log.error("Not safe to continue due to risk of spin loop on select().  Exiting.")
        sys.exit(1)
      rpki.log.error("event_loop() exited with exception %r, this is not supposed to happen, restarting" % e)
    else:
      # Inner loop ran out of work: normal termination.
      break
    finally:
      # Runs on every pass, including ones that are about to restart.
      for signum, previous in saved_handlers.items():
        signal.signal(signum, previous)
00323 
00324 class sync_wrapper(object):
00325   """
00326   Synchronous wrapper around asynchronous functions.  Running in
00327   asynchronous mode at all times makes sense for event-driven daemons,
00328   but is kind of tedious for simple scripts, hence this wrapper.
00329 
00330   The wrapped function should take at least two arguments: a callback
00331   function and an errback function.  If any arguments are passed to
00332   the wrapper, they will be passed as additional arguments to the
00333   wrapped function.
00334   """
00335 
00336   res = None
00337   err = None
00338 
00339   def __init__(self, func):
00340     self.func = func
00341 
00342   def cb(self, res = None):
00343     """
00344     Wrapped code has requested normal termination.  Store result, and
00345     exit the event loop.
00346     """
00347     self.res = res
00348     raise ExitNow
00349 
00350   def eb(self, err):
00351     """
00352     Wrapped code raised an exception.  Store exception data, then exit
00353     the event loop.
00354     """
00355     exc_info = sys.exc_info()
00356     self.err = exc_info if exc_info[1] is err else err
00357     raise ExitNow
00358 
00359   def __call__(self, *args, **kwargs):
00360 
00361     def thunk():
00362       """
00363       Deferred action to call the wrapped code once event system is
00364       running.
00365       """
00366       try:
00367         self.func(self.cb, self.eb, *args, **kwargs)
00368       except ExitNow:
00369         raise
00370       except Exception, e:
00371         self.eb(e)
00372       
00373     defer(thunk)
00374     event_loop()
00375     if self.err is None:
00376       return self.res
00377     elif isinstance(self.err, tuple):
00378       raise self.err[0], self.err[1], self.err[2]
00379     else:
00380       raise self.err
00381 
def exit_event_loop():
  """
  Break out of event_loop() by raising ExitNow.
  """
  raise ExitNow()
00387 
class gc_summary(object):
  """
  Periodic summary of GC state, for tracking down memory bloat.

  interval is either a number of seconds or a timedelta; threshold
  is the count a type must exceed to be included in the report.
  Creating the object starts the first timer.
  """

  def __init__(self, interval, threshold = 0):
    if isinstance(interval, (int, long)):
      interval = rpki.sundial.timedelta(seconds = interval)
    self.interval = interval
    self.threshold = threshold
    self.timer = timer(handler = self.handler)
    self.timer.set(self.interval)

  def handler(self):
    """
    Timer callback: force a collection, log per-type object counts and
    per-signature tuple counts, then schedule the next run.
    """
    rpki.log.debug("gc_summary: Running gc.collect()")
    gc.collect()
    rpki.log.debug("gc_summary: Summarizing (threshold %d)" % self.threshold)
    type_counts = {}
    tuple_counts = {}
    for obj in gc.get_objects():
      type_name = type(obj).__name__
      type_counts[type_name] = type_counts.get(type_name, 0) + 1
      if isinstance(obj, tuple):
        # Tuples are additionally bucketed by the types of their members.
        signature = ", ".join(type(member).__name__ for member in obj)
        tuple_counts[signature] = tuple_counts.get(signature, 0) + 1
    rpki.log.debug("gc_summary: Sorting result")
    by_count = lambda pair: pair[1]
    type_counts = sorted(type_counts.items(), key = by_count, reverse = True)
    tuple_counts = sorted(tuple_counts.items(), key = by_count, reverse = True)
    rpki.log.debug("gc_summary: Object type counts in descending order")
    for name, count in type_counts:
      if count > self.threshold:
        rpki.log.debug("gc_summary: %8d %s" % (count, name))
    rpki.log.debug("gc_summary: Tuple content type signature counts in descending order")
    for types, count in tuple_counts:
      if count > self.threshold:
        rpki.log.debug("gc_summary: %8d (%s)" % (count, types))
    rpki.log.debug("gc_summary: Scheduling next cycle")
    self.timer.set(self.interval)
 All Classes Namespaces Files Functions Variables Properties