RPKI Engine
1.0
|
00001 """ 00002 Utilities for event-driven programming. 00003 00004 $Id: async.py 4014 2011-10-05 16:30:24Z sra $ 00005 00006 Copyright (C) 2009--2011 Internet Systems Consortium ("ISC") 00007 00008 Permission to use, copy, modify, and distribute this software for any 00009 purpose with or without fee is hereby granted, provided that the above 00010 copyright notice and this permission notice appear in all copies. 00011 00012 THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH 00013 REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY 00014 AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, 00015 INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM 00016 LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE 00017 OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR 00018 PERFORMANCE OF THIS SOFTWARE. 00019 """ 00020 00021 import asyncore, signal, traceback, gc, sys 00022 import rpki.log, rpki.sundial 00023 00024 ExitNow = asyncore.ExitNow 00025 00026 class iterator(object): 00027 """ 00028 Iteration construct for event-driven code. Takes three 00029 arguments: 00030 00031 - Some kind of iterable object 00032 00033 - A callback to call on each item in the iteration 00034 00035 - A callback to call after the iteration terminates. 00036 00037 The item callback receives two arguments: the callable iterator 00038 object and the current value of the iteration. It should call the 00039 iterator (or arrange for the iterator to be called) when it is time 00040 to continue to the next item in the iteration. 00041 00042 The termination callback receives no arguments. 00043 """ 00044 00045 def __init__(self, iterable, item_callback, done_callback, unwind_stack = True): 00046 self.item_callback = item_callback 00047 self.done_callback = done_callback 00048 self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3] 00049 self.unwind_stack = unwind_stack 00050 try: 00051 self.iterator = iter(iterable) 00052 except (ExitNow, SystemExit): 00053 raise 00054 except Exception: 00055 rpki.log.debug("Problem constructing iterator for %r" % (iterable,)) 00056 raise 00057 self.doit() 00058 00059 def __repr__(self): 00060 return ("<%s created at %s:%s %s at 0x%x>" % 00061 (self.__class__.__name__, 00062 self.caller_file, self.caller_line, self.caller_function, id(self))) 00063 00064 def __call__(self): 00065 if self.unwind_stack: 00066 defer(self.doit) 00067 else: 00068 self.doit() 00069 00070 def doit(self): 00071 """ 00072 Implement the iterator protocol: attempt to call the item handler 00073 with the next iteration value, call the termination handler if the 00074 iterator signaled StopIteration. 00075 """ 00076 try: 00077 self.item_callback(self, self.iterator.next()) 00078 except StopIteration: 00079 if self.done_callback is not None: 00080 self.done_callback() 00081 00082 class timer(object): 00083 """ 00084 Timer construct for event-driven code. It can be used in either of two ways: 00085 00086 - As a virtual class, in which case the subclass should provide a 00087 handler() method to receive the wakup event when the timer expires; or 00088 00089 - By setting an explicit handler callback, either via the 00090 constructor or the set_handler() method. 00091 00092 Subclassing is probably more Pythonic, but setting an explict 00093 handler turns out to be very convenient when combined with bound 00094 methods to other objects. 00095 """ 00096 00097 ## @var gc_debug 00098 # Verbose chatter about timers states and garbage collection. 00099 gc_debug = False 00100 00101 ## @var run_debug 00102 # Verbose chatter about timers being run. 00103 run_debug = False 00104 00105 ## @var queue 00106 # Timer queue, shared by all timer instances (there can be only one queue). 00107 queue = [] 00108 00109 def __init__(self, handler = None, errback = None): 00110 if handler is not None: 00111 self.set_handler(handler) 00112 if errback is not None: 00113 self.set_errback(errback) 00114 self.when = None 00115 if self.gc_debug: 00116 self.trace("Creating %r" % self) 00117 00118 def trace(self, msg): 00119 """ 00120 Debug logging. 00121 """ 00122 if self.gc_debug: 00123 bt = traceback.extract_stack(limit = 3) 00124 rpki.log.debug("%s from %s:%d" % (msg, bt[0][0], bt[0][1])) 00125 00126 def set(self, when): 00127 """ 00128 Set a timer. Argument can be a datetime, to specify an absolute 00129 time, or a timedelta, to specify an offset time. 00130 """ 00131 if self.gc_debug: 00132 self.trace("Setting %r to %r" % (self, when)) 00133 if isinstance(when, rpki.sundial.timedelta): 00134 self.when = rpki.sundial.now() + when 00135 else: 00136 self.when = when 00137 assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when) 00138 if self not in self.queue: 00139 self.queue.append(self) 00140 self.queue.sort(key = lambda x: x.when) 00141 00142 def __cmp__(self, other): 00143 return cmp(id(self), id(other)) 00144 00145 if gc_debug: 00146 def __del__(self): 00147 rpki.log.debug("Deleting %r" % self) 00148 00149 def cancel(self): 00150 """ 00151 Cancel a timer, if it was set. 00152 """ 00153 if self.gc_debug: 00154 self.trace("Canceling %r" % self) 00155 try: 00156 while True: 00157 self.queue.remove(self) 00158 except ValueError: 00159 pass 00160 00161 def is_set(self): 00162 """ 00163 Test whether this timer is currently set. 00164 """ 00165 return self in self.queue 00166 00167 def handler(self): 00168 """ 00169 Handle a timer that has expired. This must either be overriden by 00170 a subclass or set dynamically by set_handler(). 00171 """ 00172 raise NotImplementedError 00173 00174 def set_handler(self, handler): 00175 """ 00176 Set timer's expiration handler. This is an alternative to 00177 subclassing the timer class, and may be easier to use when 00178 integrating timers into other classes (eg, the handler can be a 00179 bound method to an object in a class representing a network 00180 connection). 00181 """ 00182 self.handler = handler 00183 00184 def errback(self, e): 00185 """ 00186 Error callback. May be overridden, or set with set_errback(). 00187 """ 00188 rpki.log.error("Unhandled exception from timer: %s" % e) 00189 rpki.log.traceback() 00190 00191 def set_errback(self, errback): 00192 """ 00193 Set a timer's errback. Like set_handler(), for errbacks. 00194 """ 00195 self.errback = errback 00196 00197 @classmethod 00198 def runq(cls): 00199 """ 00200 Run the timer queue: for each timer whose call time has passed, 00201 pull the timer off the queue and call its handler() method. 00202 """ 00203 while cls.queue and rpki.sundial.now() >= cls.queue[0].when: 00204 t = cls.queue.pop(0) 00205 if cls.run_debug: 00206 rpki.log.debug("Running %r" % t) 00207 try: 00208 t.handler() 00209 except (ExitNow, SystemExit): 00210 raise 00211 except Exception, e: 00212 t.errback(e) 00213 00214 def __repr__(self): 00215 return rpki.log.log_repr(self, self.when, repr(self.handler)) 00216 00217 @classmethod 00218 def seconds_until_wakeup(cls): 00219 """ 00220 Calculate delay until next timer expires, or None if no timers are 00221 set and we should wait indefinitely. Rounds up to avoid spinning 00222 in select() or poll(). We could calculate fractional seconds in 00223 the right units instead, but select() and poll() don't even take 00224 the same units (argh!), and we're not doing anything that 00225 hair-triggered, so rounding up is simplest. 00226 """ 00227 if not cls.queue: 00228 return None 00229 now = rpki.sundial.now() 00230 if now >= cls.queue[0].when: 00231 return 0 00232 delay = cls.queue[0].when - now 00233 seconds = delay.convert_to_seconds() 00234 if delay.microseconds: 00235 seconds += 1 00236 return seconds 00237 00238 @classmethod 00239 def clear(cls): 00240 """ 00241 Cancel every timer on the queue. We could just throw away the 00242 queue content, but this way we can notify subclasses that provide 00243 their own cancel() method. 00244 """ 00245 while cls.queue: 00246 cls.queue.pop(0).cancel() 00247 00248 ## @var deferred_queue 00249 # List to hold deferred actions. We used to do this with the timer 00250 # queue, but that appears to confuse the garbage collector, and is 00251 # overengineering for simple deferred actions in any case. 00252 00253 deferred_queue = [] 00254 00255 def defer(thunk): 00256 """ 00257 Defer an action until the next pass through the event loop. 00258 """ 00259 deferred_queue.append(thunk) 00260 00261 def run_deferred(): 00262 """ 00263 Run deferred actions. 00264 """ 00265 while deferred_queue: 00266 try: 00267 deferred_queue.pop(0)() 00268 except (ExitNow, SystemExit): 00269 raise 00270 except Exception, e: 00271 rpki.log.error("Unhandled exception from deferred action: %s" % e) 00272 rpki.log.traceback() 00273 00274 def _raiseExitNow(signum, frame): 00275 """ 00276 Signal handler for event_loop(). 00277 """ 00278 raise ExitNow 00279 00280 def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)): 00281 """ 00282 Replacement for asyncore.loop(), adding timer and signal support. 00283 """ 00284 old_signal_handlers = {} 00285 while True: 00286 save_sigs = len(old_signal_handlers) == 0 00287 try: 00288 for sig in catch_signals: 00289 old = signal.signal(sig, _raiseExitNow) 00290 if save_sigs: 00291 old_signal_handlers[sig] = old 00292 while asyncore.socket_map or deferred_queue or timer.queue: 00293 run_deferred() 00294 asyncore.poll(timer.seconds_until_wakeup(), asyncore.socket_map) 00295 run_deferred() 00296 timer.runq() 00297 if timer.gc_debug: 00298 gc.collect() 00299 if gc.garbage: 00300 for i in gc.garbage: 00301 rpki.log.debug("GC-cycle %r" % i) 00302 del gc.garbage[:] 00303 except ExitNow: 00304 break 00305 except SystemExit: 00306 raise 00307 except ValueError, e: 00308 if str(e) == "filedescriptor out of range in select()": 00309 rpki.log.error("Something is badly wrong, select() thinks we gave it a bad file descriptor.") 00310 rpki.log.error("Content of asyncore.socket_map:") 00311 for fd in sorted(asyncore.socket_map.iterkeys()): 00312 rpki.log.error(" fd %s obj %r" % (fd, asyncore.socket_map[fd])) 00313 rpki.log.error("Not safe to continue due to risk of spin loop on select(). Exiting.") 00314 sys.exit(1) 00315 rpki.log.error("event_loop() exited with exception %r, this is not supposed to happen, restarting" % e) 00316 except Exception, e: 00317 rpki.log.error("event_loop() exited with exception %r, this is not supposed to happen, restarting" % e) 00318 else: 00319 break 00320 finally: 00321 for sig in old_signal_handlers: 00322 signal.signal(sig, old_signal_handlers[sig]) 00323 00324 class sync_wrapper(object): 00325 """ 00326 Synchronous wrapper around asynchronous functions. Running in 00327 asynchronous mode at all times makes sense for event-driven daemons, 00328 but is kind of tedious for simple scripts, hence this wrapper. 00329 00330 The wrapped function should take at least two arguments: a callback 00331 function and an errback function. If any arguments are passed to 00332 the wrapper, they will be passed as additional arguments to the 00333 wrapped function. 00334 """ 00335 00336 res = None 00337 err = None 00338 00339 def __init__(self, func): 00340 self.func = func 00341 00342 def cb(self, res = None): 00343 """ 00344 Wrapped code has requested normal termination. Store result, and 00345 exit the event loop. 00346 """ 00347 self.res = res 00348 raise ExitNow 00349 00350 def eb(self, err): 00351 """ 00352 Wrapped code raised an exception. Store exception data, then exit 00353 the event loop. 00354 """ 00355 exc_info = sys.exc_info() 00356 self.err = exc_info if exc_info[1] is err else err 00357 raise ExitNow 00358 00359 def __call__(self, *args, **kwargs): 00360 00361 def thunk(): 00362 """ 00363 Deferred action to call the wrapped code once event system is 00364 running. 00365 """ 00366 try: 00367 self.func(self.cb, self.eb, *args, **kwargs) 00368 except ExitNow: 00369 raise 00370 except Exception, e: 00371 self.eb(e) 00372 00373 defer(thunk) 00374 event_loop() 00375 if self.err is None: 00376 return self.res 00377 elif isinstance(self.err, tuple): 00378 raise self.err[0], self.err[1], self.err[2] 00379 else: 00380 raise self.err 00381 00382 def exit_event_loop(): 00383 """ 00384 Force exit from event_loop(). 00385 """ 00386 raise ExitNow 00387 00388 class gc_summary(object): 00389 """ 00390 Periodic summary of GC state, for tracking down memory bloat. 00391 """ 00392 00393 def __init__(self, interval, threshold = 0): 00394 if isinstance(interval, (int, long)): 00395 interval = rpki.sundial.timedelta(seconds = interval) 00396 self.interval = interval 00397 self.threshold = threshold 00398 self.timer = timer(handler = self.handler) 00399 self.timer.set(self.interval) 00400 00401 def handler(self): 00402 """ 00403 Collect and log GC state for this period, reset timer. 00404 """ 00405 rpki.log.debug("gc_summary: Running gc.collect()") 00406 gc.collect() 00407 rpki.log.debug("gc_summary: Summarizing (threshold %d)" % self.threshold) 00408 total = {} 00409 tuples = {} 00410 for g in gc.get_objects(): 00411 k = type(g).__name__ 00412 total[k] = total.get(k, 0) + 1 00413 if isinstance(g, tuple): 00414 k = ", ".join(type(x).__name__ for x in g) 00415 tuples[k] = tuples.get(k, 0) + 1 00416 rpki.log.debug("gc_summary: Sorting result") 00417 total = total.items() 00418 total.sort(reverse = True, key = lambda x: x[1]) 00419 tuples = tuples.items() 00420 tuples.sort(reverse = True, key = lambda x: x[1]) 00421 rpki.log.debug("gc_summary: Object type counts in descending order") 00422 for name, count in total: 00423 if count > self.threshold: 00424 rpki.log.debug("gc_summary: %8d %s" % (count, name)) 00425 rpki.log.debug("gc_summary: Tuple content type signature counts in descending order") 00426 for types, count in tuples: 00427 if count > self.threshold: 00428 rpki.log.debug("gc_summary: %8d (%s)" % (count, types)) 00429 rpki.log.debug("gc_summary: Scheduling next cycle") 00430 self.timer.set(self.interval)