diff options
author | Rob Austein <sra@hactrn.net> | 2015-10-22 02:02:23 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-10-22 02:02:23 +0000 |
commit | 32d43381dfc0370acb774951f9cdd0cdb1ab7f1b (patch) | |
tree | cfb37241df8d869ed08a24b3021a4a599cb98bdb | |
parent | 1ca142b706eab552f9e2c575ef15d1862516393b (diff) |
First cut at replacing rpkid's HTTP and I/O system with Tornado. Not
quite working perfectly yet (cron is a bit wonky) but manages to
produce an initial set of ROAs without thowing any exceptions, and
code is already much cleaner than the old callback-based horror.
svn path=/branches/tk705/; revision=6139
-rw-r--r-- | buildtools/pylint.rc | 2 | ||||
-rw-r--r-- | ca/tests/smoketest.py | 7 | ||||
-rw-r--r-- | rpki/adns.py | 5 | ||||
-rw-r--r-- | rpki/async.py | 382 | ||||
-rw-r--r-- | rpki/cli.py | 2 | ||||
-rw-r--r-- | rpki/config.py | 42 | ||||
-rw-r--r-- | rpki/exceptions.py | 3 | ||||
-rw-r--r-- | rpki/fields.py | 2 | ||||
-rw-r--r-- | rpki/http.py | 1061 | ||||
-rw-r--r-- | rpki/left_right.py | 13 | ||||
-rw-r--r-- | rpki/publication.py | 11 | ||||
-rw-r--r-- | rpki/rpkic.py | 1 | ||||
-rw-r--r-- | rpki/rpkid.py | 442 | ||||
-rw-r--r-- | rpki/rpkid_tasks.py | 705 | ||||
-rw-r--r-- | rpki/rpkidb/models.py | 990 | ||||
-rw-r--r-- | rpki/up_down.py | 4 | ||||
-rw-r--r-- | rpki/x509.py | 13 |
17 files changed, 1012 insertions, 2673 deletions
diff --git a/buildtools/pylint.rc b/buildtools/pylint.rc index 34302f74..8eba84da 100644 --- a/buildtools/pylint.rc +++ b/buildtools/pylint.rc @@ -54,7 +54,7 @@ disable-msg-cat= #enable-msg= # Disable the message(s) with the given id(s). -disable=R0801,R0903,R0913,C0321,R0904,W0201,E1101,W0614,C0301,R0901,C0302,R0902,R0201,W0613,R0912,R0915,W0703,W0212,R0914,W0603,W0142,I0011,C0111,C0103,R0401,C0326,R0911,C0325,C0330,W0311,E1124 +disable=R0801,R0903,R0913,C0321,R0904,W0201,E1101,W0614,C0301,R0901,C0302,R0902,R0201,W0613,R0912,R0915,W0703,W0212,R0914,W0603,W0142,I0011,C0111,C0103,R0401,C0326,R0911,C0325,C0330,W0311,E1124,W0702 [REPORTS] diff --git a/ca/tests/smoketest.py b/ca/tests/smoketest.py index 5f18119c..3960981a 100644 --- a/ca/tests/smoketest.py +++ b/ca/tests/smoketest.py @@ -48,7 +48,6 @@ import rpki.log import rpki.left_right import rpki.config import rpki.publication_control -import rpki.async from rpki.mysql_import import MySQLdb @@ -784,7 +783,7 @@ class allocation(object): for sql in rpki_sql: try: cur.execute(sql) - except Exception: + except: if "DROP TABLE IF EXISTS" not in sql.upper(): raise db.close() @@ -795,7 +794,7 @@ class allocation(object): for sql in irdb_sql: try: cur.execute(sql) - except Exception: + except: if "DROP TABLE IF EXISTS" not in sql.upper(): raise for s in [self] + self.hosts: @@ -1244,7 +1243,7 @@ def setup_publication(pubd_sql, irdb_db_name): for sql in pubd_sql: try: cur.execute(sql) - except Exception: + except: if "DROP TABLE IF EXISTS" not in sql.upper(): raise db.close() diff --git a/rpki/adns.py b/rpki/adns.py index 018bb7cf..c5af3549 100644 --- a/rpki/adns.py +++ b/rpki/adns.py @@ -27,7 +27,6 @@ import time import socket import logging import asyncore -import rpki.async import rpki.sundial import rpki.log @@ -62,12 +61,12 @@ for ns in resolver.nameservers: try: nameservers.append((socket.AF_INET, dns.ipv4.inet_aton(ns))) continue - except Exception: + except: pass try: nameservers.append((socket.AF_INET6, dns.ipv6.inet_aton(ns))) continue - except Exception: + except: pass logger.error("Couldn't parse nameserver address %r", ns) diff --git a/rpki/async.py b/rpki/async.py deleted file mode 100644 index 74143bd1..00000000 --- a/rpki/async.py +++ /dev/null @@ -1,382 +0,0 @@ -# $Id$ -# -# Copyright (C) 2009--2012 Internet Systems Consortium ("ISC") -# -# Permission to use, copy, modify, and distribute this software for any -# purpose with or without fee is hereby granted, provided that the above -# copyright notice and this permission notice appear in all copies. -# -# THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH -# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY -# AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, -# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM -# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE -# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR -# PERFORMANCE OF THIS SOFTWARE. - -""" -Utilities for event-driven programming. -""" - -import gc -import sys -import signal -import logging -import asyncore -import traceback -import rpki.log -import rpki.sundial - -logger = logging.getLogger(__name__) - -ExitNow = asyncore.ExitNow - -class iterator(object): - """ - Iteration construct for event-driven code. Takes three - arguments: - - - Some kind of iterable object - - - A callback to call on each item in the iteration - - - A callback to call after the iteration terminates. - - The item callback receives two arguments: the callable iterator - object and the current value of the iteration. It should call the - iterator (or arrange for the iterator to be called) when it is time - to continue to the next item in the iteration. - - The termination callback receives no arguments. - - Special case for memory constrained cases: if keyword argument - pop_list is True, iterable must be a list, which is modified in - place, popping items off of it until it's empty. - """ - - def __init__(self, iterable, item_callback, done_callback, unwind_stack = True, pop_list = False): - assert not pop_list or isinstance(iterable, list), "iterable must be a list when using pop_list" - self.item_callback = item_callback - self.done_callback = done_callback if done_callback is not None else lambda: None - self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3] - self.unwind_stack = unwind_stack - self.pop_list = pop_list - try: - if self.pop_list: - self.iterator = iterable - else: - self.iterator = iter(iterable) - except (ExitNow, SystemExit): - raise - except Exception: - logger.debug("Problem constructing iterator for %s", repr(iterable)) - raise - self.doit() - - def __repr__(self): - return rpki.log.log_repr(self, - "created at %s:%s" % (self.caller_file, - self.caller_line), - self.caller_function) - - def __call__(self): - if self.unwind_stack: - event_defer(self.doit) - else: - self.doit() - - def doit(self): - """ - Implement the iterator protocol: attempt to call the item handler - with the next iteration value, call the termination handler if the - iterator signaled StopIteration. - """ - - try: - if self.pop_list: - val = self.iterator.pop(0) - else: - val = self.iterator.next() - except (IndexError, StopIteration): - self.done_callback() - else: - self.item_callback(self, val) - -## @var timer_queue -# Timer queue. - -timer_queue = [] - -class timer(object): - """ - Timer construct for event-driven code. - """ - - ## @var gc_debug - # Verbose chatter about timers states and garbage collection. - gc_debug = False - - ## @var run_debug - # Verbose chatter about timers being run. - run_debug = False - - def __init__(self, handler = None, errback = None): - self.set_handler(handler) - self.set_errback(errback) - self.when = None - if self.gc_debug: - self.trace("Creating %r" % self) - - def trace(self, msg): - """ - Debug logging. - """ - - if self.gc_debug: - bt = traceback.extract_stack(limit = 3) - logger.debug("%s from %s:%d", msg, bt[0][0], bt[0][1]) - - def set(self, when): - """ - Set a timer. Argument can be a datetime, to specify an absolute - time, or a timedelta, to specify an offset time. - """ - - if self.gc_debug: - self.trace("Setting %r to %r" % (self, when)) - if isinstance(when, rpki.sundial.timedelta): - self.when = rpki.sundial.now() + when - else: - self.when = when - assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when) - if self not in timer_queue: - timer_queue.append(self) - timer_queue.sort(key = lambda x: x.when) - - def __cmp__(self, other): - return cmp(id(self), id(other)) - - if gc_debug: - def __del__(self): - logger.debug("Deleting %r", self) - - def cancel(self): - """ - Cancel a timer, if it was set. - """ - - if self.gc_debug: - self.trace("Canceling %r" % self) - try: - while True: - timer_queue.remove(self) - except ValueError: - pass - - def is_set(self): - """ - Test whether this timer is currently set. - """ - - return self in timer_queue - - def set_handler(self, handler): - """ - Set timer's expiration handler. This is an alternative to - subclassing the timer class, and may be easier to use when - integrating timers into other classes (eg, the handler can be a - bound method to an object in a class representing a network - connection). - """ - - self.handler = handler - - def set_errback(self, errback): - """ - Set a timer's errback. Like set_handler(), for errbacks. - """ - - self.errback = errback - - @classmethod - def runq(cls): - """ - Run the timer queue: for each timer whose call time has passed, - pull the timer off the queue and call its handler() method. - - Comparisions are made against time at which this function was - called, so that even if new events keep getting scheduled, we'll - return to the I/O loop reasonably quickly. - """ - - now = rpki.sundial.now() - while timer_queue and now >= timer_queue[0].when: - t = timer_queue.pop(0) - if cls.run_debug: - logger.debug("Running %r", t) - try: - if t.handler is not None: - t.handler() - else: - logger.warning("Timer %r expired with no handler set", t) - except (ExitNow, SystemExit): - raise - except Exception, e: - if t.errback is not None: - t.errback(e) - else: - logger.exception("Unhandled exception from timer %r", t) - - def __repr__(self): - return rpki.log.log_repr(self, self.when, repr(self.handler)) - - @classmethod - def seconds_until_wakeup(cls): - """ - Calculate delay until next timer expires, or None if no timers are - set and we should wait indefinitely. Rounds up to avoid spinning - in select() or poll(). We could calculate fractional seconds in - the right units instead, but select() and poll() don't even take - the same units (argh!), and we're not doing anything that - hair-triggered, so rounding up is simplest. - """ - - if not timer_queue: - return None - now = rpki.sundial.now() - if now >= timer_queue[0].when: - return 0 - delay = timer_queue[0].when - now - seconds = delay.convert_to_seconds() - if delay.microseconds: - seconds += 1 - return seconds - - @classmethod - def clear(cls): - """ - Cancel every timer on the queue. We could just throw away the - queue content, but this way we can notify subclasses that provide - their own cancel() method. - """ - - while timer_queue: - timer_queue.pop(0).cancel() - -def _raiseExitNow(signum, frame): - """ - Signal handler for event_loop(). - """ - - raise ExitNow - -def exit_event_loop(): - """ - Force exit from event_loop(). - """ - - raise ExitNow - -def event_defer(handler, delay = rpki.sundial.timedelta(seconds = 0)): - """ - Use a near-term (default: zero interval) timer to schedule an event - to run after letting the I/O system have a turn. - """ - - timer(handler).set(delay) - -## @var debug_event_timing -# Enable insanely verbose logging of event timing - -debug_event_timing = False - -def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)): - """ - Replacement for asyncore.loop(), adding timer and signal support. - """ - - old_signal_handlers = {} - while True: - save_sigs = len(old_signal_handlers) == 0 - try: - for sig in catch_signals: - old = signal.signal(sig, _raiseExitNow) - if save_sigs: - old_signal_handlers[sig] = old - while asyncore.socket_map or timer_queue: - t = timer.seconds_until_wakeup() - if debug_event_timing: - logger.debug("Dismissing to asyncore.poll(), t = %s, q = %r", t, timer_queue) - asyncore.poll(t, asyncore.socket_map) - timer.runq() - if timer.gc_debug: - gc.collect() - if gc.garbage: - for i in gc.garbage: - logger.debug("GC-cycle %r", i) - del gc.garbage[:] - except ExitNow: - break - except SystemExit: - raise - except ValueError, e: - if str(e) == "filedescriptor out of range in select()": - logger.error("Something is badly wrong, select() thinks we gave it a bad file descriptor.") - logger.error("Content of asyncore.socket_map:") - for fd in sorted(asyncore.socket_map.iterkeys()): - logger.error(" fd %s obj %r", fd, asyncore.socket_map[fd]) - logger.error("Not safe to continue due to risk of spin loop on select(). Exiting.") - sys.exit(1) - logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting") - except Exception, e: - logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting") - else: - break - finally: - for sig in old_signal_handlers: - signal.signal(sig, old_signal_handlers[sig]) - -class gc_summary(object): - """ - Periodic summary of GC state, for tracking down memory bloat. - """ - - def __init__(self, interval, threshold = 0): - if isinstance(interval, (int, long)): - interval = rpki.sundial.timedelta(seconds = interval) - self.interval = interval - self.threshold = threshold - self.timer = timer(handler = self.handler) - self.timer.set(self.interval) - - def handler(self): - """ - Collect and log GC state for this period, reset timer. - """ - - logger.debug("gc_summary: Running gc.collect()") - gc.collect() - logger.debug("gc_summary: Summarizing (threshold %d)", self.threshold) - total = {} - tuples = {} - for g in gc.get_objects(): - k = type(g).__name__ - total[k] = total.get(k, 0) + 1 - if isinstance(g, tuple): - k = ", ".join(type(x).__name__ for x in g) - tuples[k] = tuples.get(k, 0) + 1 - logger.debug("gc_summary: Sorting result") - total = total.items() - total.sort(reverse = True, key = lambda x: x[1]) - tuples = tuples.items() - tuples.sort(reverse = True, key = lambda x: x[1]) - logger.debug("gc_summary: Object type counts in descending order") - for name, count in total: - if count > self.threshold: - logger.debug("gc_summary: %8d %s", count, name) - logger.debug("gc_summary: Tuple content type signature counts in descending order") - for types, count in tuples: - if count > self.threshold: - logger.debug("gc_summary: %8d (%s)", count, types) - logger.debug("gc_summary: Scheduling next cycle") - self.timer.set(self.interval) diff --git a/rpki/cli.py b/rpki/cli.py index e75b8430..35999cb0 100644 --- a/rpki/cli.py +++ b/rpki/cli.py @@ -77,7 +77,7 @@ class Cmd(cmd.Cmd): return False except BadCommandSyntax, e: print e - except Exception: + except: traceback.print_exc() self.last_command_failed = True return False diff --git a/rpki/config.py b/rpki/config.py index 3234294c..99041259 100644 --- a/rpki/config.py +++ b/rpki/config.py @@ -205,9 +205,7 @@ class parser(object): """ # pylint: disable=W0621 - import rpki.http import rpki.x509 - import rpki.async import rpki.log import rpki.daemonize @@ -219,46 +217,11 @@ class parser(object): logger.warning("Could not process configure_logger line %r: %s", line, e) try: - rpki.http.want_persistent_client = self.getboolean("want_persistent_client") - except ConfigParser.NoOptionError: - pass - - try: - rpki.http.want_persistent_server = self.getboolean("want_persistent_server") - except ConfigParser.NoOptionError: - pass - - try: - rpki.http.use_adns = self.getboolean("use_adns") - except ConfigParser.NoOptionError: - pass - - try: - rpki.http.enable_ipv6_clients = self.getboolean("enable_ipv6_clients") - except ConfigParser.NoOptionError: - pass - - try: - rpki.http.enable_ipv6_servers = self.getboolean("enable_ipv6_servers") - except ConfigParser.NoOptionError: - pass - - try: rpki.x509.CMS_object.debug_cms_certs = self.getboolean("debug_cms_certs") except ConfigParser.NoOptionError: pass try: - rpki.async.timer.gc_debug = self.getboolean("gc_debug") - except ConfigParser.NoOptionError: - pass - - try: - rpki.async.timer.run_debug = self.getboolean("timer_debug") - except ConfigParser.NoOptionError: - pass - - try: rpki.x509.XML_CMS_object.dump_outbound_cms = rpki.x509.DeadDrop(self.get("dump_outbound_cms")) except OSError, e: logger.warning("Couldn't initialize mailbox %s: %s", self.get("dump_outbound_cms"), e) @@ -283,11 +246,6 @@ class parser(object): pass try: - rpki.async.gc_summary(self.getint("gc_summary"), self.getint("gc_summary_threshold", 0)) - except ConfigParser.NoOptionError: - pass - - try: rpki.log.enable_tracebacks = self.getboolean("enable_tracebacks") except ConfigParser.NoOptionError: pass diff --git a/rpki/exceptions.py b/rpki/exceptions.py index b6889b0d..f456dfc5 100644 --- a/rpki/exceptions.py +++ b/rpki/exceptions.py @@ -238,3 +238,6 @@ class WrongEKU(RPKI_Exception): class UnexpectedUpDownResponse(RPKI_Exception): "Up-down message is not of the expected type." + +class BadContentType(RPKI_Exception): + "Bad HTTP Content-Type." diff --git a/rpki/fields.py b/rpki/fields.py index 6c71ac35..9fc57e51 100644 --- a/rpki/fields.py +++ b/rpki/fields.py @@ -173,7 +173,7 @@ class RSAPrivateKeyField(DERField): description = "RSA keypair" rpki_type = rpki.x509.RSA -KeyField = RSAPrivateKeyField # XXX backwards compatability +KeyField = RSAPrivateKeyField class PublicKeyField(DERField): description = "RSA keypair" diff --git a/rpki/http.py b/rpki/http.py deleted file mode 100644 index b991eeb0..00000000 --- a/rpki/http.py +++ /dev/null @@ -1,1061 +0,0 @@ -# $Id$ -# -# Copyright (C) 2013--2014 Dragon Research Labs ("DRL") -# Portions copyright (C) 2009--2012 Internet Systems Consortium ("ISC") -# Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN") -# -# Permission to use, copy, modify, and distribute this software for any -# purpose with or without fee is hereby granted, provided that the above -# copyright notices and this permission notice appear in all copies. -# -# THE SOFTWARE IS PROVIDED "AS IS" AND DRL, ISC, AND ARIN DISCLAIM ALL -# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED -# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL, -# ISC, OR ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR -# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS -# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, -# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION -# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -""" -HTTP utilities, both client and server. -""" - -import time -import socket -import asyncore -import asynchat -import urlparse -import sys -import random -import logging -import rpki.async -import rpki.sundial -import rpki.x509 -import rpki.exceptions -import rpki.log -import rpki.POW - -logger = logging.getLogger(__name__) - -## @var default_content_type -# HTTP content type used for RPKI messages. -# Can be overriden on a per-client or per-server basis. -default_content_type = "application/x-rpki" - -## @var want_persistent_client -# Whether we want persistent HTTP client streams, when server also supports them. -want_persistent_client = False - -## @var want_persistent_server -# Whether we want persistent HTTP server streams, when client also supports them. -want_persistent_server = False - -## @var default_client_timeout -# Default HTTP client connection timeout. -default_client_timeout = rpki.sundial.timedelta(minutes = 5) - -## @var default_server_timeout -# Default HTTP server connection timeouts. Given our druthers, we'd -# prefer that the client close the connection, as this avoids the -# problem of client starting to reuse connection just as server closes -# it, so this should be longer than the client timeout. -default_server_timeout = rpki.sundial.timedelta(minutes = 10) - -## @var default_http_version -# Preferred HTTP version. -default_http_version = (1, 0) - -## @var default_tcp_port -# Default port for clients and servers that don't specify one. -default_tcp_port = 80 - -## @var enable_ipv6_servers -# Whether to enable IPv6 listeners. Enabled by default, as it should -# be harmless. Has no effect if kernel doesn't support IPv6. -enable_ipv6_servers = True - -## @var enable_ipv6_clients -# Whether to consider IPv6 addresses when making connections. -# Disabled by default, as IPv6 connectivity is still a bad joke in -# far too much of the world. -enable_ipv6_clients = False - -## @var have_ipv6 -# Whether the current machine claims to support IPv6. Note that just -# because the kernel supports it doesn't mean that the machine has -# usable IPv6 connectivity. I don't know of a simple portable way to -# probe for connectivity at runtime (the old test of "can you ping -# SRI-NIC.ARPA?" seems a bit dated...). Don't set this, it's set -# automatically by probing using the socket() system call at runtime. -try: - # pylint: disable=W0702,W0104 - socket.socket(socket.AF_INET6).close() - socket.IPPROTO_IPV6 - socket.IPV6_V6ONLY -except: - have_ipv6 = False -else: - have_ipv6 = True - -## @var use_adns - -# Whether to use rpki.adns code. This is still experimental, so it's -# not (yet) enabled by default. -use_adns = False -try: - import rpki.adns -except ImportError: - pass - -def supported_address_families(enable_ipv6): - """ - IP address families on which servers should listen, and to consider - when selecting addresses for client connections. - """ - - if enable_ipv6 and have_ipv6: - return (socket.AF_INET, socket.AF_INET6) - else: - return (socket.AF_INET,) - -def localhost_addrinfo(): - """ - Return pseudo-getaddrinfo results for localhost. - """ - - result = [(socket.AF_INET, "127.0.0.1")] - if enable_ipv6_clients and have_ipv6: - result.append((socket.AF_INET6, "::1")) - return result - -class http_message(object): - """ - Virtual class representing of one HTTP message. - """ - - software_name = "ISC RPKI library" - - def __init__(self, version = None, body = None, headers = None): - self.version = version - self.body = body - self.headers = headers - self.normalize_headers() - - def normalize_headers(self, headers = None): - """ - Clean up (some of) the horrible messes that HTTP allows in its - headers. - """ - - if headers is None: - headers = () if self.headers is None else self.headers.items() - translate_underscore = True - else: - translate_underscore = False - result = {} - for k, v in headers: - if translate_underscore: - k = k.replace("_", "-") - k = "-".join(s.capitalize() for s in k.split("-")) - v = v.strip() - if k in result: - result[k] += ", " + v - else: - result[k] = v - self.headers = result - - @classmethod - def parse_from_wire(cls, headers): - """ - Parse and normalize an incoming HTTP message. - """ - - self = cls() - headers = headers.split("\r\n") - self.parse_first_line(*headers.pop(0).split(None, 2)) - for i in xrange(len(headers) - 2, -1, -1): - if headers[i + 1][0].isspace(): - headers[i] += headers[i + 1] - del headers[i + 1] - self.normalize_headers([h.split(":", 1) for h in headers]) - return self - - def format(self): - """ - Format an outgoing HTTP message. - """ - - s = str(self.format_first_line()) - assert isinstance(s, str) - if self.body is not None: - assert isinstance(self.body, str) - self.headers["Content-Length"] = len(self.body) - for kv in self.headers.iteritems(): - h = str("%s: %s\r\n" % kv) - assert isinstance(h, str) - s += h - s += "\r\n" - assert isinstance(s, str) - if self.body is not None: - s += self.body - assert isinstance(s, str) - return s - - def __str__(self): - return self.format() - - def parse_version(self, version): - """ - Parse HTTP version, raise an exception if we can't. - """ - - if version[:5] != "HTTP/": - raise rpki.exceptions.HTTPBadVersion("Couldn't parse version %s" % version) - self.version = tuple(int(i) for i in version[5:].split(".")) - - @property - def persistent(self): - """ - Figure out whether this HTTP message encourages a persistent connection. - """ - - c = self.headers.get("Connection") - if self.version == (1, 1): - return c is None or "close" not in c.lower() - elif self.version == (1, 0): - return c is not None and "keep-alive" in c.lower() - else: - return False - -class http_request(http_message): - """ - HTTP request message. - """ - - def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers): - assert cmd == "POST" or body is None - http_message.__init__(self, version = version, body = body, headers = headers) - self.cmd = cmd - self.path = path - self.callback = callback - self.errback = errback - self.retried = False - - def parse_first_line(self, cmd, path, version): - """ - Parse first line of HTTP request message. - """ - - self.parse_version(version) - self.cmd = cmd - self.path = path - - def format_first_line(self): - """ - Format first line of HTTP request message, and set up the - User-Agent header. - """ - - self.headers.setdefault("User-Agent", self.software_name) - return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1]) - - def __repr__(self): - return rpki.log.log_repr(self, self.cmd, self.path) - -class http_response(http_message): - """ - HTTP response message. - """ - - def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers): - http_message.__init__(self, version = version, body = body, headers = headers) - self.code = code - self.reason = reason - - def parse_first_line(self, version, code, reason): - """ - Parse first line of HTTP response message. - """ - - self.parse_version(version) - self.code = int(code) - self.reason = reason - - def format_first_line(self): - """ - Format first line of HTTP response message, and set up Date and - Server headers. - """ - - self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT")) - self.headers.setdefault("Server", self.software_name) - return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason) - - def __repr__(self): - return rpki.log.log_repr(self, self.code, self.reason) - -def addr_to_string(addr): - """ - Convert socket addr tuple to printable string. Assumes 2-element - tuple is IPv4, 4-element tuple is IPv6, throws TypeError for - anything else. - """ - - if len(addr) == 2: - return "%s:%d" % (addr[0], addr[1]) - if len(addr) == 4: - return "%s.%d" % (addr[0], addr[1]) - raise TypeError - -@rpki.log.class_logger(logger) -class http_stream(asynchat.async_chat): - """ - Virtual class representing an HTTP message stream. - """ - - # Keep pylint happy; @class_logger overwrites this. - logger = None - - def __repr__(self): - status = ["connected"] if self.connected else [] - try: - status.append(addr_to_string(self.addr)) - except TypeError: - pass - return rpki.log.log_repr(self, *status) - - def __init__(self, sock = None): - self.logger = logging.LoggerAdapter(self.logger, dict(context = self)) - asynchat.async_chat.__init__(self, sock) - self.buffer = [] - self.timer = rpki.async.timer(self.handle_timeout) - self.restart() - - def restart(self): - """ - (Re)start HTTP message parser, reset timer. - """ - - assert not self.buffer - self.chunk_handler = None - self.set_terminator("\r\n\r\n") - self.update_timeout() - - def update_timeout(self): - """ - Put this stream's timer in known good state: set it to the - stream's timeout value if we're doing timeouts, otherwise clear - it. - """ - - if self.timeout is not None: - self.logger.debug("Setting timeout %s", self.timeout) - self.timer.set(self.timeout) - else: - self.logger.debug("Clearing timeout") - self.timer.cancel() - - def collect_incoming_data(self, data): - """ - Buffer incoming data from asynchat. - """ - - self.buffer.append(data) - self.update_timeout() - - def get_buffer(self): - """ - Consume data buffered from asynchat. - """ - - val = "".join(self.buffer) - self.buffer = [] - return val - - def found_terminator(self): - """ - Asynchat reported that it found whatever terminator we set, so - figure out what to do next. This can be messy, because we can be - in any of several different states: - - @li We might be handling chunked HTTP, in which case we have to - initialize the chunk decoder; - - @li We might have found the end of the message body, in which case - we can (finally) process it; or - - @li We might have just gotten to the end of the message headers, - in which case we have to parse them to figure out which of three - separate mechanisms (chunked, content-length, TCP close) is going - to tell us how to find the end of the message body. - """ - - self.update_timeout() - if self.chunk_handler: - self.chunk_handler() - elif not isinstance(self.get_terminator(), str): - self.handle_body() - else: - self.msg = self.parse_type.parse_from_wire(self.get_buffer()) - if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower(): - self.msg.body = [] - self.chunk_handler = self.chunk_header - self.set_terminator("\r\n") - elif "Content-Length" in self.msg.headers: - self.set_terminator(int(self.msg.headers["Content-Length"])) - else: - self.handle_no_content_length() - - def chunk_header(self): - """ - Asynchat just handed us what should be the header of one chunk of - a chunked encoding stream. If this chunk has a body, set the - stream up to read it; otherwise, this is the last chunk, so start - the process of exiting the chunk decoder. - """ - - n = int(self.get_buffer().partition(";")[0], 16) - self.logger.debug("Chunk length %s", n) - if n: - self.chunk_handler = self.chunk_body - self.set_terminator(n) - else: - self.msg.body = "".join(self.msg.body) - self.chunk_handler = self.chunk_discard_trailer - - def chunk_body(self): - """ - Asynchat just handed us what should be the body of a chunk of the - body of a chunked message (sic). Save it, and prepare to move on - to the next chunk. - """ - - self.logger.debug("Chunk body") - self.msg.body += self.buffer - self.buffer = [] - self.chunk_handler = self.chunk_discard_crlf - self.set_terminator("\r\n") - - def chunk_discard_crlf(self): - """ - Consume the CRLF that terminates a chunk, reinitialize chunk - decoder to be ready for the next chunk. - """ - - self.logger.debug("Chunk CRLF") - s = self.get_buffer() - assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s) - self.chunk_handler = self.chunk_header - - def chunk_discard_trailer(self): - """ - Consume chunk trailer, which should be empty, then (finally!) exit - the chunk decoder and hand complete message off to the application. - """ - - self.logger.debug("Chunk trailer") - s = self.get_buffer() - assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s) - self.chunk_handler = None - self.handle_message() - - def handle_body(self): - """ - Hand normal (not chunked) message off to the application. - """ - - self.msg.body = self.get_buffer() - self.handle_message() - - def handle_error(self): - """ - Asynchat (or asyncore, or somebody) raised an exception. See - whether it's one we should just pass along, otherwise log a stack - trace and close the stream. - """ - - self.timer.cancel() - etype = sys.exc_info()[0] - if etype in (SystemExit, rpki.async.ExitNow): - raise - if etype is not rpki.exceptions.HTTPClientAborted: - self.logger.exception("Closing due to error") - self.close() - - def handle_timeout(self): - """ - Inactivity timer expired, close connection with prejudice. - """ - - self.logger.debug("Timeout, closing") - self.close() - - def handle_close(self): - """ - Wrapper around asynchat connection close handler, so that we can - log the event, cancel timer, and so forth. - """ - - self.logger.debug("Close event in HTTP stream handler") - self.timer.cancel() - asynchat.async_chat.handle_close(self) - -@rpki.log.class_logger(logger) -class http_server(http_stream): - """ - HTTP server stream. - """ - - ## @var parse_type - # Stream parser should look for incoming HTTP request messages. - parse_type = http_request - - ## @var timeout - # Use the default server timeout value set in the module header. - timeout = default_server_timeout - - def __init__(self, sock, handlers): - self.handlers = handlers - self.received_content_type = None - http_stream.__init__(self, sock = sock) - self.expect_close = not want_persistent_server - self.logger.debug("Starting") - - def handle_no_content_length(self): - """ - Handle an incoming message that used neither chunking nor a - Content-Length header (that is: this message will be the last one - in this server stream). No special action required. - """ - - self.handle_message() - - def find_handler(self, path): - """ - Helper method to search self.handlers. - """ - - for h in self.handlers: - if path.startswith(h[0]): - return h[1], h[2] if len(h) > 2 else (default_content_type,) - return None, None - - def handle_message(self): - """ - HTTP layer managed to deliver a complete HTTP request to - us, figure out what to do with it. Check the command and - Content-Type, look for a handler, and if everything looks right, - pass the message body, path, and a reply callback to the handler. - """ - - self.logger.debug("Received request %r", self.msg) - if not self.msg.persistent: - self.expect_close = True - handler, allowed_content_types = self.find_handler(self.msg.path) - self.received_content_type = self.msg.headers["Content-Type"] - error = None - if self.msg.cmd != "POST": - error = 501, "No handler for method %s" % self.msg.cmd - elif self.received_content_type not in allowed_content_types: - error = 415, "No handler for Content-Type %s" % self.received_content_type - elif handler is None: - error = 404, "No handler for URL %s" % self.msg.path - if error is None: - try: - handler(self.msg.body, self.msg.path, self.send_reply) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - self.logger.exception("Unhandled exception while handling HTTP request") - self.send_error(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e)) - else: - self.send_error(code = error[0], reason = error[1]) - - def send_error(self, code, reason): - """ - Send an error response to this request. - """ - - self.send_message(code = code, reason = reason) - - def send_reply(self, code, body = None, reason = "OK"): - """ - Send a reply to this request. - """ - - self.send_message(code = code, body = body, reason = reason) - - def send_message(self, code, reason = "OK", body = None): - """ - Queue up reply message. If both parties agree that connection is - persistant, and if no error occurred, restart this stream to - listen for next message; otherwise, queue up a close event for - this stream so it will shut down once the reply has been sent. - """ - - self.logger.debug("Sending response %s %s", code, reason) - if code >= 400: - self.expect_close = True - msg = http_response(code = code, reason = reason, body = body, - Content_Type = self.received_content_type, - Connection = "Close" if self.expect_close else "Keep-Alive") - self.push(msg.format()) - if self.expect_close: - self.logger.debug("Closing") - self.timer.cancel() - self.close_when_done() - else: - self.logger.debug("Listening for next message") - self.restart() - -@rpki.log.class_logger(logger) -class http_listener(asyncore.dispatcher): - """ - Listener for incoming HTTP connections. - """ - - def __repr__(self): - try: - status = (addr_to_string(self.addr),) - except TypeError: - status = () - return rpki.log.log_repr(self, *status) - - def __init__(self, handlers, addrinfo): - self.logger = logging.LoggerAdapter(self.logger, dict(context = self)) - asyncore.dispatcher.__init__(self) - self.handlers = handlers - try: - af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612 - self.create_socket(af, socktype) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - except AttributeError: - pass - if have_ipv6 and af == socket.AF_INET6: - self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) - self.bind(sockaddr) - self.listen(5) - except Exception: - self.logger.exception("Couldn't set up HTTP listener") - self.close() - for h in handlers: - self.logger.debug("Handling %s", h[0]) - - def handle_accept(self): - """ - Asyncore says we have an incoming connection, spawn an http_server - stream for it and pass along all of our handler data. - """ - - try: - res = self.accept() - if res is None: - raise - sock, addr = res # pylint: disable=W0633 - self.logger.debug("Accepting connection from %s", addr_to_string(addr)) - http_server(sock = sock, handlers = self.handlers) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception: - self.logger.exception("Unable to accept connection") - - def handle_error(self): - """ - Asyncore signaled an error, pass it along or log it. - """ - - if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow): - raise - self.logger.exception("Error in HTTP listener") - -@rpki.log.class_logger(logger) -class http_client(http_stream): - """ - HTTP client stream. - """ - - ## @var parse_type - # Stream parser should look for incoming HTTP response messages. - parse_type = http_response - - ## @var timeout - # Use the default client timeout value set in the module header. - timeout = default_client_timeout - - ## @var state - # Application layer connection state. - state = None - - def __init__(self, queue, hostport): - http_stream.__init__(self) - self.logger.debug("Creating new connection to %s", addr_to_string(hostport)) - self.queue = queue - self.host = hostport[0] - self.port = hostport[1] - self.set_state("opening") - self.expect_close = not want_persistent_client - - def start(self): - """ - Create socket and request a connection. - """ - - if not use_adns: - self.logger.debug("Not using ADNS") - self.gotaddrinfo([(socket.AF_INET, self.host)]) - elif self.host == "localhost": - self.logger.debug("Bypassing DNS for localhost") - self.gotaddrinfo(localhost_addrinfo()) - else: - families = supported_address_families(enable_ipv6_clients) - self.logger.debug("Starting ADNS lookup for %s in families %r", self.host, families) - rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, families) - - def dns_error(self, e): - """ - Handle DNS lookup errors. For now, just whack the connection. - Undoubtedly we should do something better with diagnostics here. - """ - - self.handle_error() - - def gotaddrinfo(self, addrinfo): - """ - Got address data from DNS, create socket and request connection. - """ - - try: - self.af, self.address = random.choice(addrinfo) - self.logger.debug("Connecting to AF %s host %s port %s addr %s", self.af, self.host, self.port, self.address) - self.create_socket(self.af, socket.SOCK_STREAM) - self.connect((self.address, self.port)) - if self.addr is None: - self.addr = (self.host, self.port) - self.update_timeout() - except (rpki.async.ExitNow, SystemExit): - raise - except Exception: - self.handle_error() - - def handle_connect(self): - """ - Asyncore says socket has connected. - """ - - self.logger.debug("Socket connected") - self.set_state("idle") - assert self.queue.client is self - self.queue.send_request() - - def set_state(self, state): - """ - Set HTTP client connection state. - """ - - self.logger.debug("State transition %s => %s", self.state, state) - self.state = state - - def handle_no_content_length(self): - """ - Handle response message that used neither chunking nor a - Content-Length header (that is: this message will be the last one - in this server stream). In this case we want to read until we - reach the end of the data stream. - """ - - self.set_terminator(None) - - def send_request(self, msg): - """ - Queue up request message and kickstart connection. - """ - - self.logger.debug("Sending request %r", msg) - assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state) - self.set_state("request-sent") - msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive" - self.push(msg.format()) - self.restart() - - def handle_message(self): - """ - Handle incoming HTTP response message. Make sure we're in a state - where we expect to see such a message (and allow the mysterious - empty messages that Apache sends during connection close, no idea - what that is supposed to be about). If everybody agrees that the - connection should stay open, put it into an idle state; otherwise, - arrange for the stream to shut down. - """ - - self.logger.debug("Message received, state %s", self.state) - - if not self.msg.persistent: - self.expect_close = True - - if self.state != "request-sent": - if self.state == "closing": - assert not self.msg.body - self.logger.debug("Ignoring empty response received while closing") - return - raise rpki.exceptions.HTTPUnexpectedState("%r received message while in unexpected state %s" % (self, self.state)) - - if self.expect_close: - self.logger.debug("Closing") - self.set_state("closing") - self.close_when_done() - else: - self.logger.debug("Idling") - self.set_state("idle") - self.update_timeout() - - if self.msg.code != 200: - errmsg = "HTTP request failed" - if self.msg.code is not None: - errmsg += " with status %s" % self.msg.code - if self.msg.reason: - errmsg += ", reason %s" % self.msg.reason - if self.msg.body: - errmsg += ", response %s" % self.msg.body - raise rpki.exceptions.HTTPRequestFailed(errmsg) - self.queue.return_result(self, self.msg, detach = self.expect_close) - - def handle_close(self): - """ - Asyncore signaled connection close. If we were waiting for that - to find the end of a response message, process the resulting - message now; if we were waiting for the response to a request we - sent, signal the error. - """ - - http_stream.handle_close(self) - self.logger.debug("State %s", self.state) - if self.get_terminator() is None: - self.handle_body() - elif self.state == "request-sent": - raise rpki.exceptions.HTTPClientAborted("HTTP request aborted by close event") - else: - self.queue.detach(self) - - def handle_timeout(self): - """ - Connection idle timer has expired. Shut down connection in any - case, noisily if we weren't idle. - """ - - bad = self.state not in ("idle", "closing") - if bad: - self.logger.warning("Timeout while in state %s", self.state) - http_stream.handle_timeout(self) - if bad: - try: - raise rpki.exceptions.HTTPTimeout - except: # pylint: disable=W0702 - self.handle_error() - else: - self.queue.detach(self) - - def handle_error(self): - """ - Asyncore says something threw an exception. Log it, then shut - down the connection and pass back the exception. - """ - - eclass, edata = sys.exc_info()[0:2] - self.logger.warning("Error on HTTP client connection %s:%s %s %s", self.host, self.port, eclass, edata) - http_stream.handle_error(self) - self.queue.return_result(self, edata, detach = True) - -@rpki.log.class_logger(logger) -class http_queue(object): - """ - Queue of pending HTTP requests for a single destination. This class - is very tightly coupled to http_client; http_client handles the HTTP - stream itself, this class provides a slightly higher-level API. - """ - - def __repr__(self): - return rpki.log.log_repr(self, addr_to_string(self.hostport)) - - def __init__(self, hostport): - self.logger = logging.LoggerAdapter(self.logger, dict(context = self)) - self.hostport = hostport - self.client = None - self.logger.debug("Created") - self.queue = [] - - def request(self, *requests): - """ - Append http_request object(s) to this queue. - """ - - self.logger.debug("Adding requests %r", requests) - self.queue.extend(requests) - - def restart(self): - """ - Send next request for this queue, if we can. This may involve - starting a new http_client stream, reusing an existing idle - stream, or just ignoring this request if there's an active client - stream already; in the last case, handling of the response (or - exception, or timeout) for the query currently in progress will - call this method when it's time to kick out the next query. - """ - - try: - if self.client is None: - self.client = http_client(self, self.hostport) - self.logger.debug("Attached client %r", self.client) - self.client.start() - elif self.client.state == "idle": - self.logger.debug("Sending request to existing client %r", self.client) - self.send_request() - else: - self.logger.debug("Client %r exists in state %r", self.client, self.client.state) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - self.return_result(self.client, e, detach = True) - - def send_request(self): - """ - Kick out the next query in this queue, if any. - """ - - if self.queue: - self.client.send_request(self.queue[0]) - - def detach(self, client_): - """ - Detatch a client from this queue. Silently ignores attempting to - detach a client that is not attached to this queue, to simplify - handling of what otherwise would be a nasty set of race - conditions. - """ - - if client_ is self.client: - self.logger.debug("Detaching client %r", client_) - self.client = None - - def return_result(self, client, result, detach = False): # pylint: disable=W0621 - """ - Client stream has returned a result, which we need to pass along - to the original caller. Result may be either an HTTP response - message or an exception. In either case, once we're done - processing this result, kick off next message in the queue, if any. - """ - - if client is not self.client: - self.logger.warning("Wrong client trying to return result. THIS SHOULD NOT HAPPEN. Dropping result %r", result) - return - - if detach: - self.detach(client) - - try: - req = self.queue.pop(0) - self.logger.debug("Dequeuing request %r", req) - except IndexError: - self.logger.warning("No caller. THIS SHOULD NOT HAPPEN. Dropping result %r", result) - return - - assert isinstance(result, http_response) or isinstance(result, Exception) - - if isinstance(result, http_response): - try: - self.logger.debug("Returning result %r to caller", result) - req.callback(result.body) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - result = e - - if isinstance(result, Exception): - try: - self.logger.warning("Returning exception %r to caller: %s", result, result) - req.errback(result) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception: - self.logger.exception("Exception in exception callback, may have lost event chain") - - self.logger.debug("Queue: %r", self.queue) - - if self.queue: - self.restart() - -## @var client_queues -# Map of (host, port) tuples to http_queue objects. -client_queues = {} - -def client(msg, url, callback, errback, content_type = default_content_type): - """ - Open client HTTP connection, send a message, set up callbacks to - handle response. - """ - - u = urlparse.urlparse(url) - - if (u.scheme not in ("", "http") or - u.username is not None or - u.password is not None or - u.params != "" or - u.query != "" or - u.fragment != ""): - raise rpki.exceptions.BadClientURL("Unusable URL %s" % url) - - logger.debug("Contacting %s", url) - - request = http_request( - cmd = "POST", - path = u.path, - body = msg, - callback = callback, - errback = errback, - Host = u.hostname, - Content_Type = content_type) - - hostport = (u.hostname or "localhost", u.port or default_tcp_port) - - logger.debug("Created request %r for %s", request, addr_to_string(hostport)) - if hostport not in client_queues: - client_queues[hostport] = http_queue(hostport) - client_queues[hostport].request(request) - - # Defer connection attempt until after we've had time to process any - # pending I/O events, in case connections have closed. - - logger.debug("Scheduling connection startup for %r", request) - rpki.async.event_defer(client_queues[hostport].restart) - -def server(handlers, port, host = ""): - """ - Run an HTTP server and wait (forever) for connections. - """ - - if not isinstance(handlers, (tuple, list)): - handlers = (("/", handlers),) - - # Yes, this is sick. So is getaddrinfo() returning duplicate - # records, which RedHat has the gall to claim is a feature. - ai = [] - for af in supported_address_families(enable_ipv6_servers): - try: - if host: - h = host - elif have_ipv6 and af == socket.AF_INET6: - h = "::" - else: - h = "0.0.0.0" - for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM): - if a not in ai: - ai.append(a) - except socket.gaierror: - pass - - for a in ai: - http_listener(addrinfo = a, handlers = handlers) - - rpki.async.event_loop() diff --git a/rpki/left_right.py b/rpki/left_right.py index 090de561..387e908f 100644 --- a/rpki/left_right.py +++ b/rpki/left_right.py @@ -25,19 +25,16 @@ import logging import rpki.x509 import rpki.exceptions -import rpki.http import rpki.up_down import rpki.relaxng import rpki.sundial import rpki.log import rpki.publication -import rpki.async import rpki.rpkid_tasks logger = logging.getLogger(__name__) - xmlns = rpki.relaxng.left_right.xmlns nsmap = rpki.relaxng.left_right.nsmap version = rpki.relaxng.left_right.version @@ -62,6 +59,16 @@ tag_tenant = xmlns + "tenant" tag_signing_cert = xmlns + "signing_cert" tag_signing_cert_crl = xmlns + "signing_cert_crl" +## @var content_type +# Content type to use when sending left-right queries +content_type = "application/x-rpki" + +## @var allowed_content_types +# Content types we consider acceptable for incoming left-right +# queries. + +allowed_content_types = (content_type,) + class cms_msg(rpki.x509.XML_CMS_object): """ diff --git a/rpki/publication.py b/rpki/publication.py index 7939b9de..16824d05 100644 --- a/rpki/publication.py +++ b/rpki/publication.py @@ -39,6 +39,17 @@ tag_withdraw = rpki.relaxng.publication.xmlns + "withdraw" tag_report_error = rpki.relaxng.publication.xmlns + "report_error" +## @var content_type +# Content type to use when sending left-right queries +content_type = "application/x-rpki" + +## @var allowed_content_types +# Content types we consider acceptable for incoming left-right +# queries. + +allowed_content_types = (content_type,) + + def raise_if_error(pdu): """ Raise an appropriate error if this is a <report_error/> PDU. diff --git a/rpki/rpkic.py b/rpki/rpkic.py index f5e77396..4b9ffedb 100644 --- a/rpki/rpkic.py +++ b/rpki/rpkic.py @@ -43,7 +43,6 @@ import rpki.relaxng import rpki.exceptions import rpki.left_right import rpki.x509 -import rpki.async import rpki.version from lxml.etree import SubElement diff --git a/rpki/rpkid.py b/rpki/rpkid.py index c6b1001e..896fe0be 100644 --- a/rpki/rpkid.py +++ b/rpki/rpkid.py @@ -28,20 +28,27 @@ import random import logging import argparse +import tornado.gen +import tornado.web +import tornado.ioloop +import tornado.httputil +import tornado.httpclient +import tornado.httpserver + +from lxml.etree import Element, SubElement, tostring as ElementToString + import rpki.resource_set import rpki.up_down import rpki.left_right import rpki.x509 -import rpki.http import rpki.config import rpki.exceptions import rpki.relaxng import rpki.log -import rpki.async import rpki.daemonize + import rpki.rpkid_tasks -from lxml.etree import Element, SubElement, tostring as ElementToString logger = logging.getLogger(__name__) @@ -105,12 +112,10 @@ class main(object): logger.info("Running in profile mode with output to %s", self.profile) logger.debug("Initializing Django") - import django django.setup() logger.debug("Initializing rpkidb...") - global rpki # pylint: disable=W0602 import rpki.rpkidb # pylint: disable=W0621 @@ -127,41 +132,46 @@ class main(object): self.http_server_host = self.cfg.get("server-host", "") self.http_server_port = self.cfg.getint("server-port") - self.publication_kludge_base = self.cfg.get("publication-kludge-base", "publication/") - self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True) self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10), self.cfg.getint("initial-delay-max", 120)) # Should be much longer in production - self.cron_period = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-period", 120)) - self.cron_keepalive = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-keepalive", 0)) - if not self.cron_keepalive: - self.cron_keepalive = self.cron_period * 4 - self.cron_timeout = None + self.cron_period = self.cfg.getint("cron-period", 120) - self.start_cron() + if self.use_internal_cron: + logger.debug("Scheduling initial cron pass in %s seconds", self.initial_delay) + tornado.ioloop.IOLoop.current().spawn_callback(self.cron_loop) - rpki.http.server( - host = self.http_server_host, - port = self.http_server_port, - handlers = (("/left-right", self.left_right_handler), - ("/up-down/", self.up_down_handler, rpki.up_down.allowed_content_types), - ("/cronjob", self.cronjob_handler))) + rpkid = self - def start_cron(self): - """ - Start clock for rpkid's internal cron process. - """ + class LeftRightHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self): + yield rpkid.left_right_handler(self) + + class UpDownHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self, tenant_handle, child_handle): + yield rpkid.up_down_handler(self, tenant_handle, child_handle) + + class CronjobHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self): + yield rpkid.cronjob_handler(self) + + application = tornado.web.Application(( + (r"/left-right", LeftRightHandler), + (r"/up-down/([-a-zA-Z0-9_]+)/([-a-zA-Z0-9_]+)", UpDownHandler), + (r"/cronjob", CronjobHandler))) + + application.listen( + address = self.http_server_host, + port = self.http_server_port) + + tornado.ioloop.IOLoop.current().start() - if self.use_internal_cron: - self.cron_timer = rpki.async.timer(handler = self.cron) - when = rpki.sundial.now() + rpki.sundial.timedelta(seconds = self.initial_delay) - logger.debug("Scheduling initial cron pass at %s", when) - self.cron_timer.set(when) - else: - logger.debug("Not using internal clock, start_cron() call ignored") @staticmethod def _compose_left_right_query(): @@ -172,70 +182,86 @@ class main(object): return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, type = "query", version = rpki.left_right.version) - def irdb_query(self, q_msg, callback, errback): + + @tornado.gen.coroutine + def irdb_query(self, q_msg): """ Perform an IRDB callback query. """ - try: - q_tags = set(q_pdu.tag for q_pdu in q_msg) + q_tags = set(q_pdu.tag for q_pdu in q_msg) - q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert) + q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert) - def unwrap(r_der): - try: - r_cms = rpki.left_right.cms_msg(DER = r_der) - r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert)) - self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url) - #rpki.left_right.check_response(r_msg) - if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg): - raise rpki.exceptions.BadIRDBReply( - "Unexpected response to IRDB query: %s" % r_cms.pretty_print_content()) - callback(r_msg) - except Exception, e: - errback(e) + http_client = tornado.httpclient.AsyncHTTPClient() - rpki.http.client( - url = self.irdb_url, - msg = q_der, - callback = unwrap, - errback = errback) + http_request = tornado.httpclient.HTTPRequest( + url = self.irdb_url, + method = "POST", + body = q_der, + headers = { "Content-Type" : rpki.left_right.content_type }) - except Exception, e: - errback(e) + http_response = yield http_client.fetch(http_request) + + # Tornado already checked http_response.code for us + + content_type = http_response.headers.get("Content-Type") + + if content_type not in rpki.left_right.allowed_content_types: + raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.left_right.content_type, content_type)) + + r_der = http_response.body + + r_cms = rpki.left_right.cms_msg(DER = r_der) + r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert)) + self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url) - def irdb_query_child_resources(self, tenant_handle, child_handle, callback, errback): + #rpki.left_right.check_response(r_msg) + + if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg): + raise rpki.exceptions.BadIRDBReply("Unexpected response to IRDB query: %s" % r_cms.pretty_print_content()) + + raise tornado.gen.Return(r_msg) + + + @tornado.gen.coroutine + def irdb_query_child_resources(self, tenant_handle, child_handle): """ Ask IRDB about a child's resources. """ q_msg = self._compose_left_right_query() - SubElement(q_msg, rpki.left_right.tag_list_resources, - tenant_handle = tenant_handle, child_handle = child_handle) + SubElement(q_msg, rpki.left_right.tag_list_resources, tenant_handle = tenant_handle, child_handle = child_handle) + + r_msg = yield self.irdb_query(q_msg) - def done(r_msg): - if len(r_msg) != 1: - raise rpki.exceptions.BadIRDBReply( - "Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content()) - callback(rpki.resource_set.resource_bag( - asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")), - v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")), - v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")), - valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until")))) + if len(r_msg) != 1: + raise rpki.exceptions.BadIRDBReply("Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content()) - self.irdb_query(q_msg, done, errback) + bag = rpki.resource_set.resource_bag( + asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")), + v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")), + v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")), + valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until"))) - def irdb_query_roa_requests(self, tenant_handle, callback, errback): + raise tornado.gen.Return(bag) + + + @tornado.gen.coroutine + def irdb_query_roa_requests(self, tenant_handle): """ Ask IRDB about self's ROA requests. """ q_msg = self._compose_left_right_query() SubElement(q_msg, rpki.left_right.tag_list_roa_requests, tenant_handle = tenant_handle) - self.irdb_query(q_msg, callback, errback) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) + - def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles, callback, errback): + @tornado.gen.coroutine + def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles): """ Ask IRDB about self's ghostbuster record requests. """ @@ -244,16 +270,20 @@ class main(object): for parent_handle in parent_handles: SubElement(q_msg, rpki.left_right.tag_list_ghostbuster_requests, tenant_handle = tenant_handle, parent_handle = parent_handle) - self.irdb_query(q_msg, callback, errback) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) - def irdb_query_ee_certificate_requests(self, tenant_handle, callback, errback): + @tornado.gen.coroutine + def irdb_query_ee_certificate_requests(self, tenant_handle): """ Ask IRDB about self's EE certificate requests. """ q_msg = self._compose_left_right_query() SubElement(q_msg, rpki.left_right.tag_list_ee_certificate_requests, tenant_handle = tenant_handle) - self.irdb_query(q_msg, callback, errback) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) + @property def left_right_models(self): @@ -273,6 +303,7 @@ class main(object): rpki.left_right.tag_repository : rpki.rpkidb.models.Repository } return self._left_right_models + @property def left_right_trivial_handlers(self): """ @@ -287,6 +318,7 @@ class main(object): rpki.left_right.tag_list_received_resources : self.handle_list_received_resources } return self._left_right_trivial_handlers + def handle_list_published_objects(self, q_pdu, r_msg): """ <list_published_objects/> server. @@ -317,6 +349,7 @@ class main(object): SubElement(r_msg, rpki.left_right.tag_list_published_objects, uri = c.uri, **kw).text = c.cert.get_Base64() + def handle_list_received_resources(self, q_pdu, r_msg): """ <list_received_resources/> server. @@ -344,28 +377,28 @@ class main(object): r_pdu.set("tag", msg_tag) - def left_right_handler(self, query, path, cb): + @tornado.gen.coroutine + def left_right_handler(self, handler): """ - Process one left-right PDU. + Process one left-right message. """ - # This handles five persistent classes (self, bsc, parent, child, - # repository) and two simple queries (list_published_objects and - # list_received_resources). The former probably need to dispatch - # via methods to the corresponding model classes; the latter - # probably just become calls to ordinary methods of this - # (rpki.rpkid.main) class. - # - # Need to clone logic from rpki.pubd.main.control_handler(). - logger.debug("Entering left_right_handler()") + content_type = handler.request.headers["Content-Type"] + if content_type not in rpki.left_right.allowed_content_types: + handler.set_status(415, "No handler for Content-Type %s" % content_type) + handler.finish() + raise tornado.gen.Return + + handler.set_header("Content-Type", rpki.left_right.content_type) + try: - q_cms = rpki.left_right.cms_msg(DER = query) + q_cms = rpki.left_right.cms_msg(DER = handler.request.body) q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert)) r_msg = Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, type = "reply", version = rpki.left_right.version) - self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, path) + self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, handler.request.path) assert q_msg.tag.startswith(rpki.left_right.xmlns) assert all(q_pdu.tag.startswith(rpki.left_right.xmlns) for q_pdu in q_msg) @@ -376,14 +409,37 @@ class main(object): if q_msg.get("type") != "query": raise rpki.exceptions.BadQuery("Message type is not query") - def done(): - cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert)) + for q_pdu in q_msg: - def loop(iterator, q_pdu): + try: + action = q_pdu.get("action") + model = self.left_right_models.get(q_pdu.tag) + + if q_pdu.tag in self.left_right_trivial_handlers: + self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg) + + elif action in ("get", "list"): + for obj in model.objects.xml_list(q_pdu): + obj.xml_template.encode(obj, q_pdu, r_msg) + + elif action == "destroy": + obj = model.objects.xml_get_for_delete(q_pdu) + yield obj.xml_pre_delete_hook(self) + obj.delete() + obj.xml_template.acknowledge(obj, q_pdu, r_msg) - logger.debug("left_right_handler():loop(%r)", q_pdu) + elif action in ("create", "set"): + obj = model.objects.xml_get_or_create(q_pdu) + obj.xml_template.decode(obj, q_pdu) + obj.xml_pre_save_hook(q_pdu) + obj.save() + yield obj.xml_post_save_hook(self, q_pdu) + obj.xml_template.acknowledge(obj, q_pdu, r_msg) - def fail(e): + else: + raise rpki.exceptions.BadQuery("Unrecognized action %r" % action) + + except Exception, e: if not isinstance(e, rpki.exceptions.NotFound): logger.exception("Unhandled exception serving left-right PDU %r", q_pdu) error_tenant_handle = q_pdu.get("tenant_handle") @@ -394,102 +450,50 @@ class main(object): r_pdu.set("tag", error_tag) if error_tenant_handle is not None: r_pdu.set("tenant_handle", error_tenant_handle) - cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert)) + break - try: - if q_pdu.tag in self.left_right_trivial_handlers: - logger.debug("left_right_handler(): trivial handler") - self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg) - iterator() - - else: - action = q_pdu.get("action") - model = self.left_right_models[q_pdu.tag] - - logger.debug("left_right_handler(): action %s model %r", action, model) - - if action in ("get", "list"): - logger.debug("left_right_handler(): get/list") - for obj in model.objects.xml_list(q_pdu): - logger.debug("left_right_handler(): get/list: encoding %r", obj) - obj.xml_template.encode(obj, q_pdu, r_msg) - iterator() - - elif action == "destroy": - def destroy_cb(): - obj.delete() - obj.xml_template.acknowledge(obj, q_pdu, r_msg) - iterator() - logger.debug("left_right_handler(): destroy") - obj = model.objects.xml_get_for_delete(q_pdu) - obj.xml_pre_delete_hook(self, destroy_cb, fail) - - elif action in ("create", "set"): - def create_set_cb(): - obj.xml_template.acknowledge(obj, q_pdu, r_msg) - iterator() - logger.debug("left_right_handler(): create/set") - obj = model.objects.xml_get_or_create(q_pdu) - obj.xml_template.decode(obj, q_pdu) - obj.xml_pre_save_hook(q_pdu) - obj.save() - obj.xml_post_save_hook(self, q_pdu, create_set_cb, fail) - - else: - raise rpki.exceptions.BadQuery - - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - fail(e) - - rpki.async.iterator(q_msg, loop, done) - - except (rpki.async.ExitNow, SystemExit): - raise + handler.set_status(200) + handler.finish(rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert)) + logger.debug("Normal exit from left_right_handler()") except Exception, e: logger.exception("Unhandled exception serving left-right request") - cb(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e)) + handler.set_status(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e)) + handler.finish() - up_down_url_regexp = re.compile("/up-down/([-A-Z0-9_]+)/([-A-Z0-9_]+)$", re.I) - def up_down_handler(self, q_der, path, cb): + @tornado.gen.coroutine + def up_down_handler(self, handler, tenant_handle, child_handle): """ Process one up-down PDU. """ - def done(r_der): - cb(200, body = r_der) + logger.debug("Entering up_down_handler()") + + content_type = handler.request.headers["Content-Type"] + if content_type not in rpki.up_down.allowed_content_types: + handler.set_status(415, "No handler for Content-Type %s" % content_type) + handler.finish() + raise tornado.gen.Return try: - match = self.up_down_url_regexp.search(path) - if match is None: - raise rpki.exceptions.BadContactURL("Bad URL path received in up_down_handler(): %s" % path) - tenant_handle, child_handle = match.groups() - try: - child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle) - except rpki.rpkidb.models.Child.DoesNotExist: - raise rpki.exceptions.ChildNotFound("Could not find child %s of self %s in up_down_handler()" % ( - child_handle, tenant_handle)) - child.serve_up_down(self, q_der, done) - except (rpki.async.ExitNow, SystemExit): - raise - except (rpki.exceptions.ChildNotFound, rpki.exceptions.BadContactURL), e: - logger.warning(str(e)) - cb(400, reason = str(e)) + child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle) + q_der = handler.request.body + r_der = yield child.serve_up_down(self, q_der) + handler.set_header("Content-Type", rpki.up_down.content_type) + handler.set_status(200) + handler.finish(r_der) + + except rpki.rpkidb.models.Child.DoesNotExist: + logger.info("Child %r of tenant %r not found", child_handle, tenant_handle) + handler.set_status(400, "Child %r not found" % child_handle) + handler.finish() + except Exception, e: logger.exception("Unhandled exception processing up-down request") - cb(400, reason = "Could not process PDU: %s" % e) - - def checkpoint(self, force = False): - """ - Record that we were still alive when we got here, by resetting - keepalive timer. - """ + handler.set_status(400, "Could not process PDU: %s" % e) + handler.finish() - if force or self.cron_timeout is not None: - self.cron_timeout = rpki.sundial.now() + self.cron_keepalive def task_add(self, task): """ @@ -504,11 +508,10 @@ class main(object): logger.debug("Task %r was already in the task queue", task) return False + def task_next(self): """ - Pull next task from the task queue and put it the deferred event - queue (we don't want to run it directly, as that could eventually - blow out our call stack). + Schedule next task in the queue to be run. """ try: @@ -516,77 +519,67 @@ class main(object): except IndexError: self.task_current = None else: - rpki.async.event_defer(self.task_current) + tornado.ioloop.IOLoop.current().add_callback(self.task_current) + def task_run(self): """ - Run first task on the task queue, unless one is running already. + Schedule first queued task unless a task is running already. """ if self.task_current is None: self.task_next() - def cron(self, cb = None): + + @tornado.gen.coroutine + def cron_loop(self): """ - Periodic tasks. + Asynchronous infinite loop to drive cron cycle. """ - now = rpki.sundial.now() + assert self.use_internal_cron + yield tornado.gen.sleep(self.initial_delay) + while True: + yield self.cron_run() + yield tornado.gen.sleep(self.cron_period) - logger.debug("Starting cron run") - def done(): - self.cron_timeout = None - logger.info("Finished cron run started at %s", now) - if cb is not None: - cb() + @tornado.gen.coroutine + def cron_run(self): + """ + Periodic tasks. + """ - completion = rpki.rpkid_tasks.CompletionHandler(done) + now = rpki.sundial.now() + logger.debug("Starting cron run") + futures = [] try: - selves = rpki.rpkidb.models.Tenant.objects.all() - except Exception: - logger.exception("Error pulling selves from SQL, maybe SQL server is down?") + tenants = rpki.rpkidb.models.Tenant.objects.all() + except: + logger.exception("Error pulling tenants from SQL, maybe SQL server is down?") else: - for s in selves: - s.schedule_cron_tasks(self, completion) - nothing_queued = completion.count == 0 - - assert self.use_internal_cron or self.cron_timeout is None - - if self.cron_timeout is not None and self.cron_timeout < now: - logger.warning("cron keepalive threshold %s has expired, breaking lock", self.cron_timeout) - self.cron_timeout = None - - if self.use_internal_cron: - when = now + self.cron_period - logger.debug("Scheduling next cron run at %s", when) - self.cron_timer.set(when) - - if self.cron_timeout is None: - self.checkpoint(self.use_internal_cron) - self.task_run() - - elif self.use_internal_cron: - logger.warning("cron already running, keepalive will expire at %s", self.cron_timeout) + for tenant in tenants: + futures.extend(condition.wait() for condition in tenant.schedule_cron_tasks(self)) + if futures: + yield futures + logger.info("Finished cron run started at %s", now) - if nothing_queued: - done() - def cronjob_handler(self, query, path, cb): + @tornado.gen.coroutine + def cronjob_handler(self, handler): """ External trigger for periodic tasks. This is somewhat obsolete now that we have internal timers, but the test framework still uses it. """ - def done(): - cb(200, body = "OK") - if self.use_internal_cron: - cb(500, reason = "Running cron internally") + handler.set_status(500, "Running cron internally") else: logger.debug("Starting externally triggered cron") - self.cron(done) + yield self.cron() + handler.set_status(200) + handler.finish() class publication_queue(object): @@ -661,19 +654,16 @@ class publication_queue(object): if self.replace: self.uris[uri] = pdu - def call_pubd(self, cb, eb): - def loop(iterator, rid): + @tornado.gen.coroutine + def call_pubd(self): + for rid in self.repositories: logger.debug("Calling pubd[%r]", self.repositories[rid]) - self.repositories[rid].call_pubd(self.rpkid, iterator, eb, self.msgs[rid], self.handlers) - def done(): - self.clear() - cb() - rpki.async.iterator(self.repositories, loop, done) + yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers) + self.clear() @property def size(self): return sum(len(self.msgs[rid]) for rid in self.repositories) def empty(self): - assert (not self.msgs) == (self.size == 0), "Assertion failure: not self.msgs: %r, self.size %r" % (not self.msgs, self.size) return not self.msgs diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py index 91fa787d..0a9c1654 100644 --- a/rpki/rpkid_tasks.py +++ b/rpki/rpkid_tasks.py @@ -22,9 +22,17 @@ because interactions with rpkid scheduler were getting too complicated. """ import logging + +import tornado.gen +import tornado.web +import tornado.locks +import tornado.ioloop +import tornado.httputil +import tornado.httpclient +import tornado.httpserver + import rpki.log import rpki.rpkid -import rpki.async import rpki.up_down import rpki.sundial import rpki.publication @@ -44,45 +52,6 @@ def queue_task(cls): return cls -class CompletionHandler(object): - """ - Track one or more scheduled rpkid tasks and execute a callback when - the last of them terminates. - """ - - ## @var debug - # Debug logging. - - debug = False - - def __init__(self, cb): - self.cb = cb - self.tasks = set() - - def register(self, task): - if self.debug: - logger.debug("Completion handler %r registering task %r", self, task) - self.tasks.add(task) - task.register_completion(self.done) - - def done(self, task): - try: - self.tasks.remove(task) - except KeyError: - logger.warning("Completion handler %r called with unregistered task %r, blundering onwards", self, task) - else: - if self.debug: - logger.debug("Completion handler %r called with registered task %r", self, task) - if not self.tasks: - if self.debug: - logger.debug("Completion handler %r finished, calling %r", self, self.cb) - self.cb() - - @property - def count(self): - return len(self.tasks) - - class AbstractTask(object): """ Abstract base class for rpkid scheduler task objects. This just @@ -95,53 +64,61 @@ class AbstractTask(object): timeslice = rpki.sundial.timedelta(seconds = 15) - def __init__(self, rpkid, s, description = None): - self.rpkid = rpkid - self.tenant = s + def __init__(self, rpkid, tenant, description = None): + self.rpkid = rpkid + self.tenant = tenant self.description = description - self.completions = [] - self.continuation = None - self.due_date = None + self.resumed = tornado.locks.Condition() + self.completed = tornado.locks.Condition() + self.due_date = None + self.started = False self.clear() def __repr__(self): return rpki.log.log_repr(self, self.description) - def register_completion(self, completion): - self.completions.append(completion) - def exit(self): - while self.completions: - self.completions.pop(0)(self) - self.clear() + logger.debug("%r: Exiting", self) self.due_date = None + self.started = False + self.clear() + self.completed.notify_all() self.rpkid.task_next() - def postpone(self, continuation): - self.continuation = continuation + @tornado.gen.coroutine + def postpone(self): + logger.debug("%r: Postponed", self) self.due_date = None self.rpkid.task_add(self) self.rpkid.task_next() + yield self.resumed.wait() + @tornado.gen.coroutine def __call__(self): - self.due_date = rpki.sundial.now() + self.timeslice - if self.continuation is None: - logger.debug("Running task %r", self) - self.clear() - self.start() - else: - logger.debug("Restarting task %r at %r", self, self.continuation) - continuation = self.continuation - self.continuation = None - continuation() + try: + self.due_date = rpki.sundial.now() + self.timeslice + if self.started: + logger.debug("%r: Resuming", self) + self.resumed.notify() + else: + logger.debug("%r: Starting", self) + self.clear() + self.started = True + yield self.start() + except: + logger.exception("%r: Unhandled exception", self) + self.exit() + # + # Unclear whether we should re-raise the exception here or not, + # but re-raising it is probably safer until we know for sure. + # + raise @property def overdue(self): return rpki.sundial.now() > self.due_date - def __getattr__(self, name): - return getattr(self.tenant, name) - + @tornado.gen.coroutine def start(self): raise NotImplementedError @@ -152,340 +129,207 @@ class AbstractTask(object): @queue_task class PollParentTask(AbstractTask): """ - Run the regular client poll cycle with each of this self's + Run the regular client poll cycle with each of this tenant's parents, in turn. """ - def clear(self): - logger.debug("PollParentTask.clear()") - self.parent_iterator = None - self.parent = None - self.ca_map = None - self.class_iterator = None - self.started = False - + @tornado.gen.coroutine def start(self): - logger.debug("PollParentTask.start()") - self.rpkid.checkpoint() - logger.debug("Self %s[%r] polling parents", self.tenant_handle, self) - assert not self.started - self.started = True - rpki.async.iterator(self.parents.all(), self.parent_loop, self.exit) - - def parent_loop(self, parent_iterator, parent): - logger.debug("PollParentTask.parent_loop()") - self.parent_iterator = parent_iterator - self.parent = parent - parent.up_down_list_query(rpkid = self.rpkid, cb = self.got_list, eb = self.list_failed) - - def got_list(self, r_msg): - logger.debug("PollParentTask.got_list()") - self.ca_map = dict((ca.parent_resource_class, ca) for ca in self.parent.cas.all()) - self.rpkid.checkpoint() - rpki.async.iterator(r_msg.getiterator(rpki.up_down.tag_class), self.class_loop, self.class_done) - - def list_failed(self, e): - logger.debug("PollParentTask.list_failed()") - logger.exception("Couldn't get resource class list from parent %r, skipping", self.parent) - self.parent_iterator() - - def class_loop(self, class_iterator, rc): - logger.debug("PollParentTask.class_loop()") - self.rpkid.checkpoint() - self.class_iterator = class_iterator - try: - ca = self.ca_map.pop(rc.get("class_name")) - except KeyError: - rpki.rpkidb.models.CA.create(rpkid = self.rpkid, parent = self.parent, rc = rc, - cb = class_iterator, eb = self.class_create_failed) - else: - ca.check_for_updates(rpkid = self.rpkid, parent = self.parent, rc = rc, cb = class_iterator, eb = self.class_update_failed) - - def class_update_failed(self, e): - logger.debug("PollParentTask.class_update_failed()") - logger.exception("Couldn't update class, skipping") - self.class_iterator() - - def class_create_failed(self, e): - logger.debug("PollParentTask.class_create_failed()") - logger.exception("Couldn't create class, skipping") - self.class_iterator() - - def class_done(self): - logger.debug("PollParentTask.class_done()") - rpki.async.iterator(self.ca_map.values(), self.ca_loop, self.ca_done) - - def ca_loop(self, iterator, ca): - logger.debug("PollParentTask.ca_loop()") - self.rpkid.checkpoint() - ca.destroy(self.parent, iterator) - - def ca_done(self): - logger.debug("PollParentTask.ca_done()") - self.rpkid.checkpoint() - self.parent_iterator() + logger.debug("%r: Polling parents", self) + + for parent in self.tenant.parents.all(): + try: + logger.debug("%r: Executing list query", self) + r_msg = yield parent.up_down_list_query(rpkid = self.rpkid) + except: + logger.exception("%r: Couldn't get resource class list from parent %r, skipping", self, parent) + continue + + logger.debug("%r: Parsing list response", self) + + ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas.all()) + + for rc in r_msg.getiterator(rpki.up_down.tag_class): + try: + class_name = rc.get("class_name") + ca = ca_map.pop(class_name, None) + if ca is None: + logger.debug("%r: Creating new CA for resource class %r", self, class_name) + rpki.rpkidb.models.CA.create(rpkid = self.rpkid, parent = parent, rc = rc) + else: + logger.debug("%r: Checking updates for existing CA %r for resource class %r", self, ca, class_name) + yield ca.check_for_updates(rpkid = self.rpkid, parent = parent, rc = rc) + except: + logger.exception("Couldn't update resource class %r, skipping", class_name) + + for ca, class_name in ca_map.iteritems(): + logger.debug("%r: Destroying orphaned CA %r for resource class %r", self, ca, class_name) + yield ca.destroy(parent) + + self.exit() @queue_task class UpdateChildrenTask(AbstractTask): """ - Check for updated IRDB data for all of this self's children and + Check for updated IRDB data for all of this tenant's children and issue new certs as necessary. Must handle changes both in resources and in expiration date. """ - def clear(self): - self.now = None - self.rsn = None - self.publisher = None - self.iterator = None - self.child = None - self.child_certs = None - self.started = False - + @tornado.gen.coroutine def start(self): - self.rpkid.checkpoint() - logger.debug("Self %s[%r] updating children", self.tenant_handle, self) - assert not self.started - self.started = True - self.now = rpki.sundial.now() - self.rsn = self.now + rpki.sundial.timedelta(seconds = self.regen_margin) - self.publisher = rpki.rpkid.publication_queue(self.rpkid) - rpki.async.iterator(self.children.all(), self.loop, self.done) - - def loop(self, iterator, child): - self.rpkid.checkpoint() - self.iterator = iterator - self.child = child - self.child_certs = child.child_certs - if self.overdue: - self.publisher.call_pubd(lambda: self.postpone(self.do_child), self.publication_failed) - else: - self.do_child() - - def do_child(self): - if self.child_certs: - self.rpkid.irdb_query_child_resources(self.child.tenant.tenant_handle, self.child.child_handle, - self.got_resources, self.lose) - else: - self.iterator() - - def lose(self, e): - logger.exception("Couldn't update child %r, skipping", self.child) - self.iterator() - - def got_resources(self, irdb_resources): + logger.debug("%r: Updating children", self) + now = rpki.sundial.now() + rsn = now + rpki.sundial.timedelta(seconds = self.tenant.regen_margin) + publisher = rpki.rpkid.publication_queue(self.rpkid) + + for child in self.tenant.children.all(): + try: + if self.overdue: + yield publisher.call_pubd() + yield self.postpone() + + child_certs = list(child.child_certs.filter(ca_detail__state = "active")) + + if child_certs: + irdb_resources = yield self.rpkid.irdb_query_child_resources(child.tenant.tenant_handle, child.child_handle) + + for child_cert in child_certs: + ca_detail = child_cert.ca_detail + old_resources = child_cert.cert.get_3779resources() + new_resources = old_resources & irdb_resources & ca_detail.latest_ca_cert.get_3779resources() + old_aia = child_cert.cert.get_AIA()[0] + new_aia = ca_detail.ca_cert_uri + + if new_resources.empty(): + logger.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s", child.child_handle, child_cert.cert.gSKI()) + child_cert.revoke(publisher = publisher) + ca_detail.generate_crl(publisher = publisher) + ca_detail.generate_manifest(publisher = publisher) + + elif (old_resources != new_resources or old_aia != new_aia or (old_resources.valid_until < rsn and irdb_resources.valid_until > now and old_resources.valid_until != irdb_resources.valid_until)): + logger.debug("Need to reissue child %s certificate SKI %s", child.child_handle, child_cert.cert.gSKI()) + if old_resources != new_resources: + logger.debug("Child %s SKI %s resources changed: old %s new %s", child.child_handle, child_cert.cert.gSKI(), old_resources, new_resources) + if old_resources.valid_until != irdb_resources.valid_until: + logger.debug("Child %s SKI %s validity changed: old %s new %s", child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until) + + new_resources.valid_until = irdb_resources.valid_until + child_cert.reissue(ca_detail = ca_detail, resources = new_resources, publisher = publisher) + + elif old_resources.valid_until < now: + logger.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s", child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until) + child_cert.delete() + publisher.queue(uri = child_cert.uri, old_obj = child_cert.cert, repository = ca_detail.ca.parent.repository) + ca_detail.generate_manifest(publisher = publisher) + + except: + logger.exception("%r: Couldn't update child %r, skipping", self, child) + try: - for child_cert in self.child_certs.filter(ca_detail__state = "active"): - ca_detail = child_cert.ca_detail - old_resources = child_cert.cert.get_3779resources() - new_resources = old_resources & irdb_resources & ca_detail.latest_ca_cert.get_3779resources() - old_aia = child_cert.cert.get_AIA()[0] - new_aia = ca_detail.ca_cert_uri - - if new_resources.empty(): - logger.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s", - self.child.child_handle, child_cert.cert.gSKI()) - child_cert.revoke(publisher = self.publisher) - ca_detail.generate_crl(publisher = self.publisher) - ca_detail.generate_manifest(publisher = self.publisher) - - elif (old_resources != new_resources or - old_aia != new_aia or - (old_resources.valid_until < self.rsn and - irdb_resources.valid_until > self.now and - old_resources.valid_until != irdb_resources.valid_until)): - - logger.debug("Need to reissue child %s certificate SKI %s", - self.child.child_handle, child_cert.cert.gSKI()) - if old_resources != new_resources: - logger.debug("Child %s SKI %s resources changed: old %s new %s", - self.child.child_handle, child_cert.cert.gSKI(), - old_resources, new_resources) - if old_resources.valid_until != irdb_resources.valid_until: - logger.debug("Child %s SKI %s validity changed: old %s new %s", - self.child.child_handle, child_cert.cert.gSKI(), - old_resources.valid_until, irdb_resources.valid_until) - - new_resources.valid_until = irdb_resources.valid_until - child_cert.reissue( - ca_detail = ca_detail, - resources = new_resources, - publisher = self.publisher) - - elif old_resources.valid_until < self.now: - logger.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s", - self.child.child_handle, child_cert.cert.gSKI(), - old_resources.valid_until, irdb_resources.valid_until) - child_cert.delete() - self.publisher.queue( - uri = child_cert.uri, - old_obj = child_cert.cert, - repository = ca_detail.ca.parent.repository) - ca_detail.generate_manifest(publisher = self.publisher) - - except (SystemExit, rpki.async.ExitNow): - raise - except Exception, e: - self.rpkid.checkpoint() - self.lose(e) - else: - self.rpkid.checkpoint() - self.iterator() - - def done(self): - self.rpkid.checkpoint() - self.publisher.call_pubd(self.exit, self.publication_failed) - - def publication_failed(self, e): - logger.exception("Couldn't publish for %s, skipping", self.tenant_handle) - self.rpkid.checkpoint() + yield publisher.call_pubd() + except: + logger.exception("%r: Couldn't publish, skipping", self) + self.exit() @queue_task class UpdateROAsTask(AbstractTask): """ - Generate or update ROAs for this self. + Generate or update ROAs for this tenant. """ def clear(self): - self.orphans = None - self.updates = None - self.publisher = None + self.publisher = None self.ca_details = None - self.count = None - self.started = False + @tornado.gen.coroutine def start(self): - self.rpkid.checkpoint() - logger.debug("Self %s[%r] updating ROAs", self.tenant_handle, self) - assert not self.started - self.started = True - logger.debug("Issuing query for ROA requests") - self.rpkid.irdb_query_roa_requests(self.tenant_handle, self.got_roa_requests, self.roa_requests_failed) + logger.debug("%r: Updating ROAs", self) + + try: + r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle) + except: + logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle) + raise tornado.gen.Return - def got_roa_requests(self, r_msg): - self.rpkid.checkpoint() - logger.debug("Received response to query for ROA requests") + logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg) roas = {} seen = set() - self.orphans = [] - self.updates = [] + orphans = [] + updates = [] self.publisher = rpki.rpkid.publication_queue(self.rpkid) self.ca_details = set() - logger.debug("UpdateROAsTask.got_roa_requests(): setup done, self.orphans %r", self.orphans) - assert isinstance(self.orphans, list) # XXX - for roa in self.tenant.roas.all(): - logger.debug("UpdateROAsTask.got_roa_requests(): roa loop, self.orphans %r", self.orphans) - assert isinstance(self.orphans, list) # XXX k = (roa.asn, str(roa.ipv4), str(roa.ipv6)) if k not in roas: roas[k] = roa - elif (roa.roa is not None and - roa.cert is not None and - roa.ca_detail is not None and - roa.ca_detail.state == "active" and - (roas[k].roa is None or - roas[k].cert is None or - roas[k].ca_detail is None or - roas[k].ca_detail.state != "active")): - self.orphans.append(roas[k]) + elif (roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active")): + orphans.append(roas[k]) roas[k] = roa else: - self.orphans.append(roa) - - logger.debug("UpdateROAsTask.got_roa_requests(): roa loop done, self.orphans %r", self.orphans) - assert isinstance(self.orphans, list) # XXX + orphans.append(roa) for r_pdu in r_msg: - logger.debug("UpdateROAsTask.got_roa_requests(): r_pdu loop, self.orphans %r", self.orphans) - assert isinstance(self.orphans, list) k = (r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6")) if k in seen: - logger.warning("Skipping duplicate ROA request %r", r_pdu) + logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu) else: seen.add(k) roa = roas.pop(k, None) if roa is None: - roa = rpki.rpkidb.models.ROA(asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6")) - roa.tenant = self.tenant - logger.debug("Created new %r", roa) + roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6")) + logger.debug("%r: Created new %r", self, roa) else: - logger.debug("Found existing %r", roa) - self.updates.append(roa) + logger.debug("%r: Found existing %r", self, roa) + updates.append(roa) - logger.debug("UpdateROAsTask.got_roa_requests(): r_pdu loop done, self.orphans %r", self.orphans) - assert isinstance(self.orphans, list) # XXX + orphans.extend(roas.itervalues()) - self.orphans.extend(roas.itervalues()) + while updates: + if self.overdue: + yield self.publish() + yield self.postpone() + roa = updates.pop(0) + try: + roa.update(publisher = self.publisher, fast = True) + self.ca_details.add(roa.ca_detail) + except rpki.exceptions.NoCoveringCertForROA: + logger.warning("%r: No covering certificate for %r, skipping", self, roa) + except: + logger.exception("%r: Could not update %r, skipping", self, roa) - if self.overdue: - self.postpone(self.begin_loop) - else: - self.begin_loop() + for roa in orphans: + try: + self.ca_details.add(roa.ca_detail) + roa.revoke(publisher = self.publisher, fast = True) + except: + logger.exception("%r: Could not revoke %r", self, roa) - def begin_loop(self): - self.count = 0 - rpki.async.iterator(self.updates, self.loop, self.done, pop_list = True) + yield self.publish() - def loop(self, iterator, roa): - self.rpkid.checkpoint() - try: - roa.update(publisher = self.publisher, fast = True) - self.ca_details.add(roa.ca_detail) - except (SystemExit, rpki.async.ExitNow): - raise - except rpki.exceptions.NoCoveringCertForROA: - logger.warning("No covering certificate for %r, skipping", roa) - except Exception: - logger.exception("Could not update %r, skipping", roa) - self.count += 1 - if self.overdue: - self.publish(lambda: self.postpone(iterator)) - else: - iterator() - - def publish(self, done): + self.exit() + + @tornado.gen.coroutine + def publish(self): if not self.publisher.empty(): for ca_detail in self.ca_details: - logger.debug("Generating new CRL for %r", ca_detail) + logger.debug("%r: Generating new CRL for %r", self, ca_detail) ca_detail.generate_crl(publisher = self.publisher) - logger.debug("Generating new manifest for %r", ca_detail) + logger.debug("%r: Generating new manifest for %r", self, ca_detail) ca_detail.generate_manifest(publisher = self.publisher) + yield self.publisher.call_pubd() self.ca_details.clear() - self.rpkid.checkpoint() - self.publisher.call_pubd(done, self.publication_failed) - - def publication_failed(self, e): - logger.exception("Couldn't publish for %s, skipping", self.tenant_handle) - self.rpkid.checkpoint() - self.exit() - - def done(self): - for roa in self.orphans: - try: - self.ca_details.add(roa.ca_detail) - roa.revoke(publisher = self.publisher, fast = True) - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Could not revoke %r", roa) - self.rpkid.checkpoint() - self.publish(self.exit) - - def roa_requests_failed(self, e): - logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant_handle) - self.exit() @queue_task class UpdateGhostbustersTask(AbstractTask): """ - Generate or update Ghostbuster records for this self. + Generate or update Ghostbuster records for this tenant. This was originally based on the ROA update code. It's possible that both could benefit from refactoring, but at this point the @@ -494,23 +338,13 @@ class UpdateGhostbustersTask(AbstractTask): exceptionally silly. """ - def clear(self): - self.started = False - + @tornado.gen.coroutine def start(self): - self.rpkid.checkpoint() - logger.debug("Self %s[%r] updating Ghostbuster records", self.tenant_handle, self) - assert not self.started - self.started = True + logger.debug("%r: Updating Ghostbuster records", self) parent_handles = set(p.parent_handle for p in self.tenant.parents.all()) - self.rpkid.irdb_query_ghostbuster_requests(self.tenant_handle, parent_handles, - self.got_ghostbuster_requests, - self.ghostbuster_requests_failed) - - def got_ghostbuster_requests(self, r_msg): try: - self.rpkid.checkpoint() + r_msg = yield self.rpkid.irdb_query_ghostbuster_requests(self.tenant.tenant_handle, parent_handles) ghostbusters = {} orphans = [] @@ -529,22 +363,20 @@ class UpdateGhostbustersTask(AbstractTask): try: self.tenant.parents.get(parent_handle = r_pdu.get("parent_handle")) except rpki.rpkidb.models.Parent.DoesNotExist: - logger.warning("Unknown parent_handle %r in Ghostbuster request, skipping", r_pdu.get("parent_handle")) + logger.warning("%r: Unknown parent_handle %r in Ghostbuster request, skipping", self, r_pdu.get("parent_handle")) continue k = (r_pdu.get("parent_handle"), r_pdu.text) if k in seen: - logger.warning("Skipping duplicate Ghostbuster request %r", r_pdu) + logger.warning("%r: Skipping duplicate Ghostbuster request %r", self, r_pdu) continue seen.add(k) - for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__parent_handle = r_pdu.get("parent_handle"), - ca__parent__tenant = self.tenant, state = "active"): + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__parent_handle = r_pdu.get("parent_handle"), ca__parent__tenant = self.tenant, state = "active"): ghostbuster = ghostbusters.pop((ca_detail.pk, r_pdu.text), None) if ghostbuster is None: - ghostbuster = rpki.rpkidb.models.Ghostbuster(ca_detail = ca_detail, vcard = r_pdu.text) - ghostbuster.tenant = self.tenant - logger.debug("Created new %r for %r", ghostbuster, r_pdu.get("parent_handle")) + ghostbuster = rpki.rpkidb.models.Ghostbuster(tenant = self.tenant, ca_detail = ca_detail, vcard = r_pdu.text) + logger.debug("%r: Created new %r for %r", self, ghostbuster, r_pdu.get("parent_handle")) else: - logger.debug("Found existing %r for %s", ghostbuster, r_pdu.get("parent_handle")) + logger.debug("%r: Found existing %r for %s", self, ghostbuster, r_pdu.get("parent_handle")) ghostbuster.update(publisher = publisher, fast = True) ca_details.add(ca_detail) @@ -557,22 +389,11 @@ class UpdateGhostbustersTask(AbstractTask): ca_detail.generate_crl(publisher = publisher) ca_detail.generate_manifest(publisher = publisher) - self.rpkid.checkpoint() - publisher.call_pubd(self.exit, self.publication_failed) + yield publisher.call_pubd() - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant_handle) - self.exit() - - def publication_failed(self, e): - logger.exception("Couldn't publish Ghostbuster updates for %s, skipping", self.tenant_handle) - self.rpkid.checkpoint() - self.exit() + except: + logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant.tenant_handle) - def ghostbuster_requests_failed(self, e): - logger.exception("Could not fetch Ghostbuster record requests for %s, skipping", self.tenant_handle) self.exit() @@ -585,22 +406,12 @@ class UpdateEECertificatesTask(AbstractTask): so keeping it simple for initial version, we can optimize later. """ - def clear(self): - self.started = False - + @tornado.gen.coroutine def start(self): - self.rpkid.checkpoint() - logger.debug("Self %s[%r] updating EE certificates", self.tenant_handle, self) - assert not self.started - self.started = True - self.rpkid.irdb_query_ee_certificate_requests(self.tenant_handle, - self.got_requests, - self.get_requests_failed) - - def got_requests(self, r_msg): + logger.debug("%r: Updating EE certificates", self) try: - self.rpkid.checkpoint() + r_msg = yield self.rpkid.irdb_query_ee_certificate_requests(self.tenant.tenant_handle) publisher = rpki.rpkid.publication_queue(self.rpkid) @@ -621,28 +432,23 @@ class UpdateEECertificatesTask(AbstractTask): v4 = rpki.resource_set.resource_set_ipv4(r_pdu.get("ipv4")), v6 = rpki.resource_set.resource_set_ipv6(r_pdu.get("ipv6")), valid_until = rpki.sundial.datetime.fromXMLtime(r_pdu.get("valid_until"))) - covering = self.find_covering_ca_details(resources) + covering = self.tenant.find_covering_ca_details(resources) ca_details.update(covering) for ee in ees: if ee.ca_detail in covering: - logger.debug("Updating existing EE certificate for %s %s", - gski, resources) - ee.reissue( - resources = resources, - publisher = publisher) + logger.debug("Updating existing EE certificate for %s %s", gski, resources) + ee.reissue(resources = resources, publisher = publisher) covering.remove(ee.ca_detail) else: - logger.debug("Existing EE certificate for %s %s is no longer covered", - gski, resources) + logger.debug("Existing EE certificate for %s %s is no longer covered", gski, resources) ee.revoke(publisher = publisher) subject_name = rpki.x509.X501DN.from_cn(r_pdu.get("cn"), r_pdu.get("sn")) subject_key = rpki.x509.PKCS10(Base64 = r_pdu[0].text).getPublicKey() for ca_detail in covering: - logger.debug("No existing EE certificate for %s %s", - gski, resources) + logger.debug("No existing EE certificate for %s %s", gski, resources) rpki.rpkidb.models.EECertificate.create( # sic: class method, not Django manager method (for now, anyway) ca_detail = ca_detail, subject_name = subject_name, @@ -661,29 +467,18 @@ class UpdateEECertificatesTask(AbstractTask): ca_detail.generate_crl(publisher = publisher) ca_detail.generate_manifest(publisher = publisher) - self.rpkid.checkpoint() - publisher.call_pubd(self.exit, self.publication_failed) - - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Could not update EE certificates for %s, skipping", self.tenant_handle) - self.exit() + yield publisher.call_pubd() - def publication_failed(self, e): - logger.exception("Couldn't publish EE certificate updates for %s, skipping", self.tenant_handle) - self.rpkid.checkpoint() - self.exit() + except: + logger.exception("Could not update EE certificates for %s, skipping", self.tenant.tenant_handle) - def get_requests_failed(self, e): - logger.exception("Could not fetch EE certificate requests for %s, skipping", self.tenant_handle) self.exit() @queue_task class RegenerateCRLsAndManifestsTask(AbstractTask): """ - Generate new CRLs and manifests as necessary for all of this self's + Generate new CRLs and manifests as necessary for all of this tenant's CAs. Extracting nextUpdate from a manifest is hard at the moment due to implementation silliness, so for now we generate a new manifest whenever we generate a new CRL @@ -693,48 +488,33 @@ class RegenerateCRLsAndManifestsTask(AbstractTask): database anyway. """ - def clear(self): - self.started = False - + @tornado.gen.coroutine def start(self): - self.rpkid.checkpoint() - logger.debug("Self %s[%r] regenerating CRLs and manifests", self.tenant_handle, self) - assert not self.started - self.started = True - now = rpki.sundial.now() - crl_interval = rpki.sundial.timedelta(seconds = self.crl_interval) - regen_margin = max(self.rpkid.cron_period * 2, crl_interval / 4) - publisher = rpki.rpkid.publication_queue(self.rpkid) + logger.debug("%r: Regenerating CRLs and manifests", self) - logger.debug("RegenerateCRLsAndManifestsTask: setup complete") # XXX + try: + now = rpki.sundial.now() + crl_interval = rpki.sundial.timedelta(seconds = self.tenant.crl_interval) + regen_margin = max(rpki.sundial.timedelta(seconds = self.rpkid.cron_period) * 2, crl_interval / 4) + publisher = rpki.rpkid.publication_queue(self.rpkid) - for ca in rpki.rpkidb.models.CA.objects.filter(parent__tenant = self.tenant): - logger.debug("RegenerateCRLsAndManifestsTask: checking CA %r", ca) # XXX - try: - for ca_detail in ca.ca_details.filter(state = "revoked"): - if now > ca_detail.latest_crl.getNextUpdate(): - ca_detail.destroy(ca = ca, publisher = publisher) - for ca_detail in ca.ca_details.filter(state__in = ("active", "deprecated")): - if now + regen_margin > ca_detail.latest_crl.getNextUpdate(): - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - except (SystemExit, rpki.async.ExitNow): - raise - except Exception: - logger.exception("Couldn't regenerate CRLs and manifests for CA %r, skipping", ca) - - logger.debug("RegenerateCRLsAndManifestsTask: CA loop complete") # XXX - - self.rpkid.checkpoint() - publisher.call_pubd(self.done, self.lose) - - def done(self): - logger.debug("RegenerateCRLsAndManifestsTask: publication complete") # XXX - self.exit() + for ca in rpki.rpkidb.models.CA.objects.filter(parent__tenant = self.tenant): + try: + for ca_detail in ca.ca_details.filter(state = "revoked"): + if now > ca_detail.latest_crl.getNextUpdate(): + ca_detail.destroy(ca = ca, publisher = publisher) + for ca_detail in ca.ca_details.filter(state__in = ("active", "deprecated")): + if now + regen_margin > ca_detail.latest_crl.getNextUpdate(): + ca_detail.generate_crl(publisher = publisher) + ca_detail.generate_manifest(publisher = publisher) + except: + logger.exception("%r: Couldn't regenerate CRLs and manifests for CA %r, skipping", self, ca) + + yield publisher.call_pubd() + + except: + logger.exception("%r: Couldn't publish updated CRLs and manifests, skipping", self) - def lose(self, e): - logger.exception("Couldn't publish updated CRLs and manifests for self %r, skipping", self.tenant_handle) - self.rpkid.checkpoint() self.exit() @@ -745,24 +525,17 @@ class CheckFailedPublication(AbstractTask): to pubd being down or unreachable). """ - def clear(self): - self.started = False - + @tornado.gen.coroutine def start(self): - assert not self.started - logger.debug("CheckFailedPublication starting") - self.started = True - publisher = rpki.rpkid.publication_queue(self.rpkid) - for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"): - ca_detail.check_failed_publication(publisher) - self.rpkid.checkpoint() - publisher.call_pubd(self.done, self.publication_failed) - - def publication_failed(self, e): - logger.exception("Couldn't publish for %s, skipping", self.tenant_handle) - self.rpkid.checkpoint() - self.exit() + logger.debug("%r: Checking for failed publication actions", self) + + try: + publisher = rpki.rpkid.publication_queue(self.rpkid) + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"): + ca_detail.check_failed_publication(publisher) + yield publisher.call_pubd() + + except: + logger.exception("%r: Couldn't run failed publications, skipping", self) - def done(self): - logger.debug("CheckFailedPublication finished") self.exit() diff --git a/rpki/rpkidb/models.py b/rpki/rpkidb/models.py index 91e6e5c0..62deeb8b 100644 --- a/rpki/rpkidb/models.py +++ b/rpki/rpkidb/models.py @@ -7,6 +7,13 @@ from __future__ import unicode_literals import logging import base64 +import tornado.gen +import tornado.web +import tornado.ioloop +import tornado.httputil +import tornado.httpclient +import tornado.httpserver + from django.db import models import rpki.left_right @@ -139,8 +146,8 @@ class XMLManager(models.Manager): # pylint: disable=W0232 Add a few methods which locate or create an object or objects corresponding to the handles in an XML element, as appropriate. - This assumes that models which use it have an "xml" class attribute - holding an XMLTemplate object (above). + This assumes that models which use it have an "xml_template" + class attribute holding an XMLTemplate object (above). """ def xml_get_or_create(self, xml): @@ -191,28 +198,32 @@ def xml_hooks(cls): # Maybe inheritance from an abstract model would work here. Then # again, maybe we could use this decorator to do something prettier - # for the XMLTemplate setup. Whatever. Clean up once basic stuff - # works again after transition from pre-Django SQL. + # for the XMLTemplate setup. Whatever. Gussie up later. - def default_xml_post_save_hook(self, rpkid, q_pdu, cb, eb): - logger.debug("default_xml_post_save_hook()") - cb() - def default_xml_pre_delete_hook(self, rpkid, cb, eb): - logger.debug("default_xml_pre_delete_hook()") - cb() def default_xml_pre_save_hook(self, q_pdu): logger.debug("default_xml_pre_save_hook()") - pass # pylint: disable=W0107 - for name, method in (("xml_post_save_hook", default_xml_post_save_hook), - ("xml_pre_delete_hook", default_xml_pre_delete_hook), - ("xml_pre_save_hook", default_xml_pre_save_hook)): + + @tornado.gen.coroutine + def default_xml_post_save_hook(self, rpkid, q_pdu): + logger.debug("default_xml_post_save_hook()") + + @tornado.gen.coroutine + def default_xml_pre_delete_hook(self, rpkid): + logger.debug("default_xml_pre_delete_hook()") + + for name, method in (("xml_pre_save_hook", default_xml_pre_save_hook), + ("xml_post_save_hook", default_xml_post_save_hook), + ("xml_pre_delete_hook", default_xml_pre_delete_hook)): if not hasattr(cls, name): setattr(cls, name, method) return cls -# Models +# Models. +# +# There's far too much random code hanging off of model methods, relic +# of the earlier implementation. Clean up as time permits. @xml_hooks class Tenant(models.Model): @@ -230,14 +241,17 @@ class Tenant(models.Model): booleans = ("use_hsm",), elements = ("bpki_cert", "bpki_glue")) + @tornado.gen.coroutine + def xml_pre_delete_hook(self, rpkid): + yield [parent.destroy() for parent in self.parents.all()] - def xml_pre_delete_hook(self, rpkid, cb, eb): - def loop(iterator, parent): - parent.destroy(iterator) - rpki.async.iterator(self.parents.all(), loop, cb) - + @tornado.gen.coroutine + def xml_post_save_hook(self, rpkid, q_pdu): + rekey = q_pdu.get("rekey") + revoke = q_pdu.get("revoke") + reissue = q_pdu.get("reissue") + revoke_forgotten = q_pdu.get("revoke_forgotten") - def xml_post_save_hook(self, rpkid, q_pdu, cb, eb): if q_pdu.get("clear_replay_protection"): for parent in self.parents.all(): parent.clear_replay_protection() @@ -245,51 +259,52 @@ class Tenant(models.Model): child.clear_replay_protection() for repository in self.repositories.all(): repository.clear_replay_protection() - actions = [] - rekey = q_pdu.get("rekey") - revoke = q_pdu.get("revoke") - reissue = q_pdu.get("reissue") - revoke_forgotten = q_pdu.get("revoke_forgotten") + + futures = [] + if rekey or revoke or reissue or revoke_forgotten: for parent in self.parents.all(): if rekey: - actions.append(parent.serve_rekey) + futures.append(parent.serve_rekey(rpkid)) if revoke: - actions.append(parent.serve_revoke) + futures.append(parent.serve_revoke(rpkid)) if reissue: - actions.append(parent.serve_reissue) + futures.append(parent.serve_reissue(rpkid)) if revoke_forgotten: - actions.append(parent.serve_revoke_forgotten) + futures.append(parent.serve_revoke_forgotten(rpkid)) + if q_pdu.get("publish_world_now"): - actions.append(self.serve_publish_world_now) + futures.append(self.serve_publish_world_now(rpkid)) if q_pdu.get("run_now"): - actions.append(self.serve_run_now) - def loop(iterator, action): - action(rpkid, iterator, eb) - rpki.async.iterator(actions, loop, cb) + futures.append(self.serve_run_now(rpkid)) + + yield futures - def serve_publish_world_now(self, rpkid, cb, eb): + @tornado.gen.coroutine + def serve_publish_world_now(self, rpkid): publisher = rpki.rpkid.publication_queue(rpkid) repositories = set() objects = dict() - def loop(iterator, parent): + for parent in self.parents.all(): + repository = parent.repository if repository.peer_contact_uri in repositories: - return iterator() + continue repositories.add(repository.peer_contact_uri) q_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap, type = "query", version = rpki.publication.version) SubElement(q_msg, rpki.publication.tag_list, tag = "list") - def list_handler(r_pdu): - rpki.publication.raise_if_error(r_pdu) - assert r_pdu.tag == rpki.publication.tag_list - assert r_pdu.get("uri") not in objects - objects[r_pdu.get("uri")] = (r_pdu.get("hash"), repository) + r_msg = yield repository.call_pubd(rpkid, q_msg, length_check = False) - repository.call_pubd(rpkid, iterator, eb, q_msg, length_check = False, handlers = dict(list = list_handler)) + for r_pdu in r_msg: + assert r_pdu.tag == rpki.publication.tag_list + if r_pdu.get("uri") in objects: + logger.warning("pubd reported multiple published copies of URI %r, this makes no sense, blundering onwards", r_pdu.get("uri")) + else: + objects[r_pdu.get("uri")] = (r_pdu.get("hash"), repository) def reconcile(uri, obj, repository): h, r = objects.pop(uri, (None, None)) @@ -297,43 +312,49 @@ class Tenant(models.Model): assert r == repository publisher.queue(uri = uri, new_obj = obj, old_hash = h, repository = repository) - def done(): - for ca_detail in CADetail.objects.filter(ca__parent__tenant = self, state = "active"): - repository = ca_detail.ca.parent.repository - reconcile(uri = ca_detail.crl_uri, obj = ca_detail.latest_crl, repository = repository) - reconcile(uri = ca_detail.manifest_uri, obj = ca_detail.latest_manifest, repository = repository) - for c in ca_detail.child_certs.all(): - reconcile(uri = c.uri, obj = c.cert, repository = repository) - for r in ca_detail.roas.filter(roa__isnull = False): - reconcile(uri = r.uri, obj = r.roa, repository = repository) - for g in ca_detail.ghostbusters.all(): - reconcile(uri = g.uri, obj = g.ghostbuster, repository = repository) - for c in ca_detail.ee_certificates.all(): - reconcile(uri = c.uri, obj = c.cert, repository = repository) - for u in objects: - h, r = objects[u] - publisher.queue(uri = u, old_hash = h, repository = r) - publisher.call_pubd(cb, eb) - - rpki.async.iterator(self.parents.all(), loop, done) - - - def serve_run_now(self, rpkid, cb, eb): + for ca_detail in CADetail.objects.filter(ca__parent__tenant = self, state = "active"): + repository = ca_detail.ca.parent.repository + reconcile(uri = ca_detail.crl_uri, obj = ca_detail.latest_crl, repository = repository) + reconcile(uri = ca_detail.manifest_uri, obj = ca_detail.latest_manifest, repository = repository) + for c in ca_detail.child_certs.all(): + reconcile(uri = c.uri, obj = c.cert, repository = repository) + for r in ca_detail.roas.filter(roa__isnull = False): + reconcile(uri = r.uri, obj = r.roa, repository = repository) + for g in ca_detail.ghostbusters.all(): + reconcile(uri = g.uri, obj = g.ghostbuster, repository = repository) + for c in ca_detail.ee_certificates.all(): + reconcile(uri = c.uri, obj = c.cert, repository = repository) + for u in objects: + h, r = objects[u] + publisher.queue(uri = u, old_hash = h, repository = r) + + yield publisher.call_pubd() + + + @tornado.gen.coroutine + def serve_run_now(self, rpkid): logger.debug("Forced immediate run of periodic actions for tenant %s[%r]", self.tenant_handle, self) - completion = rpki.rpkid_tasks.CompletionHandler(cb) - self.schedule_cron_tasks(rpkid, completion) - assert completion.count > 0 + futures = [condition.wait() for condition in self.schedule_cron_tasks(rpkid)] rpkid.task_run() + logger.debug("serve_run_now() futures: %r", futures) + assert futures + try: + yield futures + except: + logger.exception("serve_run_now() failed") + raise + else: + logger.debug("serve_run_now() done") - def schedule_cron_tasks(self, rpkid, completion): + def schedule_cron_tasks(self, rpkid): try: tasks = self.cron_tasks except AttributeError: tasks = self.cron_tasks = tuple(task(rpkid, self) for task in rpki.rpkid_tasks.task_classes) for task in tasks: rpkid.task_add(task) - completion.register(task) + yield task.completed # Plain old Python generator yield, this is not a coroutine def find_covering_ca_details(self, resources): @@ -403,10 +424,10 @@ class Repository(models.Model): elements = ("bpki_cert", "bpki_glue")) - def xml_post_save_hook(self, rpkid, q_pdu, cb, eb): + @tornado.gen.coroutine + def xml_post_save_hook(self, rpkid, q_pdu): if q_pdu.get("clear_replay_protection"): self.clear_replay_protection() - cb() def clear_replay_protection(self): @@ -414,7 +435,8 @@ class Repository(models.Model): self.save() - def call_pubd(self, rpkid, callback, errback, q_msg, handlers = {}, length_check = True): # pylint: disable=W0102 + @tornado.gen.coroutine + def call_pubd(self, rpkid, q_msg, handlers = {}, length_check = True): # pylint: disable=W0102 """ Send a message to publication daemon and return the response. @@ -428,47 +450,46 @@ class Repository(models.Model): handler value of False suppresses calling of the default handler. """ - try: - if len(q_msg) == 0: - return callback() - - for q_pdu in q_msg: - logger.info("Sending %r to pubd", q_pdu) - - bsc = self.bsc - q_der = rpki.publication.cms_msg().wrap(q_msg, bsc.private_key_id, bsc.signing_cert, bsc.signing_cert_crl) - bpki_ta_path = (rpkid.bpki_ta, self.tenant.bpki_cert, self.tenant.bpki_glue, self.bpki_cert, self.bpki_glue) - - def done(r_der): - try: - logger.debug("Received response from pubd") - r_cms = rpki.publication.cms_msg(DER = r_der) - r_msg = r_cms.unwrap(bpki_ta_path) - r_cms.check_replay_sql(self, self.peer_contact_uri) - for r_pdu in r_msg: - handler = handlers.get(r_pdu.get("tag"), rpki.publication.raise_if_error) - if handler: - logger.debug("Calling pubd handler %r", handler) - handler(r_pdu) - if length_check and len(q_msg) != len(r_msg): - raise rpki.exceptions.BadPublicationReply("Wrong number of response PDUs from pubd: sent %r, got %r" % (q_msg, r_msg)) - callback() - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - errback(e) - - logger.debug("Sending request to pubd") - rpki.http.client( - url = self.peer_contact_uri, - msg = q_der, - callback = done, - errback = errback) - - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - errback(e) + if len(q_msg) == 0: + raise tornado.gen.Return + + for q_pdu in q_msg: + logger.info("Sending %r to pubd", q_pdu) + + q_der = rpki.publication.cms_msg().wrap(q_msg, self.bsc.private_key_id, self.bsc.signing_cert, self.bsc.signing_cert_crl) + + http_client = tornado.httpclient.AsyncHTTPClient() + + http_request = tornado.httpclient.HTTPRequest( + url = self.peer_contact_uri, + method = "POST", + body = q_der, + headers = { "Content-Type" : rpki.publication.content_type }) + + http_response = yield http_client.fetch(http_request) + + # Tornado already checked http_response.code for us + + content_type = http_response.headers.get("Content-Type") + + if content_type not in rpki.publication.allowed_content_types: + raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.publication.content_type, content_type)) + + r_der = http_response.body + r_cms = rpki.publication.cms_msg(DER = r_der) + r_msg = r_cms.unwrap((rpkid.bpki_ta, self.tenant.bpki_cert, self.tenant.bpki_glue, self.bpki_cert, self.bpki_glue)) + r_cms.check_replay_sql(self, self.peer_contact_uri) + + for r_pdu in r_msg: + handler = handlers.get(r_pdu.get("tag"), rpki.publication.raise_if_error) + if handler: + logger.debug("Calling pubd handler %r", handler) + handler(r_pdu) + + if length_check and len(q_msg) != len(r_msg): + raise rpki.exceptions.BadPublicationReply("Wrong number of response PDUs from pubd: sent %r, got %r" % (q_msg, r_msg)) + + raise tornado.gen.Return(r_msg) @xml_hooks @@ -496,51 +517,44 @@ class Parent(models.Model): elements = ("bpki_cert", "bpki_glue")) - def xml_pre_delete_hook(self, rpkid, cb, eb): - self.destroy(rpkid, cb, delete_parent = False) - + @tornado.gen.coroutine + def xml_pre_delete_hook(self, rpkid): + self.destroy(rpkid, delete_parent = False) - def xml_post_save_hook(self, rpkid, q_pdu, cb, eb): + @tornado.gen.coroutine + def xml_post_save_hook(self, rpkid, q_pdu): if q_pdu.get("clear_replay_protection"): self.clear_replay_protection() - actions = [] + futures = [] if q_pdu.get("rekey"): - actions.append(self.serve_rekey) + futures.append(self.serve_rekey(rpkid)) if q_pdu.get("revoke"): - actions.append(self.serve_revoke) + futures.append(self.serve_revoke(rpkid)) if q_pdu.get("reissue"): - actions.append(self.serve_reissue) + futures.append(self.serve_reissue(rpkid)) if q_pdu.get("revoke_forgotten"): - actions.append(self.serve_revoke_forgotten) - def loop(iterator, action): - action(iterator, eb) - rpki.async.iterator(actions, loop, cb) + futures.append(self.serve_revoke_forgotten(rpkid)) + yield futures + @tornado.gen.coroutine + def serve_rekey(self, rpkid): + yield [ca.rekey() for ca in self.cas.all()] - def serve_rekey(self, rpkid, cb, eb): - def loop(iterator, ca): - ca.rekey(iterator, eb) - rpki.async.iterator(self.cas.all(), loop, cb) - - - def serve_revoke(self, rpkid, cb, eb): - def loop(iterator, ca): - ca.revoke(cb = iterator, eb = eb) - rpki.async.iterator(self.cas.all(), loop, cb) - - - def serve_reissue(self, rpkid, cb, eb): - def loop(iterator, ca): - ca.reissue(cb = iterator, eb = eb) - rpki.async.iterator(self.cas.all(), loop, cb) + @tornado.gen.coroutine + def serve_revoke(self, rpkid): + yield [ca.revoke() for ca in self.cas.all()] + @tornado.gen.coroutine + def serve_reissue(self, rpkid): + yield [ca.reissue() for ca in self.cas.all()] def clear_replay_protection(self): self.last_cms_timestamp = None self.save() - def get_skis(self, rpkid, cb, eb): + @tornado.gen.coroutine + def get_skis(self, rpkid): """ Fetch SKIs that this parent thinks we have. In theory this should agree with our own database, but in practice stuff can happen, so @@ -550,28 +564,32 @@ class Parent(models.Model): set of SKIs as value. """ - def done(r_msg): - cb(dict((rc.get("class_name"), - set(rpki.x509.X509(Base64 = c.text).gSKI() - for c in rc.getiterator(rpki.up_down.tag_certificate))) - for rc in r_msg.getiterator(rpki.up_down.tag_class))) - self.up_down_list_query(rpkid = rpkid, cb = done, eb = eb) + r_msg = yield self.up_down_list_query(rpkid = rpkid) + + ski_map = {} + + for rc in r_msg.getiterator(rpki.up_down.tag_class): + skis = set() + for c in rc.getiterator(rpki.up_down.tag_certificate): + skis.add(rpki.x509.X509(Base64 = c.text).gSKI()) + ski_map[rc.get("class_name")] = skis + raise tornado.gen.Return(ski_map) - def revoke_skis(self, rpkid, rc_name, skis_to_revoke, cb, eb): + + @tornado.gen.coroutine + def revoke_skis(self, rpkid, rc_name, skis_to_revoke): """ Revoke a set of SKIs within a particular resource class. """ - def loop(iterator, ski): - def revoked(r_pdu): - iterator() + for ski in skis_to_revoke: logger.debug("Asking parent %r to revoke class %r, SKI %s", self, rc_name, ski) - self.up_down_revoke_query(rpkid = rpkid, class_name = rc_name, ski = ski, cb = revoked, eb = eb) - rpki.async.iterator(skis_to_revoke, loop, cb) + yield self.up_down_revoke_query(rpkid = rpkid, class_name = rc_name, ski = ski) - def serve_revoke_forgotten(self, rpkid, cb, eb): + @tornado.gen.coroutine + def serve_revoke_forgotten(self, rpkid): """ Handle a left-right revoke_forgotten action for this parent. @@ -585,36 +603,24 @@ class Parent(models.Model): require an explicit trigger. """ - def got_skis(skis_from_parent): - def loop(iterator, item): - rc_name, skis_to_revoke = item - if rc_name in ca_map: - for ca_detail in ca_map[rc_name].issue_response_candidate_ca_details: - skis_to_revoke.discard(ca_detail.latest_ca_cert.gSKI()) - self.revoke_skis(rpkid, rc_name, skis_to_revoke, iterator, eb) - ca_map = dict((ca.parent_resource_class, ca) for ca in self.cas.all()) - rpki.async.iterator(skis_from_parent.items(), loop, cb) - self.get_skis(rpkid, got_skis, eb) + skis_from_parent = yield self.get_skis(rpkid) + for rc_name, skis_to_revoke in skis_from_parent.iteritems(): + for ca_detail in CADetail.objects.filter(ca__parent = self).exclude(state = "revoked"): + skis_to_revoke.discard(ca_detail.latest_ca_cert.gSKI()) + yield self.revoke_skis(rpkid, rc_name, skis_to_revoke) - def destroy(self, rpkid, cb, delete_parent = True): + @tornado.gen.coroutine + def destroy(self, rpkid, delete_parent = True): """ Delete all the CA stuff under this parent, and perhaps the parent itself. """ - def loop(iterator, ca): - ca.destroy(self, iterator) - def revoke(): - self.serve_revoke_forgotten(rpkid, done, fail) - def fail(e): - logger.warning("Trouble getting parent to revoke certificates, blundering onwards: %s", e) - done() - def done(): - if delete_parent: - self.delete() - cb() - rpki.async.iterator(self.cas, loop, revoke) + yield [ca.destroy(self) for ca in self.cas()] + yield self.serve_revoke_forgotten(rpkid) + if delete_parent: + self.delete() def _compose_up_down_query(self, query_type): @@ -622,12 +628,15 @@ class Parent(models.Model): sender = self.sender_name, recipient = self.recipient_name, type = query_type) - def up_down_list_query(self, rpkid, cb, eb): + @tornado.gen.coroutine + def up_down_list_query(self, rpkid): q_msg = self._compose_up_down_query("list") - self.query_up_down(rpkid, q_msg, cb, eb) + r_msg = yield self.query_up_down(rpkid, q_msg) + raise tornado.gen.Return(r_msg) - def up_down_issue_query(self, rpkid, ca, ca_detail, cb, eb): + @tornado.gen.coroutine + def up_down_issue_query(self, rpkid, ca, ca_detail): logger.debug("Parent.up_down_issue_query(): caRepository %r rpkiManifest %r rpkiNotify %r", ca.sia_uri, ca_detail.manifest_uri, ca.parent.repository.rrdp_notification_uri) pkcs10 = rpki.x509.PKCS10.create( @@ -639,16 +648,19 @@ class Parent(models.Model): q_msg = self._compose_up_down_query("issue") q_pdu = SubElement(q_msg, rpki.up_down.tag_request, class_name = ca.parent_resource_class) q_pdu.text = pkcs10.get_Base64() - self.query_up_down(rpkid, q_msg, cb, eb) + r_msg = yield self.query_up_down(rpkid, q_msg) + raise tornado.gen.Return(r_msg) - - def up_down_revoke_query(self, rpkid, class_name, ski, cb, eb): + @tornado.gen.coroutine + def up_down_revoke_query(self, rpkid, class_name, ski): q_msg = self._compose_up_down_query("revoke") SubElement(q_msg, rpki.up_down.tag_key, class_name = class_name, ski = ski) - self.query_up_down(rpkid, q_msg, cb, eb) + r_msg = yield self.query_up_down(rpkid, q_msg) + raise tornado.gen.Return(r_msg) - def query_up_down(self, rpkid, q_msg, cb, eb): + @tornado.gen.coroutine + def query_up_down(self, rpkid, q_msg): if self.bsc is None: raise rpki.exceptions.BSCNotFound("Could not find BSC") @@ -656,37 +668,32 @@ class Parent(models.Model): if self.bsc.signing_cert is None: raise rpki.exceptions.BSCNotReady("BSC %r is not yet usable" % self.bsc.bsc_handle) - q_der = rpki.up_down.cms_msg().wrap(q_msg, - self.bsc.private_key_id, - self.bsc.signing_cert, - self.bsc.signing_cert_crl) - - def unwrap(r_der): - try: - r_cms = rpki.up_down.cms_msg(DER = r_der) - r_msg = r_cms.unwrap((rpkid.bpki_ta, - self.tenant.bpki_cert, - self.tenant.bpki_glue, - self.bpki_cert, - self.bpki_glue)) - r_cms.check_replay_sql(self, self.peer_contact_uri) - rpki.up_down.check_response(r_msg, q_msg.get("type")) - - except (SystemExit, rpki.async.ExitNow): - raise - except Exception, e: - eb(e) - else: - cb(r_msg) + q_der = rpki.up_down.cms_msg().wrap(q_msg, self.bsc.private_key_id, self.bsc.signing_cert, self.bsc.signing_cert_crl) + + http_client = tornado.httpclient.AsyncHTTPClient() + + http_request = tornado.httpclient.HTTPRequest( + url = self.peer_contact_uri, + method = "POST", + body = q_der, + headers = { "Content-Type" : rpki.up_down.content_type }) + + http_response = yield http_client.fetch(http_request) - logger.debug("query_up_down(): type(q_der) %r", type(q_der)) # XXX + # Tornado already checked http_response.code for us - rpki.http.client( - msg = q_der, - url = self.peer_contact_uri, - callback = unwrap, - errback = eb, - content_type = rpki.up_down.content_type) + content_type = http_response.headers.get("Content-Type") + + if content_type not in rpki.up_down.allowed_content_types: + raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.up_down.content_type, content_type)) + + r_der = http_response.body + r_cms = rpki.up_down.cms_msg(DER = r_der) + r_msg = r_cms.unwrap((rpkid.bpki_ta, self.tenant.bpki_cert, self.tenant.bpki_glue, self.bpki_cert, self.bpki_glue)) + r_cms.check_replay_sql(self, self.peer_contact_uri) + rpki.up_down.check_response(r_msg, q_msg.get("type")) + + raise tornado.gen.Return(r_msg) def construct_sia_uri(self, rc): @@ -736,7 +743,8 @@ class CA(models.Model): #def issue_response_candidate_ca_details(self): return self.ca_details.exclude(state = "revoked") - def check_for_updates(self, rpkid, parent, rc, cb, eb): + @tornado.gen.coroutine + def check_for_updates(self, rpkid, parent, rc): """ Parent has signaled continued existance of a resource class we already knew about, so we need to check for an updated @@ -747,74 +755,86 @@ class CA(models.Model): logger.debug("check_for_updates()") sia_uri = parent.construct_sia_uri(rc) sia_uri_changed = self.sia_uri != sia_uri + if sia_uri_changed: logger.debug("SIA changed: was %s now %s", self.sia_uri, sia_uri) self.sia_uri = sia_uri + class_name = rc.get("class_name") + rc_resources = rpki.resource_set.resource_bag( rc.get("resource_set_as"), rc.get("resource_set_ipv4"), rc.get("resource_set_ipv6"), rc.get("resource_set_notafter")) + cert_map = {} + for c in rc.getiterator(rpki.up_down.tag_certificate): x = rpki.x509.X509(Base64 = c.text) u = rpki.up_down.multi_uri(c.get("cert_url")).rsync() cert_map[x.gSKI()] = (x, u) - def loop(iterator, ca_detail): + + ca_details = self.ca_details.exclude(state = "revoked") + + if not ca_details: + logger.warning("Existing resource class %s to %s from %s with no certificates, rekeying", + class_name, parent.tenant.tenant_handle, parent.parent_handle) + yield self.rekey(rpkid) + raise tornado.gen.Return + + for ca_detail in ca_details: + rc_cert, rc_cert_uri = cert_map.pop(ca_detail.public_key.gSKI(), (None, None)) + if rc_cert is None: logger.warning("SKI %s in resource class %s is in database but missing from list_response to %s from %s, " "maybe parent certificate went away?", ca_detail.public_key.gSKI(), class_name, parent.tenant.tenant_handle, parent.parent_handle) publisher = rpki.rpkid.publication_queue(rpkid) ca_detail.destroy(ca = ca_detail.ca, publisher = publisher) - return publisher.call_pubd(iterator, eb) + yield publisher.call_pubd() + continue + if ca_detail.state == "active" and ca_detail.ca_cert_uri != rc_cert_uri: logger.debug("AIA changed: was %s now %s", ca_detail.ca_cert_uri, rc_cert_uri) ca_detail.ca_cert_uri = rc_cert_uri ca_detail.save() + if ca_detail.state not in ("pending", "active"): - return iterator() + continue + if ca_detail.state == "pending": current_resources = rpki.resource_set.resource_bag() else: current_resources = ca_detail.latest_ca_cert.get_3779resources() + if (ca_detail.state == "pending" or sia_uri_changed or ca_detail.latest_ca_cert != rc_cert or ca_detail.latest_ca_cert.getNotAfter() != rc_resources.valid_until or current_resources.undersized(rc_resources) or current_resources.oversized(rc_resources)): - return ca_detail.update( + + yield ca_detail.update( rpkid = rpkid, parent = parent, ca = self, rc = rc, sia_uri_changed = sia_uri_changed, - old_resources = current_resources, - callback = iterator, - errback = eb) - iterator() - def done(): - if cert_map: - logger.warning("Unknown certificate SKI%s %s in resource class %s in list_response to %s from %s, maybe you want to \"revoke_forgotten\"?", - "" if len(cert_map) == 1 else "s", ", ".join(cert_map), class_name, parent.tenant.tenant_handle, parent.parent_handle) - cb() - ca_details = self.ca_details.exclude(state = "revoked") - if ca_details: - rpki.async.iterator(ca_details, loop, done) - else: - logger.warning("Existing resource class %s to %s from %s with no certificates, rekeying", - class_name, parent.tenant.tenant_handle, parent.parent_handle) - self.rekey(rpkid, cb, eb) + old_resources = current_resources) + + if cert_map: + logger.warning("Unknown certificate SKI%s %s in resource class %s in list_response to %s from %s, maybe you want to \"revoke_forgotten\"?", + "" if len(cert_map) == 1 else "s", ", ".join(cert_map), class_name, parent.tenant.tenant_handle, parent.parent_handle) # Called from exactly one place, in rpki.rpkid_tasks.PollParentTask.class_loop(). # Might want to refactor. @classmethod - def create(cls, rpkid, parent, rc, cb, eb): + @tornado.gen.coroutine + def create(cls, rpkid, parent, rc): """ Parent has signaled existance of a new resource class, so we need to create and set up a corresponding CA object. @@ -823,22 +843,26 @@ class CA(models.Model): self = cls.objects.create(parent = parent, parent_resource_class = rc.get("class_name"), sia_uri = parent.construct_sia_uri(rc)) + ca_detail = CADetail.create(self) - def done(r_msg): - c = r_msg[0][0] - logger.debug("CA %r received certificate %s", self, c.get("cert_url")) - ca_detail.activate( - rpkid = rpkid, - ca = self, - cert = rpki.x509.X509(Base64 = c.text), - uri = c.get("cert_url"), - callback = cb, - errback = eb) + logger.debug("Sending issue request to %r from %r", parent, self.create) - parent.up_down_issue_query(rpkid = rpkid, ca = self, ca_detail = ca_detail, cb = done, eb = eb) + r_msg = yield parent.up_down_issue_query(rpkid = rpkid, ca = self, ca_detail = ca_detail) + + c = r_msg[0][0] + + logger.debug("CA %r received certificate %s", self, c.get("cert_url")) - def destroy(self, rpkid, parent, callback): + yield ca_detail.activate( + rpkid = rpkid, + ca = self, + cert = rpki.x509.X509(Base64 = c.text), + uri = c.get("cert_url")) + + + @tornado.gen.coroutine + def destroy(self, rpkid, parent): """ The list of current resource classes received from parent does not include the class corresponding to this CA, so we need to delete @@ -850,17 +874,20 @@ class CA(models.Model): CA, then finally delete this CA itself. """ - def lose(e): - logger.exception("Could not delete CA %r, skipping", self) - callback() - def done(): - logger.debug("Deleting %r", self) - self.delete() - callback() publisher = rpki.rpkid.publication_queue(rpkid) + for ca_detail in self.ca_details.all(): ca_detail.destroy(ca = self, publisher = publisher, allow_failure = True) - publisher.call_pubd(done, lose) + + try: + yield publisher.call_pubd() + + except: + logger.exception("Could not delete CA %r, skipping", self) + + else: + logger.debug("Deleting %r", self) + self.delete() def next_serial_number(self): @@ -893,7 +920,8 @@ class CA(models.Model): return self.last_crl_sn - def rekey(self, rpkid, cb, eb): + @tornado.gen.coroutine + def rekey(self, rpkid): """ Initiate a rekey operation for this CA. Generate a new keypair. Request cert from parent using new keypair. Mark result as our @@ -908,44 +936,46 @@ class CA(models.Model): new_detail = CADetail.create(ca = self) # sic: class method, not manager function (for now, anyway) - def done(r_msg): - c = r_msg[0][0] - logger.debug("CA %r received certificate %s", self, c.get("cert_url")) - new_detail.activate( - rpkid = rpkid, - ca = self, - cert = rpki.x509.X509(Base64 = c.text), - uri = c.get("cert_url"), - predecessor = old_detail, - callback = cb, - errback = eb) - logger.debug("Sending issue request to %r from %r", self.parent, self.rekey) - self.parent.up_down_issue_query(rpkid = rpkid, ca = self, ca_detail = new_detail, cb = done, eb = eb) + + r_msg = yield self.parent.up_down_issue_query(rpkid = rpkid, ca = self, ca_detail = new_detail) + + c = r_msg[0][0] + + logger.debug("CA %r received certificate %s", self, c.get("cert_url")) + + yield new_detail.activate( + rpkid = rpkid, + ca = self, + cert = rpki.x509.X509(Base64 = c.text), + uri = c.get("cert_url"), + predecessor = old_detail) - def revoke(self, cb, eb, revoke_all = False): + @tornado.gen.coroutine + def revoke(self, revoke_all = False): """ Revoke deprecated ca_detail objects associated with this CA, or all ca_details associated with this CA if revoke_all is set. """ - def loop(iterator, ca_detail): - ca_detail.revoke(cb = iterator, eb = eb) - rpki.async.iterator(self.ca_details.all() if revoke_all else self.ca_details.filter(state = "deprecated"), - loop, cb) + if revoke_all: + ca_details = self.ca_details.all() + else: + ca_details = self.ca_details.filter(state = "deprecated") + + yield [ca_detail.revoke() for ca_detail in ca_details] - def reissue(self, cb, eb): + @tornado.gen.coroutine + def reissue(self): """ Reissue all current certificates issued by this CA. """ ca_detail = self.ca_details.get(state = "active") if ca_detail: - ca_detail.reissue(cb, eb) - else: - cb() + yield ca_detail.reissue() class CADetail(models.Model): @@ -1014,7 +1044,8 @@ class CADetail(models.Model): return target.asn <= me.asn and target.v4 <= me.v4 and target.v6 <= me.v6 - def activate(self, rpkid, ca, cert, uri, callback, errback, predecessor = None): + @tornado.gen.coroutine + def activate(self, rpkid, ca, cert, uri, predecessor = None): """ Activate this ca_detail. """ @@ -1027,6 +1058,7 @@ class CADetail(models.Model): self.generate_crl(publisher = publisher) self.generate_manifest(publisher = publisher) self.save() + if predecessor is not None: predecessor.state = "deprecated" predecessor.save() @@ -1038,7 +1070,8 @@ class CADetail(models.Model): ghostbuster.regenerate(publisher = publisher) predecessor.generate_crl(publisher = publisher) predecessor.generate_manifest(publisher = publisher) - publisher.call_pubd(callback, errback) + + yield publisher.call_pubd() def destroy(self, ca, publisher, allow_failure = False): @@ -1068,7 +1101,9 @@ class CADetail(models.Model): logger.debug("Deleting %r", self) self.delete() - def revoke(self, rpkid, cb, eb): + + @tornado.gen.coroutine + def revoke(self, rpkid): """ Request revocation of all certificates whose SKI matches the key for this ca_detail. @@ -1091,84 +1126,109 @@ class CADetail(models.Model): time has passed. """ - ca = self.ca - parent = ca.parent - class_name = ca.parent_resource_class gski = self.latest_ca_cert.gSKI() - def parent_revoked(r_msg): - if r_msg[0].get("class_name") != class_name: - raise rpki.exceptions.ResourceClassMismatch - if r_msg[0].get("ski") != gski: - raise rpki.exceptions.SKIMismatch - logger.debug("Parent revoked %s, starting cleanup", gski) - crl_interval = rpki.sundial.timedelta(seconds = parent.tenant.crl_interval) - nextUpdate = rpki.sundial.now() - if self.latest_manifest is not None: - self.latest_manifest.extract_if_needed() - nextUpdate = nextUpdate.later(self.latest_manifest.getNextUpdate()) - if self.latest_crl is not None: - nextUpdate = nextUpdate.later(self.latest_crl.getNextUpdate()) - publisher = rpki.rpkid.publication_queue(rpkid) - for child_cert in self.child_certs.all(): - nextUpdate = nextUpdate.later(child_cert.cert.getNotAfter()) - child_cert.revoke(publisher = publisher) - for roa in self.roas.all(): - nextUpdate = nextUpdate.later(roa.cert.getNotAfter()) - roa.revoke(publisher = publisher) - for ghostbuster in self.ghostbusters.all(): - nextUpdate = nextUpdate.later(ghostbuster.cert.getNotAfter()) - ghostbuster.revoke(publisher = publisher) - nextUpdate += crl_interval - self.generate_crl(publisher = publisher, nextUpdate = nextUpdate) - self.generate_manifest(publisher = publisher, nextUpdate = nextUpdate) - self.private_key_id = None - self.manifest_private_key_id = None - self.manifest_public_key = None - self.latest_manifest_cert = None - self.state = "revoked" - self.save() - publisher.call_pubd(cb, eb) logger.debug("Asking parent to revoke CA certificate %s", gski) - parent.up_down_revoke_query(rpkid = rpkid, class_name = class_name, ski = gski, cb = parent_revoked, eb = eb) + r_msg = yield self.ca.parent.up_down_revoke_query(rpkid = rpkid, class_name = self.ca.parent_resource_class, ski = gski) + + if r_msg[0].get("class_name") != self.ca.parent_resource_class: + raise rpki.exceptions.ResourceClassMismatch + + if r_msg[0].get("ski") != gski: + raise rpki.exceptions.SKIMismatch + + logger.debug("Parent revoked %s, starting cleanup", gski) + + crl_interval = rpki.sundial.timedelta(seconds = self.ca.parent.tenant.crl_interval) + + nextUpdate = rpki.sundial.now() + + if self.latest_manifest is not None: + self.latest_manifest.extract_if_needed() + nextUpdate = nextUpdate.later(self.latest_manifest.getNextUpdate()) - def update(self, rpkid, parent, ca, rc, sia_uri_changed, old_resources, callback, errback): + if self.latest_crl is not None: + nextUpdate = nextUpdate.later(self.latest_crl.getNextUpdate()) + + publisher = rpki.rpkid.publication_queue(rpkid) + + for child_cert in self.child_certs.all(): + nextUpdate = nextUpdate.later(child_cert.cert.getNotAfter()) + child_cert.revoke(publisher = publisher) + + for roa in self.roas.all(): + nextUpdate = nextUpdate.later(roa.cert.getNotAfter()) + roa.revoke(publisher = publisher) + + for ghostbuster in self.ghostbusters.all(): + nextUpdate = nextUpdate.later(ghostbuster.cert.getNotAfter()) + ghostbuster.revoke(publisher = publisher) + + nextUpdate += crl_interval + + self.generate_crl(publisher = publisher, nextUpdate = nextUpdate) + self.generate_manifest(publisher = publisher, nextUpdate = nextUpdate) + self.private_key_id = None + self.manifest_private_key_id = None + self.manifest_public_key = None + self.latest_manifest_cert = None + self.state = "revoked" + self.save() + + yield publisher.call_pubd() + + + @tornado.gen.coroutine + def update(self, rpkid, parent, ca, rc, sia_uri_changed, old_resources): """ Need to get a new certificate for this ca_detail and perhaps frob children of this ca_detail. """ - def issued(r_msg): - c = r_msg[0][0] - cert = rpki.x509.X509(Base64 = c.text) - cert_url = c.get("cert_url") - logger.debug("CA %r received certificate %s", self, cert_url) - if self.state == "pending": - return self.activate(rpkid = rpkid, ca = ca, cert = cert, uri = cert_url, callback = callback, errback = errback) - validity_changed = self.latest_ca_cert is None or self.latest_ca_cert.getNotAfter() != cert.getNotAfter() - publisher = rpki.rpkid.publication_queue(rpkid) - if self.latest_ca_cert != cert: - self.latest_ca_cert = cert - self.save() - self.generate_manifest_cert() - self.generate_crl(publisher = publisher) - self.generate_manifest(publisher = publisher) - new_resources = self.latest_ca_cert.get_3779resources() - if sia_uri_changed or old_resources.oversized(new_resources): - for child_cert in self.child_certs.all(): - child_resources = child_cert.cert.get_3779resources() - if sia_uri_changed or child_resources.oversized(new_resources): - child_cert.reissue(ca_detail = self, resources = child_resources & new_resources, publisher = publisher) - if sia_uri_changed or validity_changed or old_resources.oversized(new_resources): - for roa in self.roas.all(): - roa.update(publisher = publisher, fast = True) - if sia_uri_changed or validity_changed: - for ghostbuster in self.ghostbusters.all(): - ghostbuster.update(publisher = publisher, fast = True) - publisher.call_pubd(callback, errback) logger.debug("Sending issue request to %r from %r", parent, self.update) - parent.up_down_issue_query(rpkid = rpkid, ca = ca, ca_detail = self, cb = issued, eb = errback) + + r_msg = yield parent.up_down_issue_query(rpkid = rpkid, ca = ca, ca_detail = self) + + c = r_msg[0][0] + + cert = rpki.x509.X509(Base64 = c.text) + cert_url = c.get("cert_url") + + logger.debug("CA %r received certificate %s", self, cert_url) + + if self.state == "pending": + yield self.activate(rpkid = rpkid, ca = ca, cert = cert, uri = cert_url) + raise tornado.gen.Return + + validity_changed = self.latest_ca_cert is None or self.latest_ca_cert.getNotAfter() != cert.getNotAfter() + + publisher = rpki.rpkid.publication_queue(rpkid) + + if self.latest_ca_cert != cert: + self.latest_ca_cert = cert + self.save() + self.generate_manifest_cert() + self.generate_crl(publisher = publisher) + self.generate_manifest(publisher = publisher) + + new_resources = self.latest_ca_cert.get_3779resources() + + if sia_uri_changed or old_resources.oversized(new_resources): + for child_cert in self.child_certs.all(): + child_resources = child_cert.cert.get_3779resources() + if sia_uri_changed or child_resources.oversized(new_resources): + child_cert.reissue(ca_detail = self, resources = child_resources & new_resources, publisher = publisher) + + if sia_uri_changed or validity_changed or old_resources.oversized(new_resources): + for roa in self.roas.all(): + roa.update(publisher = publisher, fast = True) + + if sia_uri_changed or validity_changed: + for ghostbuster in self.ghostbusters.all(): + ghostbuster.update(publisher = publisher, fast = True) + + yield publisher.call_pubd() @classmethod @@ -1179,9 +1239,13 @@ class CADetail(models.Model): cer_keypair = rpki.x509.RSA.generate() mft_keypair = rpki.x509.RSA.generate() - return cls.objects.create(ca = ca, state = "pending", - private_key_id = cer_keypair, public_key = cer_keypair.get_public(), - manifest_private_key_id = mft_keypair, manifest_public_key = mft_keypair.get_public()) + return cls.objects.create( + ca = ca, + state = "pending", + private_key_id = cer_keypair, + public_key = cer_keypair.get_public(), + manifest_private_key_id = mft_keypair, + manifest_public_key = mft_keypair.get_public()) def issue_ee(self, ca, resources, subject_key, sia, @@ -1361,7 +1425,8 @@ class CADetail(models.Model): self.save() - def reissue(self, rpkid, cb, eb): + @tornado.gen.coroutine + def reissue(self, rpkid): """ Reissue all current certificates issued by this ca_detail. """ @@ -1381,7 +1446,7 @@ class CADetail(models.Model): self.generate_crl(publisher = publisher) self.generate_manifest(publisher = publisher) self.save() - publisher.call_pubd(cb, eb) + yield publisher.call_pubd() def check_failed_publication(self, publisher, check_all = True): @@ -1476,27 +1541,27 @@ class Child(models.Model): elements = ("bpki_cert", "bpki_glue")) - def xml_pre_delete_hook(self, rpkid, cb, eb): + @tornado.gen.coroutine + def xml_pre_delete_hook(self, rpkid): publisher = rpki.rpkid.publication_queue(rpkid) for child_cert in self.child_certs.all(): child_cert.revoke(publisher = publisher, generate_crl_and_manifest = True) - publisher.call_pubd(cb, eb) + yield publisher.call_pubd() - def xml_post_save_hook(self, rpkid, q_pdu, cb, eb): + @tornado.gen.coroutine + def xml_post_save_hook(self, rpkid, q_pdu): if q_pdu.get("clear_replay_protection"): self.clear_replay_protection() if q_pdu.get("reissue"): - self.serve_reissue(rpkid, cb, eb) - else: - cb() + yield self.serve_reissue(rpkid) - def serve_reissue(self, rpkid, cb, eb): + def serve_reissue(self, rpkid): publisher = rpki.rpkid.publication_queue(rpkid) for child_cert in self.child_certs.all(): child_cert.reissue(child_cert.ca_detail, publisher, force = True) - publisher.call_pubd(cb, eb) + yield publisher.call_pubd() def clear_replay_protection(self): @@ -1504,82 +1569,40 @@ class Child(models.Model): self.save() - def up_down_handle_list(self, rpkid, q_msg, r_msg, callback, errback): - def got_resources(irdb_resources): - if irdb_resources.valid_until < rpki.sundial.now(): - logger.debug("Child %s's resources expired %s", self.child_handle, irdb_resources.valid_until) - else: - for ca_detail in CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"): - resources = ca_detail.latest_ca_cert.get_3779resources() & irdb_resources - if resources.empty(): - logger.debug("No overlap between received resources and what child %s should get ([%s], [%s])", - self.child_handle, ca_detail.latest_ca_cert.get_3779resources(), irdb_resources) - continue - rc = SubElement(r_msg, rpki.up_down.tag_class, - class_name = ca_detail.ca.parent_resource_class, - cert_url = ca_detail.ca_cert_uri, - resource_set_as = str(resources.asn), - resource_set_ipv4 = str(resources.v4), - resource_set_ipv6 = str(resources.v6), - resource_set_notafter = str(resources.valid_until)) - for child_cert in self.child_certs.filter(ca_detail = ca_detail): - c = SubElement(rc, rpki.up_down.tag_certificate, cert_url = child_cert.uri) - c.text = child_cert.cert.get_Base64() - SubElement(rc, rpki.up_down.tag_issuer).text = ca_detail.latest_ca_cert.get_Base64() - callback() - rpkid.irdb_query_child_resources(self.tenant.tenant_handle, self.child_handle, got_resources, errback) - - - def up_down_handle_issue(self, rpkid, q_msg, r_msg, callback, errback): - - def got_resources(irdb_resources): - - def done(): + @tornado.gen.coroutine + def up_down_handle_list(self, rpkid, q_msg, r_msg): + + irdb_resources = yield rpkid.irdb_query_child_resources(self.tenant.tenant_handle, self.child_handle) + + if irdb_resources.valid_until < rpki.sundial.now(): + logger.debug("Child %s's resources expired %s", self.child_handle, irdb_resources.valid_until) + + else: + + for ca_detail in CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"): + resources = ca_detail.latest_ca_cert.get_3779resources() & irdb_resources + + if resources.empty(): + logger.debug("No overlap between received resources and what child %s should get ([%s], [%s])", + self.child_handle, ca_detail.latest_ca_cert.get_3779resources(), irdb_resources) + continue + rc = SubElement(r_msg, rpki.up_down.tag_class, - class_name = class_name, + class_name = ca_detail.ca.parent_resource_class, cert_url = ca_detail.ca_cert_uri, resource_set_as = str(resources.asn), resource_set_ipv4 = str(resources.v4), resource_set_ipv6 = str(resources.v6), resource_set_notafter = str(resources.valid_until)) - c = SubElement(rc, rpki.up_down.tag_certificate, cert_url = child_cert.uri) - c.text = child_cert.cert.get_Base64() - SubElement(rc, rpki.up_down.tag_issuer).text = ca_detail.latest_ca_cert.get_Base64() - callback() - - if irdb_resources.valid_until < rpki.sundial.now(): - raise rpki.exceptions.IRDBExpired("IRDB entry for child %s expired %s" % ( - self.child_handle, irdb_resources.valid_until)) - - resources = irdb_resources & ca_detail.latest_ca_cert.get_3779resources() - resources.valid_until = irdb_resources.valid_until - req_key = pkcs10.getPublicKey() - req_sia = pkcs10.get_SIA() - - # Generate new cert or regenerate old one if necessary - publisher = rpki.rpkid.publication_queue(rpkid) - - try: - child_cert = self.child_certs.get(ca_detail = ca_detail, ski = req_key.get_SKI()) - - except ChildCert.DoesNotExist: - child_cert = ca_detail.issue( - ca = ca_detail.ca, - child = self, - subject_key = req_key, - sia = req_sia, - resources = resources, - publisher = publisher) + for child_cert in self.child_certs.filter(ca_detail = ca_detail): + c = SubElement(rc, rpki.up_down.tag_certificate, cert_url = child_cert.uri) + c.text = child_cert.cert.get_Base64() + SubElement(rc, rpki.up_down.tag_issuer).text = ca_detail.latest_ca_cert.get_Base64() - else: - child_cert = child_cert.reissue( - ca_detail = ca_detail, - sia = req_sia, - resources = resources, - publisher = publisher) - publisher.call_pubd(done, errback) + @tornado.gen.coroutine + def up_down_handle_issue(self, rpkid, q_msg, r_msg): req = q_msg[0] assert req.tag == rpki.up_down.tag_request @@ -1591,24 +1614,60 @@ class Child(models.Model): class_name = req.get("class_name") pkcs10 = rpki.x509.PKCS10(Base64 = req.text) + pkcs10.check_valid_request_ca() + ca_detail = CADetail.objects.get(ca__parent__tenant = self.tenant, state = "active", + ca__parent_resource_class = class_name) - # XXX - logger.debug("Child.up_down_handle_issue(): PKCS #10 %s", pkcs10.get_Base64()) - sia = pkcs10.get_SIA() - logger.debug("Child.up_down_handle_issue(): PKCS #10 SIA %r (%r, %r, %r, %r) %r", - type(sia), type(sia[0]), type(sia[1]), type(sia[2]), type(sia[3]), sia) + irdb_resources = yield rpkid.irdb_query_child_resources(self.tenant.tenant_handle, self.child_handle) - pkcs10.check_valid_request_ca() - ca_detail = CADetail.objects.get(ca__parent__tenant = self.tenant, - ca__parent_resource_class = class_name, - state = "active") - rpkid.irdb_query_child_resources(self.tenant.tenant_handle, self.child_handle, got_resources, errback) + if irdb_resources.valid_until < rpki.sundial.now(): + raise rpki.exceptions.IRDBExpired("IRDB entry for child %s expired %s" % ( + self.child_handle, irdb_resources.valid_until)) + + resources = irdb_resources & ca_detail.latest_ca_cert.get_3779resources() + resources.valid_until = irdb_resources.valid_until + req_key = pkcs10.getPublicKey() + req_sia = pkcs10.get_SIA() + + # Generate new cert or regenerate old one if necessary + + publisher = rpki.rpkid.publication_queue(rpkid) + + try: + child_cert = self.child_certs.get(ca_detail = ca_detail, ski = req_key.get_SKI()) + except ChildCert.DoesNotExist: + child_cert = ca_detail.issue( + ca = ca_detail.ca, + child = self, + subject_key = req_key, + sia = req_sia, + resources = resources, + publisher = publisher) - def up_down_handle_revoke(self, rpkid, q_msg, r_msg, callback, errback): - def done(): - SubElement(r_msg, key.tag, class_name = class_name, ski = key.get("ski")) - callback() + else: + child_cert = child_cert.reissue( + ca_detail = ca_detail, + sia = req_sia, + resources = resources, + publisher = publisher) + + yield publisher.call_pubd() + + rc = SubElement(r_msg, rpki.up_down.tag_class, + class_name = class_name, + cert_url = ca_detail.ca_cert_uri, + resource_set_as = str(resources.asn), + resource_set_ipv4 = str(resources.v4), + resource_set_ipv6 = str(resources.v6), + resource_set_notafter = str(resources.valid_until)) + c = SubElement(rc, rpki.up_down.tag_certificate, cert_url = child_cert.uri) + c.text = child_cert.cert.get_Base64() + SubElement(rc, rpki.up_down.tag_issuer).text = ca_detail.latest_ca_cert.get_Base64() + + + @tornado.gen.coroutine + def up_down_handle_revoke(self, rpkid, q_msg, r_msg): key = q_msg[0] assert key.tag == rpki.up_down.tag_key class_name = key.get("class_name") @@ -1618,37 +1677,27 @@ class Child(models.Model): ca_detail__ca__parent_resource_class = class_name, ski = ski): child_cert.revoke(publisher = publisher) - publisher.call_pubd(done, errback) + yield publisher.call_pubd() + SubElement(r_msg, key.tag, class_name = class_name, ski = key.get("ski")) - def serve_up_down(self, rpkid, q_der, callback): + @tornado.gen.coroutine + def serve_up_down(self, rpkid, q_der): """ Outer layer of server handling for one up-down PDU from this child. """ - def done(): - callback(rpki.up_down.cms_msg().wrap(r_msg, - self.bsc.private_key_id, - self.bsc.signing_cert, - self.bsc.signing_cert_crl)) - - def lose(e): - logger.exception("Unhandled exception serving child %r", self) - rpki.up_down.generate_error_response_from_exception(r_msg, e, q_type) - done() - if self.bsc is None: raise rpki.exceptions.BSCNotFound("Could not find BSC") + q_cms = rpki.up_down.cms_msg(DER = q_der) - q_msg = q_cms.unwrap((rpkid.bpki_ta, - self.tenant.bpki_cert, - self.tenant.bpki_glue, - self.bpki_cert, - self.bpki_glue)) + q_msg = q_cms.unwrap((rpkid.bpki_ta, self.tenant.bpki_cert, self.tenant.bpki_glue, self.bpki_cert, self.bpki_glue)) q_cms.check_replay_sql(self, "child", self.child_handle) q_type = q_msg.get("type") + logger.info("Serving %s query from child %s [sender %s, recipient %s]", q_type, self.child_handle, q_msg.get("sender"), q_msg.get("recipient")) + if rpki.up_down.enforce_strict_up_down_xml_sender and q_msg.get("sender") != self.child_handle: raise rpki.exceptions.BadSender("Unexpected XML sender %s" % q_msg.get("sender")) @@ -1656,12 +1705,14 @@ class Child(models.Model): sender = q_msg.get("recipient"), recipient = q_msg.get("sender"), type = q_type + "_response") try: - getattr(self, "up_down_handle_" + q_type)(rpkid, q_msg, r_msg, done, lose) - except (rpki.async.ExitNow, SystemExit): - raise + yield getattr(self, "up_down_handle_" + q_type)(rpkid, q_msg, r_msg) + except Exception, e: - lose(e) + logger.exception("Unhandled exception serving child %r", self) + rpki.up_down.generate_error_response_from_exception(r_msg, e, q_type) + r_der = rpki.up_down.cms_msg().wrap(r_msg, self.bsc.private_key_id, self.bsc.signing_cert, self.bsc.signing_cert_crl) + raise tornado.gen.Return(r_der) class ChildCert(models.Model): cert = CertificateField() @@ -1840,8 +1891,7 @@ class EECertificate(models.Model): cn = cn, sn = sn, eku = eku) - self = cls(ca_detail = ca_detail, cert = cert, ski = subject_key.get_SKI()) - self.tenant = ca_detail.ca.parent.tenant + self = cls(tenant = ca_detail.ca.parent.tenant, ca_detail = ca_detail, cert = cert, ski = subject_key.get_SKI()) publisher.queue( uri = self.uri, new_obj = self.cert, diff --git a/rpki/up_down.py b/rpki/up_down.py index 1303647e..90965e45 100644 --- a/rpki/up_down.py +++ b/rpki/up_down.py @@ -38,8 +38,8 @@ version = "1" ## @var content_type # MIME content type to use when sending up-down queries. -#content_type = "application/rpki-updown" -content_type = "application/x-rpki" +content_type = "application/rpki-updown" +#content_type = "application/x-rpki" ## @var allowed_content_types # MIME content types which we consider acceptable for incoming up-down diff --git a/rpki/x509.py b/rpki/x509.py index 1be2f9a3..16981d06 100644 --- a/rpki/x509.py +++ b/rpki/x509.py @@ -47,7 +47,6 @@ import rpki.resource_set import rpki.oids import rpki.sundial import rpki.log -import rpki.async import rpki.relaxng logger = logging.getLogger(__name__) @@ -1510,9 +1509,7 @@ class CMS_object(DER_object): try: cms = self.get_POW() - except (rpki.async.ExitNow, SystemExit): - raise - except Exception: + except: if self.print_on_der_error: logger.debug("Problem parsing DER CMS message, might not really be DER: %r", self.get_DER()) @@ -1593,9 +1590,7 @@ class CMS_object(DER_object): try: content = cms.verify(store) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception: + except: if self.dump_on_verify_failure: if self.dump_using_dumpasn1: dbg = self.dumpasn1() @@ -1623,9 +1618,7 @@ class CMS_object(DER_object): try: cms = self.get_POW() - except (rpki.async.ExitNow, SystemExit): - raise - except Exception: + except: raise rpki.exceptions.UnparsableCMSDER if cms.eContentType() != self.econtent_oid: |