RPKI Engine 1.0
|
00001 """ 00002 Utilities for event-driven programming. 00003 00004 $Id: async.py 3793 2011-04-27 04:34:52Z sra $ 00005 00006 Copyright (C) 2009--2010 Internet Systems Consortium ("ISC") 00007 00008 Permission to use, copy, modify, and distribute this software for any 00009 purpose with or without fee is hereby granted, provided that the above 00010 copyright notice and this permission notice appear in all copies. 00011 00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH 00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY 00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, 00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM 00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE 00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR 00018 PERFORMANCE OF THIS SOFTWARE. 00019 """ 00020 00021 import asyncore, signal, traceback, gc, sys 00022 import rpki.log, rpki.sundial 00023 00024 ExitNow = asyncore.ExitNow 00025 00026 class iterator(object): 00027 """ 00028 Iteration construct for event-driven code. Takes three 00029 arguments: 00030 00031 - Some kind of iterable object 00032 00033 - A callback to call on each item in the iteration 00034 00035 - A callback to call after the iteration terminates. 00036 00037 The item callback receives two arguments: the callable iterator 00038 object and the current value of the iteration. It should call the 00039 iterator (or arrange for the iterator to be called) when it is time 00040 to continue to the next item in the iteration. 00041 00042 The termination callback receives no arguments. 00043 """ 00044 00045 def __init__(self, iterable, item_callback, done_callback, unwind_stack = True): 00046 self.item_callback = item_callback 00047 self.done_callback = done_callback 00048 self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3] 00049 self.unwind_stack = unwind_stack 00050 try: 00051 self.iterator = iter(iterable) 00052 except (ExitNow, SystemExit): 00053 raise 00054 except: 00055 rpki.log.debug("Problem constructing iterator for %r" % (iterable,)) 00056 raise 00057 self.doit() 00058 00059 def __repr__(self): 00060 return ("<%s created at %s:%s %s at 0x%x>" % 00061 (self.__class__.__name__, 00062 self.caller_file, self.caller_line, self.caller_function, id(self))) 00063 00064 def __call__(self): 00065 if self.unwind_stack: 00066 defer(self.doit) 00067 else: 00068 self.doit() 00069 00070 def doit(self): 00071 """ 00072 Implement the iterator protocol: attempt to call the item handler 00073 with the next iteration value, call the termination handler if the 00074 iterator signaled StopIteration. 00075 """ 00076 try: 00077 self.item_callback(self, self.iterator.next()) 00078 except StopIteration: 00079 if self.done_callback is not None: 00080 self.done_callback() 00081 00082 class timer(object): 00083 """ 00084 Timer construct for event-driven code. It can be used in either of two ways: 00085 00086 - As a virtual class, in which case the subclass should provide a 00087 handler() method to receive the wakup event when the timer expires; or 00088 00089 - By setting an explicit handler callback, either via the 00090 constructor or the set_handler() method. 00091 00092 Subclassing is probably more Pythonic, but setting an explict 00093 handler turns out to be very convenient when combined with bound 00094 methods to other objects. 00095 """ 00096 00097 ## @var gc_debug 00098 # Verbose chatter about timers states and garbage collection. 00099 gc_debug = False 00100 00101 ## @var run_debug 00102 # Verbose chatter about timers being run. 00103 run_debug = False 00104 00105 ## @var queue 00106 # Timer queue, shared by all timer instances (there can be only one queue). 00107 queue = [] 00108 00109 def __init__(self, handler = None, errback = None): 00110 if handler is not None: 00111 self.set_handler(handler) 00112 if errback is not None: 00113 self.set_errback(errback) 00114 self.when = None 00115 if self.gc_debug: 00116 self.trace("Creating %r" % self) 00117 00118 def trace(self, msg): 00119 """ 00120 Debug logging. 00121 """ 00122 if self.gc_debug: 00123 bt = traceback.extract_stack(limit = 3) 00124 rpki.log.debug("%s from %s:%d" % (msg, bt[0][0], bt[0][1])) 00125 00126 def set(self, when): 00127 """ 00128 Set a timer. Argument can be a datetime, to specify an absolute 00129 time, or a timedelta, to specify an offset time. 00130 """ 00131 if self.gc_debug: 00132 self.trace("Setting %r to %r" % (self, when)) 00133 if isinstance(when, rpki.sundial.timedelta): 00134 self.when = rpki.sundial.now() + when 00135 else: 00136 self.when = when 00137 assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when) 00138 if self not in self.queue: 00139 self.queue.append(self) 00140 self.queue.sort(key = lambda x: x.when) 00141 00142 def __cmp__(self, other): 00143 return cmp(id(self), id(other)) 00144 00145 if gc_debug: 00146 def __del__(self): 00147 rpki.log.debug("Deleting %r" % self) 00148 00149 def cancel(self): 00150 """ 00151 Cancel a timer, if it was set. 00152 """ 00153 if self.gc_debug: 00154 self.trace("Canceling %r" % self) 00155 try: 00156 while True: 00157 self.queue.remove(self) 00158 except ValueError: 00159 pass 00160 00161 def is_set(self): 00162 """ 00163 Test whether this timer is currently set. 00164 """ 00165 return self in self.queue 00166 00167 def handler(self): 00168 """ 00169 Handle a timer that has expired. This must either be overriden by 00170 a subclass or set dynamically by set_handler(). 00171 """ 00172 raise NotImplementedError 00173 00174 def set_handler(self, handler): 00175 """ 00176 Set timer's expiration handler. This is an alternative to 00177 subclassing the timer class, and may be easier to use when 00178 integrating timers into other classes (eg, the handler can be a 00179 bound method to an object in a class representing a network 00180 connection). 00181 """ 00182 self.handler = handler 00183 00184 def errback(self, e): 00185 """ 00186 Error callback. May be overridden, or set with set_errback(). 00187 """ 00188 rpki.log.error("Unhandled exception from timer: %s" % e) 00189 rpki.log.traceback() 00190 00191 def set_errback(self, errback): 00192 """ 00193 Set a timer's errback. Like set_handler(), for errbacks. 00194 """ 00195 self.errback = errback 00196 00197 @classmethod 00198 def runq(cls): 00199 """ 00200 Run the timer queue: for each timer whose call time has passed, 00201 pull the timer off the queue and call its handler() method. 00202 """ 00203 while cls.queue and rpki.sundial.now() >= cls.queue[0].when: 00204 t = cls.queue.pop(0) 00205 if cls.run_debug: 00206 rpki.log.debug("Running %r" % t) 00207 try: 00208 t.handler() 00209 except (ExitNow, SystemExit): 00210 raise 00211 except Exception, e: 00212 t.errback(e) 00213 00214 def __repr__(self): 00215 return rpki.log.log_repr(self, self.when, repr(self.handler)) 00216 00217 @classmethod 00218 def seconds_until_wakeup(cls): 00219 """ 00220 Calculate delay until next timer expires, or None if no timers are 00221 set and we should wait indefinitely. Rounds up to avoid spinning 00222 in select() or poll(). We could calculate fractional seconds in 00223 the right units instead, but select() and poll() don't even take 00224 the same units (argh!), and we're not doing anything that 00225 hair-triggered, so rounding up is simplest. 00226 """ 00227 if not cls.queue: 00228 return None 00229 now = rpki.sundial.now() 00230 if now >= cls.queue[0].when: 00231 return 0 00232 delay = cls.queue[0].when - now 00233 seconds = delay.convert_to_seconds() 00234 if delay.microseconds: 00235 seconds += 1 00236 return seconds 00237 00238 @classmethod 00239 def clear(cls): 00240 """ 00241 Cancel every timer on the queue. We could just throw away the 00242 queue content, but this way we can notify subclasses that provide 00243 their own cancel() method. 00244 """ 00245 while cls.queue: 00246 cls.queue.pop(0).cancel() 00247 00248 ## @var deferred_queue 00249 # List to hold deferred actions. We used to do this with the timer 00250 # queue, but that appears to confuse the garbage collector, and is 00251 # overengineering for simple deferred actions in any case. 00252 00253 deferred_queue = [] 00254 00255 def defer(thunk): 00256 """ 00257 Defer an action until the next pass through the event loop. 00258 """ 00259 deferred_queue.append(thunk) 00260 00261 def run_deferred(): 00262 """ 00263 Run deferred actions. 00264 """ 00265 while deferred_queue: 00266 try: 00267 deferred_queue.pop(0)() 00268 except (ExitNow, SystemExit): 00269 raise 00270 except Exception, e: 00271 rpki.log.error("Unhandled exception from deferred action: %s" % e) 00272 rpki.log.traceback() 00273 00274 def _raiseExitNow(signum, frame): 00275 """ 00276 Signal handler for event_loop(). 00277 """ 00278 raise ExitNow 00279 00280 def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)): 00281 """ 00282 Replacement for asyncore.loop(), adding timer and signal support. 00283 """ 00284 while True: 00285 old_signal_handlers = {} 00286 try: 00287 for sig in catch_signals: 00288 old_signal_handlers[sig] = signal.signal(sig, _raiseExitNow) 00289 while asyncore.socket_map or deferred_queue or timer.queue: 00290 run_deferred() 00291 asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map) 00292 run_deferred() 00293 timer.runq() 00294 if timer.gc_debug: 00295 gc.collect() 00296 if gc.garbage: 00297 for i in gc.garbage: 00298 rpki.log.debug("GC-cycle %r" % i) 00299 del gc.garbage[:] 00300 except ExitNow: 00301 break 00302 except SystemExit: 00303 raise 00304 except Exception, e: 00305 rpki.log.error("event_loop() exited with exception %r, this is not supposed to happen, restarting" % e) 00306 else: 00307 break 00308 finally: 00309 for sig in old_signal_handlers: 00310 signal.signal(sig, old_signal_handlers[sig]) 00311 00312 class sync_wrapper(object): 00313 """ 00314 Synchronous wrapper around asynchronous functions. Running in 00315 asynchronous mode at all times makes sense for event-driven daemons, 00316 but is kind of tedious for simple scripts, hence this wrapper. 00317 00318 The wrapped function should take at least two arguments: a callback 00319 function and an errback function. If any arguments are passed to 00320 the wrapper, they will be passed as additional arguments to the 00321 wrapped function. 00322 """ 00323 00324 res = None 00325 err = None 00326 00327 def __init__(self, func): 00328 self.func = func 00329 00330 def cb(self, res = None): 00331 """ 00332 Wrapped code has requested normal termination. Store result, and 00333 exit the event loop. 00334 """ 00335 self.res = res 00336 raise ExitNow 00337 00338 def eb(self, err): 00339 """ 00340 Wrapped code raised an exception. Store exception data, then exit 00341 the event loop. 00342 """ 00343 exc_info = sys.exc_info() 00344 self.err = exc_info if exc_info[1] is err else err 00345 raise ExitNow 00346 00347 def __call__(self, *args, **kwargs): 00348 00349 def thunk(): 00350 """ 00351 Deferred action to call the wrapped code once event system is 00352 running. 00353 """ 00354 try: 00355 self.func(self.cb, self.eb, *args, **kwargs) 00356 except ExitNow: 00357 raise 00358 except Exception, e: 00359 self.eb(e) 00360 00361 defer(thunk) 00362 event_loop() 00363 if self.err is not None: 00364 if isinstance(self.err, tuple): 00365 raise self.err[0], self.err[1], self.err[2] 00366 else: 00367 raise self.err 00368 else: 00369 return self.res 00370 00371 def exit_event_loop(): 00372 """ 00373 Force exit from event_loop(). 00374 """ 00375 raise ExitNow 00376 00377 class gc_summary(object): 00378 """ 00379 Periodic summary of GC state, for tracking down memory bloat. 00380 """ 00381 00382 def __init__(self, interval, threshold = 0): 00383 if isinstance(interval, (int, long)): 00384 interval = rpki.sundial.timedelta(seconds = interval) 00385 self.interval = interval 00386 self.threshold = threshold 00387 self.timer = timer(handler = self.handler) 00388 self.timer.set(self.interval) 00389 00390 def handler(self): 00391 """ 00392 Collect and log GC state for this period, reset timer. 00393 """ 00394 rpki.log.debug("gc_summary: Running gc.collect()") 00395 gc.collect() 00396 rpki.log.debug("gc_summary: Summarizing (threshold %d)" % self.threshold) 00397 total = {} 00398 tuples = {} 00399 for g in gc.get_objects(): 00400 k = type(g).__name__ 00401 total[k] = total.get(k, 0) + 1 00402 if isinstance(g, tuple): 00403 k = ", ".join(type(x).__name__ for x in g) 00404 tuples[k] = tuples.get(k, 0) + 1 00405 rpki.log.debug("gc_summary: Sorting result") 00406 total = total.items() 00407 total.sort(reverse = True, key = lambda x: x[1]) 00408 tuples = tuples.items() 00409 tuples.sort(reverse = True, key = lambda x: x[1]) 00410 rpki.log.debug("gc_summary: Object type counts in descending order") 00411 for name, count in total: 00412 if count > self.threshold: 00413 rpki.log.debug("gc_summary: %8d %s" % (count, name)) 00414 rpki.log.debug("gc_summary: Tuple content type signature counts in descending order") 00415 for types, count in tuples: 00416 if count > self.threshold: 00417 rpki.log.debug("gc_summary: %8d (%s)" % (count, types)) 00418 rpki.log.debug("gc_summary: Scheduling next cycle") 00419 self.timer.set(self.interval)