aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildtools/pylint.rc2
-rw-r--r--ca/tests/smoketest.py7
-rw-r--r--rpki/adns.py5
-rw-r--r--rpki/async.py382
-rw-r--r--rpki/cli.py2
-rw-r--r--rpki/config.py42
-rw-r--r--rpki/exceptions.py3
-rw-r--r--rpki/fields.py2
-rw-r--r--rpki/http.py1061
-rw-r--r--rpki/left_right.py13
-rw-r--r--rpki/publication.py11
-rw-r--r--rpki/rpkic.py1
-rw-r--r--rpki/rpkid.py442
-rw-r--r--rpki/rpkid_tasks.py705
-rw-r--r--rpki/rpkidb/models.py990
-rw-r--r--rpki/up_down.py4
-rw-r--r--rpki/x509.py13
17 files changed, 1012 insertions, 2673 deletions
diff --git a/buildtools/pylint.rc b/buildtools/pylint.rc
index 34302f74..8eba84da 100644
--- a/buildtools/pylint.rc
+++ b/buildtools/pylint.rc
@@ -54,7 +54,7 @@ disable-msg-cat=
#enable-msg=
# Disable the message(s) with the given id(s).
-disable=R0801,R0903,R0913,C0321,R0904,W0201,E1101,W0614,C0301,R0901,C0302,R0902,R0201,W0613,R0912,R0915,W0703,W0212,R0914,W0603,W0142,I0011,C0111,C0103,R0401,C0326,R0911,C0325,C0330,W0311,E1124
+disable=R0801,R0903,R0913,C0321,R0904,W0201,E1101,W0614,C0301,R0901,C0302,R0902,R0201,W0613,R0912,R0915,W0703,W0212,R0914,W0603,W0142,I0011,C0111,C0103,R0401,C0326,R0911,C0325,C0330,W0311,E1124,W0702
[REPORTS]
diff --git a/ca/tests/smoketest.py b/ca/tests/smoketest.py
index 5f18119c..3960981a 100644
--- a/ca/tests/smoketest.py
+++ b/ca/tests/smoketest.py
@@ -48,7 +48,6 @@ import rpki.log
import rpki.left_right
import rpki.config
import rpki.publication_control
-import rpki.async
from rpki.mysql_import import MySQLdb
@@ -784,7 +783,7 @@ class allocation(object):
for sql in rpki_sql:
try:
cur.execute(sql)
- except Exception:
+ except:
if "DROP TABLE IF EXISTS" not in sql.upper():
raise
db.close()
@@ -795,7 +794,7 @@ class allocation(object):
for sql in irdb_sql:
try:
cur.execute(sql)
- except Exception:
+ except:
if "DROP TABLE IF EXISTS" not in sql.upper():
raise
for s in [self] + self.hosts:
@@ -1244,7 +1243,7 @@ def setup_publication(pubd_sql, irdb_db_name):
for sql in pubd_sql:
try:
cur.execute(sql)
- except Exception:
+ except:
if "DROP TABLE IF EXISTS" not in sql.upper():
raise
db.close()
diff --git a/rpki/adns.py b/rpki/adns.py
index 018bb7cf..c5af3549 100644
--- a/rpki/adns.py
+++ b/rpki/adns.py
@@ -27,7 +27,6 @@ import time
import socket
import logging
import asyncore
-import rpki.async
import rpki.sundial
import rpki.log
@@ -62,12 +61,12 @@ for ns in resolver.nameservers:
try:
nameservers.append((socket.AF_INET, dns.ipv4.inet_aton(ns)))
continue
- except Exception:
+ except:
pass
try:
nameservers.append((socket.AF_INET6, dns.ipv6.inet_aton(ns)))
continue
- except Exception:
+ except:
pass
logger.error("Couldn't parse nameserver address %r", ns)
diff --git a/rpki/async.py b/rpki/async.py
deleted file mode 100644
index 74143bd1..00000000
--- a/rpki/async.py
+++ /dev/null
@@ -1,382 +0,0 @@
-# $Id$
-#
-# Copyright (C) 2009--2012 Internet Systems Consortium ("ISC")
-#
-# Permission to use, copy, modify, and distribute this software for any
-# purpose with or without fee is hereby granted, provided that the above
-# copyright notice and this permission notice appear in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
-# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
-# AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
-# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
-# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
-# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
-# PERFORMANCE OF THIS SOFTWARE.
-
-"""
-Utilities for event-driven programming.
-"""
-
-import gc
-import sys
-import signal
-import logging
-import asyncore
-import traceback
-import rpki.log
-import rpki.sundial
-
-logger = logging.getLogger(__name__)
-
-ExitNow = asyncore.ExitNow
-
-class iterator(object):
- """
- Iteration construct for event-driven code. Takes three
- arguments:
-
- - Some kind of iterable object
-
- - A callback to call on each item in the iteration
-
- - A callback to call after the iteration terminates.
-
- The item callback receives two arguments: the callable iterator
- object and the current value of the iteration. It should call the
- iterator (or arrange for the iterator to be called) when it is time
- to continue to the next item in the iteration.
-
- The termination callback receives no arguments.
-
- Special case for memory constrained cases: if keyword argument
- pop_list is True, iterable must be a list, which is modified in
- place, popping items off of it until it's empty.
- """
-
- def __init__(self, iterable, item_callback, done_callback, unwind_stack = True, pop_list = False):
- assert not pop_list or isinstance(iterable, list), "iterable must be a list when using pop_list"
- self.item_callback = item_callback
- self.done_callback = done_callback if done_callback is not None else lambda: None
- self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]
- self.unwind_stack = unwind_stack
- self.pop_list = pop_list
- try:
- if self.pop_list:
- self.iterator = iterable
- else:
- self.iterator = iter(iterable)
- except (ExitNow, SystemExit):
- raise
- except Exception:
- logger.debug("Problem constructing iterator for %s", repr(iterable))
- raise
- self.doit()
-
- def __repr__(self):
- return rpki.log.log_repr(self,
- "created at %s:%s" % (self.caller_file,
- self.caller_line),
- self.caller_function)
-
- def __call__(self):
- if self.unwind_stack:
- event_defer(self.doit)
- else:
- self.doit()
-
- def doit(self):
- """
- Implement the iterator protocol: attempt to call the item handler
- with the next iteration value, call the termination handler if the
- iterator signaled StopIteration.
- """
-
- try:
- if self.pop_list:
- val = self.iterator.pop(0)
- else:
- val = self.iterator.next()
- except (IndexError, StopIteration):
- self.done_callback()
- else:
- self.item_callback(self, val)
-
-## @var timer_queue
-# Timer queue.
-
-timer_queue = []
-
-class timer(object):
- """
- Timer construct for event-driven code.
- """
-
- ## @var gc_debug
- # Verbose chatter about timers states and garbage collection.
- gc_debug = False
-
- ## @var run_debug
- # Verbose chatter about timers being run.
- run_debug = False
-
- def __init__(self, handler = None, errback = None):
- self.set_handler(handler)
- self.set_errback(errback)
- self.when = None
- if self.gc_debug:
- self.trace("Creating %r" % self)
-
- def trace(self, msg):
- """
- Debug logging.
- """
-
- if self.gc_debug:
- bt = traceback.extract_stack(limit = 3)
- logger.debug("%s from %s:%d", msg, bt[0][0], bt[0][1])
-
- def set(self, when):
- """
- Set a timer. Argument can be a datetime, to specify an absolute
- time, or a timedelta, to specify an offset time.
- """
-
- if self.gc_debug:
- self.trace("Setting %r to %r" % (self, when))
- if isinstance(when, rpki.sundial.timedelta):
- self.when = rpki.sundial.now() + when
- else:
- self.when = when
- assert isinstance(self.when, rpki.sundial.datetime), "%r: Expecting a datetime, got %r" % (self, self.when)
- if self not in timer_queue:
- timer_queue.append(self)
- timer_queue.sort(key = lambda x: x.when)
-
- def __cmp__(self, other):
- return cmp(id(self), id(other))
-
- if gc_debug:
- def __del__(self):
- logger.debug("Deleting %r", self)
-
- def cancel(self):
- """
- Cancel a timer, if it was set.
- """
-
- if self.gc_debug:
- self.trace("Canceling %r" % self)
- try:
- while True:
- timer_queue.remove(self)
- except ValueError:
- pass
-
- def is_set(self):
- """
- Test whether this timer is currently set.
- """
-
- return self in timer_queue
-
- def set_handler(self, handler):
- """
- Set timer's expiration handler. This is an alternative to
- subclassing the timer class, and may be easier to use when
- integrating timers into other classes (eg, the handler can be a
- bound method to an object in a class representing a network
- connection).
- """
-
- self.handler = handler
-
- def set_errback(self, errback):
- """
- Set a timer's errback. Like set_handler(), for errbacks.
- """
-
- self.errback = errback
-
- @classmethod
- def runq(cls):
- """
- Run the timer queue: for each timer whose call time has passed,
- pull the timer off the queue and call its handler() method.
-
- Comparisions are made against time at which this function was
- called, so that even if new events keep getting scheduled, we'll
- return to the I/O loop reasonably quickly.
- """
-
- now = rpki.sundial.now()
- while timer_queue and now >= timer_queue[0].when:
- t = timer_queue.pop(0)
- if cls.run_debug:
- logger.debug("Running %r", t)
- try:
- if t.handler is not None:
- t.handler()
- else:
- logger.warning("Timer %r expired with no handler set", t)
- except (ExitNow, SystemExit):
- raise
- except Exception, e:
- if t.errback is not None:
- t.errback(e)
- else:
- logger.exception("Unhandled exception from timer %r", t)
-
- def __repr__(self):
- return rpki.log.log_repr(self, self.when, repr(self.handler))
-
- @classmethod
- def seconds_until_wakeup(cls):
- """
- Calculate delay until next timer expires, or None if no timers are
- set and we should wait indefinitely. Rounds up to avoid spinning
- in select() or poll(). We could calculate fractional seconds in
- the right units instead, but select() and poll() don't even take
- the same units (argh!), and we're not doing anything that
- hair-triggered, so rounding up is simplest.
- """
-
- if not timer_queue:
- return None
- now = rpki.sundial.now()
- if now >= timer_queue[0].when:
- return 0
- delay = timer_queue[0].when - now
- seconds = delay.convert_to_seconds()
- if delay.microseconds:
- seconds += 1
- return seconds
-
- @classmethod
- def clear(cls):
- """
- Cancel every timer on the queue. We could just throw away the
- queue content, but this way we can notify subclasses that provide
- their own cancel() method.
- """
-
- while timer_queue:
- timer_queue.pop(0).cancel()
-
-def _raiseExitNow(signum, frame):
- """
- Signal handler for event_loop().
- """
-
- raise ExitNow
-
-def exit_event_loop():
- """
- Force exit from event_loop().
- """
-
- raise ExitNow
-
-def event_defer(handler, delay = rpki.sundial.timedelta(seconds = 0)):
- """
- Use a near-term (default: zero interval) timer to schedule an event
- to run after letting the I/O system have a turn.
- """
-
- timer(handler).set(delay)
-
-## @var debug_event_timing
-# Enable insanely verbose logging of event timing
-
-debug_event_timing = False
-
-def event_loop(catch_signals = (signal.SIGINT, signal.SIGTERM)):
- """
- Replacement for asyncore.loop(), adding timer and signal support.
- """
-
- old_signal_handlers = {}
- while True:
- save_sigs = len(old_signal_handlers) == 0
- try:
- for sig in catch_signals:
- old = signal.signal(sig, _raiseExitNow)
- if save_sigs:
- old_signal_handlers[sig] = old
- while asyncore.socket_map or timer_queue:
- t = timer.seconds_until_wakeup()
- if debug_event_timing:
- logger.debug("Dismissing to asyncore.poll(), t = %s, q = %r", t, timer_queue)
- asyncore.poll(t, asyncore.socket_map)
- timer.runq()
- if timer.gc_debug:
- gc.collect()
- if gc.garbage:
- for i in gc.garbage:
- logger.debug("GC-cycle %r", i)
- del gc.garbage[:]
- except ExitNow:
- break
- except SystemExit:
- raise
- except ValueError, e:
- if str(e) == "filedescriptor out of range in select()":
- logger.error("Something is badly wrong, select() thinks we gave it a bad file descriptor.")
- logger.error("Content of asyncore.socket_map:")
- for fd in sorted(asyncore.socket_map.iterkeys()):
- logger.error(" fd %s obj %r", fd, asyncore.socket_map[fd])
- logger.error("Not safe to continue due to risk of spin loop on select(). Exiting.")
- sys.exit(1)
- logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting")
- except Exception, e:
- logger.exception("event_loop() exited with exception %r, this is not supposed to happen, restarting")
- else:
- break
- finally:
- for sig in old_signal_handlers:
- signal.signal(sig, old_signal_handlers[sig])
-
-class gc_summary(object):
- """
- Periodic summary of GC state, for tracking down memory bloat.
- """
-
- def __init__(self, interval, threshold = 0):
- if isinstance(interval, (int, long)):
- interval = rpki.sundial.timedelta(seconds = interval)
- self.interval = interval
- self.threshold = threshold
- self.timer = timer(handler = self.handler)
- self.timer.set(self.interval)
-
- def handler(self):
- """
- Collect and log GC state for this period, reset timer.
- """
-
- logger.debug("gc_summary: Running gc.collect()")
- gc.collect()
- logger.debug("gc_summary: Summarizing (threshold %d)", self.threshold)
- total = {}
- tuples = {}
- for g in gc.get_objects():
- k = type(g).__name__
- total[k] = total.get(k, 0) + 1
- if isinstance(g, tuple):
- k = ", ".join(type(x).__name__ for x in g)
- tuples[k] = tuples.get(k, 0) + 1
- logger.debug("gc_summary: Sorting result")
- total = total.items()
- total.sort(reverse = True, key = lambda x: x[1])
- tuples = tuples.items()
- tuples.sort(reverse = True, key = lambda x: x[1])
- logger.debug("gc_summary: Object type counts in descending order")
- for name, count in total:
- if count > self.threshold:
- logger.debug("gc_summary: %8d %s", count, name)
- logger.debug("gc_summary: Tuple content type signature counts in descending order")
- for types, count in tuples:
- if count > self.threshold:
- logger.debug("gc_summary: %8d (%s)", count, types)
- logger.debug("gc_summary: Scheduling next cycle")
- self.timer.set(self.interval)
diff --git a/rpki/cli.py b/rpki/cli.py
index e75b8430..35999cb0 100644
--- a/rpki/cli.py
+++ b/rpki/cli.py
@@ -77,7 +77,7 @@ class Cmd(cmd.Cmd):
return False
except BadCommandSyntax, e:
print e
- except Exception:
+ except:
traceback.print_exc()
self.last_command_failed = True
return False
diff --git a/rpki/config.py b/rpki/config.py
index 3234294c..99041259 100644
--- a/rpki/config.py
+++ b/rpki/config.py
@@ -205,9 +205,7 @@ class parser(object):
"""
# pylint: disable=W0621
- import rpki.http
import rpki.x509
- import rpki.async
import rpki.log
import rpki.daemonize
@@ -219,46 +217,11 @@ class parser(object):
logger.warning("Could not process configure_logger line %r: %s", line, e)
try:
- rpki.http.want_persistent_client = self.getboolean("want_persistent_client")
- except ConfigParser.NoOptionError:
- pass
-
- try:
- rpki.http.want_persistent_server = self.getboolean("want_persistent_server")
- except ConfigParser.NoOptionError:
- pass
-
- try:
- rpki.http.use_adns = self.getboolean("use_adns")
- except ConfigParser.NoOptionError:
- pass
-
- try:
- rpki.http.enable_ipv6_clients = self.getboolean("enable_ipv6_clients")
- except ConfigParser.NoOptionError:
- pass
-
- try:
- rpki.http.enable_ipv6_servers = self.getboolean("enable_ipv6_servers")
- except ConfigParser.NoOptionError:
- pass
-
- try:
rpki.x509.CMS_object.debug_cms_certs = self.getboolean("debug_cms_certs")
except ConfigParser.NoOptionError:
pass
try:
- rpki.async.timer.gc_debug = self.getboolean("gc_debug")
- except ConfigParser.NoOptionError:
- pass
-
- try:
- rpki.async.timer.run_debug = self.getboolean("timer_debug")
- except ConfigParser.NoOptionError:
- pass
-
- try:
rpki.x509.XML_CMS_object.dump_outbound_cms = rpki.x509.DeadDrop(self.get("dump_outbound_cms"))
except OSError, e:
logger.warning("Couldn't initialize mailbox %s: %s", self.get("dump_outbound_cms"), e)
@@ -283,11 +246,6 @@ class parser(object):
pass
try:
- rpki.async.gc_summary(self.getint("gc_summary"), self.getint("gc_summary_threshold", 0))
- except ConfigParser.NoOptionError:
- pass
-
- try:
rpki.log.enable_tracebacks = self.getboolean("enable_tracebacks")
except ConfigParser.NoOptionError:
pass
diff --git a/rpki/exceptions.py b/rpki/exceptions.py
index b6889b0d..f456dfc5 100644
--- a/rpki/exceptions.py
+++ b/rpki/exceptions.py
@@ -238,3 +238,6 @@ class WrongEKU(RPKI_Exception):
class UnexpectedUpDownResponse(RPKI_Exception):
"Up-down message is not of the expected type."
+
+class BadContentType(RPKI_Exception):
+ "Bad HTTP Content-Type."
diff --git a/rpki/fields.py b/rpki/fields.py
index 6c71ac35..9fc57e51 100644
--- a/rpki/fields.py
+++ b/rpki/fields.py
@@ -173,7 +173,7 @@ class RSAPrivateKeyField(DERField):
description = "RSA keypair"
rpki_type = rpki.x509.RSA
-KeyField = RSAPrivateKeyField # XXX backwards compatability
+KeyField = RSAPrivateKeyField
class PublicKeyField(DERField):
description = "RSA keypair"
diff --git a/rpki/http.py b/rpki/http.py
deleted file mode 100644
index b991eeb0..00000000
--- a/rpki/http.py
+++ /dev/null
@@ -1,1061 +0,0 @@
-# $Id$
-#
-# Copyright (C) 2013--2014 Dragon Research Labs ("DRL")
-# Portions copyright (C) 2009--2012 Internet Systems Consortium ("ISC")
-# Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN")
-#
-# Permission to use, copy, modify, and distribute this software for any
-# purpose with or without fee is hereby granted, provided that the above
-# copyright notices and this permission notice appear in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND DRL, ISC, AND ARIN DISCLAIM ALL
-# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL,
-# ISC, OR ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
-# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
-# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
-# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
-# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-"""
-HTTP utilities, both client and server.
-"""
-
-import time
-import socket
-import asyncore
-import asynchat
-import urlparse
-import sys
-import random
-import logging
-import rpki.async
-import rpki.sundial
-import rpki.x509
-import rpki.exceptions
-import rpki.log
-import rpki.POW
-
-logger = logging.getLogger(__name__)
-
-## @var default_content_type
-# HTTP content type used for RPKI messages.
-# Can be overriden on a per-client or per-server basis.
-default_content_type = "application/x-rpki"
-
-## @var want_persistent_client
-# Whether we want persistent HTTP client streams, when server also supports them.
-want_persistent_client = False
-
-## @var want_persistent_server
-# Whether we want persistent HTTP server streams, when client also supports them.
-want_persistent_server = False
-
-## @var default_client_timeout
-# Default HTTP client connection timeout.
-default_client_timeout = rpki.sundial.timedelta(minutes = 5)
-
-## @var default_server_timeout
-# Default HTTP server connection timeouts. Given our druthers, we'd
-# prefer that the client close the connection, as this avoids the
-# problem of client starting to reuse connection just as server closes
-# it, so this should be longer than the client timeout.
-default_server_timeout = rpki.sundial.timedelta(minutes = 10)
-
-## @var default_http_version
-# Preferred HTTP version.
-default_http_version = (1, 0)
-
-## @var default_tcp_port
-# Default port for clients and servers that don't specify one.
-default_tcp_port = 80
-
-## @var enable_ipv6_servers
-# Whether to enable IPv6 listeners. Enabled by default, as it should
-# be harmless. Has no effect if kernel doesn't support IPv6.
-enable_ipv6_servers = True
-
-## @var enable_ipv6_clients
-# Whether to consider IPv6 addresses when making connections.
-# Disabled by default, as IPv6 connectivity is still a bad joke in
-# far too much of the world.
-enable_ipv6_clients = False
-
-## @var have_ipv6
-# Whether the current machine claims to support IPv6. Note that just
-# because the kernel supports it doesn't mean that the machine has
-# usable IPv6 connectivity. I don't know of a simple portable way to
-# probe for connectivity at runtime (the old test of "can you ping
-# SRI-NIC.ARPA?" seems a bit dated...). Don't set this, it's set
-# automatically by probing using the socket() system call at runtime.
-try:
- # pylint: disable=W0702,W0104
- socket.socket(socket.AF_INET6).close()
- socket.IPPROTO_IPV6
- socket.IPV6_V6ONLY
-except:
- have_ipv6 = False
-else:
- have_ipv6 = True
-
-## @var use_adns
-
-# Whether to use rpki.adns code. This is still experimental, so it's
-# not (yet) enabled by default.
-use_adns = False
-try:
- import rpki.adns
-except ImportError:
- pass
-
-def supported_address_families(enable_ipv6):
- """
- IP address families on which servers should listen, and to consider
- when selecting addresses for client connections.
- """
-
- if enable_ipv6 and have_ipv6:
- return (socket.AF_INET, socket.AF_INET6)
- else:
- return (socket.AF_INET,)
-
-def localhost_addrinfo():
- """
- Return pseudo-getaddrinfo results for localhost.
- """
-
- result = [(socket.AF_INET, "127.0.0.1")]
- if enable_ipv6_clients and have_ipv6:
- result.append((socket.AF_INET6, "::1"))
- return result
-
-class http_message(object):
- """
- Virtual class representing of one HTTP message.
- """
-
- software_name = "ISC RPKI library"
-
- def __init__(self, version = None, body = None, headers = None):
- self.version = version
- self.body = body
- self.headers = headers
- self.normalize_headers()
-
- def normalize_headers(self, headers = None):
- """
- Clean up (some of) the horrible messes that HTTP allows in its
- headers.
- """
-
- if headers is None:
- headers = () if self.headers is None else self.headers.items()
- translate_underscore = True
- else:
- translate_underscore = False
- result = {}
- for k, v in headers:
- if translate_underscore:
- k = k.replace("_", "-")
- k = "-".join(s.capitalize() for s in k.split("-"))
- v = v.strip()
- if k in result:
- result[k] += ", " + v
- else:
- result[k] = v
- self.headers = result
-
- @classmethod
- def parse_from_wire(cls, headers):
- """
- Parse and normalize an incoming HTTP message.
- """
-
- self = cls()
- headers = headers.split("\r\n")
- self.parse_first_line(*headers.pop(0).split(None, 2))
- for i in xrange(len(headers) - 2, -1, -1):
- if headers[i + 1][0].isspace():
- headers[i] += headers[i + 1]
- del headers[i + 1]
- self.normalize_headers([h.split(":", 1) for h in headers])
- return self
-
- def format(self):
- """
- Format an outgoing HTTP message.
- """
-
- s = str(self.format_first_line())
- assert isinstance(s, str)
- if self.body is not None:
- assert isinstance(self.body, str)
- self.headers["Content-Length"] = len(self.body)
- for kv in self.headers.iteritems():
- h = str("%s: %s\r\n" % kv)
- assert isinstance(h, str)
- s += h
- s += "\r\n"
- assert isinstance(s, str)
- if self.body is not None:
- s += self.body
- assert isinstance(s, str)
- return s
-
- def __str__(self):
- return self.format()
-
- def parse_version(self, version):
- """
- Parse HTTP version, raise an exception if we can't.
- """
-
- if version[:5] != "HTTP/":
- raise rpki.exceptions.HTTPBadVersion("Couldn't parse version %s" % version)
- self.version = tuple(int(i) for i in version[5:].split("."))
-
- @property
- def persistent(self):
- """
- Figure out whether this HTTP message encourages a persistent connection.
- """
-
- c = self.headers.get("Connection")
- if self.version == (1, 1):
- return c is None or "close" not in c.lower()
- elif self.version == (1, 0):
- return c is not None and "keep-alive" in c.lower()
- else:
- return False
-
-class http_request(http_message):
- """
- HTTP request message.
- """
-
- def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers):
- assert cmd == "POST" or body is None
- http_message.__init__(self, version = version, body = body, headers = headers)
- self.cmd = cmd
- self.path = path
- self.callback = callback
- self.errback = errback
- self.retried = False
-
- def parse_first_line(self, cmd, path, version):
- """
- Parse first line of HTTP request message.
- """
-
- self.parse_version(version)
- self.cmd = cmd
- self.path = path
-
- def format_first_line(self):
- """
- Format first line of HTTP request message, and set up the
- User-Agent header.
- """
-
- self.headers.setdefault("User-Agent", self.software_name)
- return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1])
-
- def __repr__(self):
- return rpki.log.log_repr(self, self.cmd, self.path)
-
-class http_response(http_message):
- """
- HTTP response message.
- """
-
- def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers):
- http_message.__init__(self, version = version, body = body, headers = headers)
- self.code = code
- self.reason = reason
-
- def parse_first_line(self, version, code, reason):
- """
- Parse first line of HTTP response message.
- """
-
- self.parse_version(version)
- self.code = int(code)
- self.reason = reason
-
- def format_first_line(self):
- """
- Format first line of HTTP response message, and set up Date and
- Server headers.
- """
-
- self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT"))
- self.headers.setdefault("Server", self.software_name)
- return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason)
-
- def __repr__(self):
- return rpki.log.log_repr(self, self.code, self.reason)
-
-def addr_to_string(addr):
- """
- Convert socket addr tuple to printable string. Assumes 2-element
- tuple is IPv4, 4-element tuple is IPv6, throws TypeError for
- anything else.
- """
-
- if len(addr) == 2:
- return "%s:%d" % (addr[0], addr[1])
- if len(addr) == 4:
- return "%s.%d" % (addr[0], addr[1])
- raise TypeError
-
-@rpki.log.class_logger(logger)
-class http_stream(asynchat.async_chat):
- """
- Virtual class representing an HTTP message stream.
- """
-
- # Keep pylint happy; @class_logger overwrites this.
- logger = None
-
- def __repr__(self):
- status = ["connected"] if self.connected else []
- try:
- status.append(addr_to_string(self.addr))
- except TypeError:
- pass
- return rpki.log.log_repr(self, *status)
-
- def __init__(self, sock = None):
- self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
- asynchat.async_chat.__init__(self, sock)
- self.buffer = []
- self.timer = rpki.async.timer(self.handle_timeout)
- self.restart()
-
- def restart(self):
- """
- (Re)start HTTP message parser, reset timer.
- """
-
- assert not self.buffer
- self.chunk_handler = None
- self.set_terminator("\r\n\r\n")
- self.update_timeout()
-
- def update_timeout(self):
- """
- Put this stream's timer in known good state: set it to the
- stream's timeout value if we're doing timeouts, otherwise clear
- it.
- """
-
- if self.timeout is not None:
- self.logger.debug("Setting timeout %s", self.timeout)
- self.timer.set(self.timeout)
- else:
- self.logger.debug("Clearing timeout")
- self.timer.cancel()
-
- def collect_incoming_data(self, data):
- """
- Buffer incoming data from asynchat.
- """
-
- self.buffer.append(data)
- self.update_timeout()
-
- def get_buffer(self):
- """
- Consume data buffered from asynchat.
- """
-
- val = "".join(self.buffer)
- self.buffer = []
- return val
-
- def found_terminator(self):
- """
- Asynchat reported that it found whatever terminator we set, so
- figure out what to do next. This can be messy, because we can be
- in any of several different states:
-
- @li We might be handling chunked HTTP, in which case we have to
- initialize the chunk decoder;
-
- @li We might have found the end of the message body, in which case
- we can (finally) process it; or
-
- @li We might have just gotten to the end of the message headers,
- in which case we have to parse them to figure out which of three
- separate mechanisms (chunked, content-length, TCP close) is going
- to tell us how to find the end of the message body.
- """
-
- self.update_timeout()
- if self.chunk_handler:
- self.chunk_handler()
- elif not isinstance(self.get_terminator(), str):
- self.handle_body()
- else:
- self.msg = self.parse_type.parse_from_wire(self.get_buffer())
- if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower():
- self.msg.body = []
- self.chunk_handler = self.chunk_header
- self.set_terminator("\r\n")
- elif "Content-Length" in self.msg.headers:
- self.set_terminator(int(self.msg.headers["Content-Length"]))
- else:
- self.handle_no_content_length()
-
- def chunk_header(self):
- """
- Asynchat just handed us what should be the header of one chunk of
- a chunked encoding stream. If this chunk has a body, set the
- stream up to read it; otherwise, this is the last chunk, so start
- the process of exiting the chunk decoder.
- """
-
- n = int(self.get_buffer().partition(";")[0], 16)
- self.logger.debug("Chunk length %s", n)
- if n:
- self.chunk_handler = self.chunk_body
- self.set_terminator(n)
- else:
- self.msg.body = "".join(self.msg.body)
- self.chunk_handler = self.chunk_discard_trailer
-
- def chunk_body(self):
- """
- Asynchat just handed us what should be the body of a chunk of the
- body of a chunked message (sic). Save it, and prepare to move on
- to the next chunk.
- """
-
- self.logger.debug("Chunk body")
- self.msg.body += self.buffer
- self.buffer = []
- self.chunk_handler = self.chunk_discard_crlf
- self.set_terminator("\r\n")
-
- def chunk_discard_crlf(self):
- """
- Consume the CRLF that terminates a chunk, reinitialize chunk
- decoder to be ready for the next chunk.
- """
-
- self.logger.debug("Chunk CRLF")
- s = self.get_buffer()
- assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s)
- self.chunk_handler = self.chunk_header
-
- def chunk_discard_trailer(self):
- """
- Consume chunk trailer, which should be empty, then (finally!) exit
- the chunk decoder and hand complete message off to the application.
- """
-
- self.logger.debug("Chunk trailer")
- s = self.get_buffer()
- assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s)
- self.chunk_handler = None
- self.handle_message()
-
- def handle_body(self):
- """
- Hand normal (not chunked) message off to the application.
- """
-
- self.msg.body = self.get_buffer()
- self.handle_message()
-
- def handle_error(self):
- """
- Asynchat (or asyncore, or somebody) raised an exception. See
- whether it's one we should just pass along, otherwise log a stack
- trace and close the stream.
- """
-
- self.timer.cancel()
- etype = sys.exc_info()[0]
- if etype in (SystemExit, rpki.async.ExitNow):
- raise
- if etype is not rpki.exceptions.HTTPClientAborted:
- self.logger.exception("Closing due to error")
- self.close()
-
- def handle_timeout(self):
- """
- Inactivity timer expired, close connection with prejudice.
- """
-
- self.logger.debug("Timeout, closing")
- self.close()
-
- def handle_close(self):
- """
- Wrapper around asynchat connection close handler, so that we can
- log the event, cancel timer, and so forth.
- """
-
- self.logger.debug("Close event in HTTP stream handler")
- self.timer.cancel()
- asynchat.async_chat.handle_close(self)
-
-@rpki.log.class_logger(logger)
-class http_server(http_stream):
- """
- HTTP server stream.
- """
-
- ## @var parse_type
- # Stream parser should look for incoming HTTP request messages.
- parse_type = http_request
-
- ## @var timeout
- # Use the default server timeout value set in the module header.
- timeout = default_server_timeout
-
- def __init__(self, sock, handlers):
- self.handlers = handlers
- self.received_content_type = None
- http_stream.__init__(self, sock = sock)
- self.expect_close = not want_persistent_server
- self.logger.debug("Starting")
-
- def handle_no_content_length(self):
- """
- Handle an incoming message that used neither chunking nor a
- Content-Length header (that is: this message will be the last one
- in this server stream). No special action required.
- """
-
- self.handle_message()
-
- def find_handler(self, path):
- """
- Helper method to search self.handlers.
- """
-
- for h in self.handlers:
- if path.startswith(h[0]):
- return h[1], h[2] if len(h) > 2 else (default_content_type,)
- return None, None
-
- def handle_message(self):
- """
- HTTP layer managed to deliver a complete HTTP request to
- us, figure out what to do with it. Check the command and
- Content-Type, look for a handler, and if everything looks right,
- pass the message body, path, and a reply callback to the handler.
- """
-
- self.logger.debug("Received request %r", self.msg)
- if not self.msg.persistent:
- self.expect_close = True
- handler, allowed_content_types = self.find_handler(self.msg.path)
- self.received_content_type = self.msg.headers["Content-Type"]
- error = None
- if self.msg.cmd != "POST":
- error = 501, "No handler for method %s" % self.msg.cmd
- elif self.received_content_type not in allowed_content_types:
- error = 415, "No handler for Content-Type %s" % self.received_content_type
- elif handler is None:
- error = 404, "No handler for URL %s" % self.msg.path
- if error is None:
- try:
- handler(self.msg.body, self.msg.path, self.send_reply)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception, e:
- self.logger.exception("Unhandled exception while handling HTTP request")
- self.send_error(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e))
- else:
- self.send_error(code = error[0], reason = error[1])
-
- def send_error(self, code, reason):
- """
- Send an error response to this request.
- """
-
- self.send_message(code = code, reason = reason)
-
- def send_reply(self, code, body = None, reason = "OK"):
- """
- Send a reply to this request.
- """
-
- self.send_message(code = code, body = body, reason = reason)
-
- def send_message(self, code, reason = "OK", body = None):
- """
- Queue up reply message. If both parties agree that connection is
- persistant, and if no error occurred, restart this stream to
- listen for next message; otherwise, queue up a close event for
- this stream so it will shut down once the reply has been sent.
- """
-
- self.logger.debug("Sending response %s %s", code, reason)
- if code >= 400:
- self.expect_close = True
- msg = http_response(code = code, reason = reason, body = body,
- Content_Type = self.received_content_type,
- Connection = "Close" if self.expect_close else "Keep-Alive")
- self.push(msg.format())
- if self.expect_close:
- self.logger.debug("Closing")
- self.timer.cancel()
- self.close_when_done()
- else:
- self.logger.debug("Listening for next message")
- self.restart()
-
-@rpki.log.class_logger(logger)
-class http_listener(asyncore.dispatcher):
- """
- Listener for incoming HTTP connections.
- """
-
- def __repr__(self):
- try:
- status = (addr_to_string(self.addr),)
- except TypeError:
- status = ()
- return rpki.log.log_repr(self, *status)
-
- def __init__(self, handlers, addrinfo):
- self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
- asyncore.dispatcher.__init__(self)
- self.handlers = handlers
- try:
- af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612
- self.create_socket(af, socktype)
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- try:
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- except AttributeError:
- pass
- if have_ipv6 and af == socket.AF_INET6:
- self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
- self.bind(sockaddr)
- self.listen(5)
- except Exception:
- self.logger.exception("Couldn't set up HTTP listener")
- self.close()
- for h in handlers:
- self.logger.debug("Handling %s", h[0])
-
- def handle_accept(self):
- """
- Asyncore says we have an incoming connection, spawn an http_server
- stream for it and pass along all of our handler data.
- """
-
- try:
- res = self.accept()
- if res is None:
- raise
- sock, addr = res # pylint: disable=W0633
- self.logger.debug("Accepting connection from %s", addr_to_string(addr))
- http_server(sock = sock, handlers = self.handlers)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception:
- self.logger.exception("Unable to accept connection")
-
- def handle_error(self):
- """
- Asyncore signaled an error, pass it along or log it.
- """
-
- if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow):
- raise
- self.logger.exception("Error in HTTP listener")
-
-@rpki.log.class_logger(logger)
-class http_client(http_stream):
- """
- HTTP client stream.
- """
-
- ## @var parse_type
- # Stream parser should look for incoming HTTP response messages.
- parse_type = http_response
-
- ## @var timeout
- # Use the default client timeout value set in the module header.
- timeout = default_client_timeout
-
- ## @var state
- # Application layer connection state.
- state = None
-
- def __init__(self, queue, hostport):
- http_stream.__init__(self)
- self.logger.debug("Creating new connection to %s", addr_to_string(hostport))
- self.queue = queue
- self.host = hostport[0]
- self.port = hostport[1]
- self.set_state("opening")
- self.expect_close = not want_persistent_client
-
- def start(self):
- """
- Create socket and request a connection.
- """
-
- if not use_adns:
- self.logger.debug("Not using ADNS")
- self.gotaddrinfo([(socket.AF_INET, self.host)])
- elif self.host == "localhost":
- self.logger.debug("Bypassing DNS for localhost")
- self.gotaddrinfo(localhost_addrinfo())
- else:
- families = supported_address_families(enable_ipv6_clients)
- self.logger.debug("Starting ADNS lookup for %s in families %r", self.host, families)
- rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, families)
-
- def dns_error(self, e):
- """
- Handle DNS lookup errors. For now, just whack the connection.
- Undoubtedly we should do something better with diagnostics here.
- """
-
- self.handle_error()
-
- def gotaddrinfo(self, addrinfo):
- """
- Got address data from DNS, create socket and request connection.
- """
-
- try:
- self.af, self.address = random.choice(addrinfo)
- self.logger.debug("Connecting to AF %s host %s port %s addr %s", self.af, self.host, self.port, self.address)
- self.create_socket(self.af, socket.SOCK_STREAM)
- self.connect((self.address, self.port))
- if self.addr is None:
- self.addr = (self.host, self.port)
- self.update_timeout()
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception:
- self.handle_error()
-
- def handle_connect(self):
- """
- Asyncore says socket has connected.
- """
-
- self.logger.debug("Socket connected")
- self.set_state("idle")
- assert self.queue.client is self
- self.queue.send_request()
-
- def set_state(self, state):
- """
- Set HTTP client connection state.
- """
-
- self.logger.debug("State transition %s => %s", self.state, state)
- self.state = state
-
- def handle_no_content_length(self):
- """
- Handle response message that used neither chunking nor a
- Content-Length header (that is: this message will be the last one
- in this server stream). In this case we want to read until we
- reach the end of the data stream.
- """
-
- self.set_terminator(None)
-
- def send_request(self, msg):
- """
- Queue up request message and kickstart connection.
- """
-
- self.logger.debug("Sending request %r", msg)
- assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state)
- self.set_state("request-sent")
- msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
- self.push(msg.format())
- self.restart()
-
- def handle_message(self):
- """
- Handle incoming HTTP response message. Make sure we're in a state
- where we expect to see such a message (and allow the mysterious
- empty messages that Apache sends during connection close, no idea
- what that is supposed to be about). If everybody agrees that the
- connection should stay open, put it into an idle state; otherwise,
- arrange for the stream to shut down.
- """
-
- self.logger.debug("Message received, state %s", self.state)
-
- if not self.msg.persistent:
- self.expect_close = True
-
- if self.state != "request-sent":
- if self.state == "closing":
- assert not self.msg.body
- self.logger.debug("Ignoring empty response received while closing")
- return
- raise rpki.exceptions.HTTPUnexpectedState("%r received message while in unexpected state %s" % (self, self.state))
-
- if self.expect_close:
- self.logger.debug("Closing")
- self.set_state("closing")
- self.close_when_done()
- else:
- self.logger.debug("Idling")
- self.set_state("idle")
- self.update_timeout()
-
- if self.msg.code != 200:
- errmsg = "HTTP request failed"
- if self.msg.code is not None:
- errmsg += " with status %s" % self.msg.code
- if self.msg.reason:
- errmsg += ", reason %s" % self.msg.reason
- if self.msg.body:
- errmsg += ", response %s" % self.msg.body
- raise rpki.exceptions.HTTPRequestFailed(errmsg)
- self.queue.return_result(self, self.msg, detach = self.expect_close)
-
- def handle_close(self):
- """
- Asyncore signaled connection close. If we were waiting for that
- to find the end of a response message, process the resulting
- message now; if we were waiting for the response to a request we
- sent, signal the error.
- """
-
- http_stream.handle_close(self)
- self.logger.debug("State %s", self.state)
- if self.get_terminator() is None:
- self.handle_body()
- elif self.state == "request-sent":
- raise rpki.exceptions.HTTPClientAborted("HTTP request aborted by close event")
- else:
- self.queue.detach(self)
-
- def handle_timeout(self):
- """
- Connection idle timer has expired. Shut down connection in any
- case, noisily if we weren't idle.
- """
-
- bad = self.state not in ("idle", "closing")
- if bad:
- self.logger.warning("Timeout while in state %s", self.state)
- http_stream.handle_timeout(self)
- if bad:
- try:
- raise rpki.exceptions.HTTPTimeout
- except: # pylint: disable=W0702
- self.handle_error()
- else:
- self.queue.detach(self)
-
- def handle_error(self):
- """
- Asyncore says something threw an exception. Log it, then shut
- down the connection and pass back the exception.
- """
-
- eclass, edata = sys.exc_info()[0:2]
- self.logger.warning("Error on HTTP client connection %s:%s %s %s", self.host, self.port, eclass, edata)
- http_stream.handle_error(self)
- self.queue.return_result(self, edata, detach = True)
-
-@rpki.log.class_logger(logger)
-class http_queue(object):
- """
- Queue of pending HTTP requests for a single destination. This class
- is very tightly coupled to http_client; http_client handles the HTTP
- stream itself, this class provides a slightly higher-level API.
- """
-
- def __repr__(self):
- return rpki.log.log_repr(self, addr_to_string(self.hostport))
-
- def __init__(self, hostport):
- self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
- self.hostport = hostport
- self.client = None
- self.logger.debug("Created")
- self.queue = []
-
- def request(self, *requests):
- """
- Append http_request object(s) to this queue.
- """
-
- self.logger.debug("Adding requests %r", requests)
- self.queue.extend(requests)
-
- def restart(self):
- """
- Send next request for this queue, if we can. This may involve
- starting a new http_client stream, reusing an existing idle
- stream, or just ignoring this request if there's an active client
- stream already; in the last case, handling of the response (or
- exception, or timeout) for the query currently in progress will
- call this method when it's time to kick out the next query.
- """
-
- try:
- if self.client is None:
- self.client = http_client(self, self.hostport)
- self.logger.debug("Attached client %r", self.client)
- self.client.start()
- elif self.client.state == "idle":
- self.logger.debug("Sending request to existing client %r", self.client)
- self.send_request()
- else:
- self.logger.debug("Client %r exists in state %r", self.client, self.client.state)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception, e:
- self.return_result(self.client, e, detach = True)
-
- def send_request(self):
- """
- Kick out the next query in this queue, if any.
- """
-
- if self.queue:
- self.client.send_request(self.queue[0])
-
- def detach(self, client_):
- """
- Detatch a client from this queue. Silently ignores attempting to
- detach a client that is not attached to this queue, to simplify
- handling of what otherwise would be a nasty set of race
- conditions.
- """
-
- if client_ is self.client:
- self.logger.debug("Detaching client %r", client_)
- self.client = None
-
- def return_result(self, client, result, detach = False): # pylint: disable=W0621
- """
- Client stream has returned a result, which we need to pass along
- to the original caller. Result may be either an HTTP response
- message or an exception. In either case, once we're done
- processing this result, kick off next message in the queue, if any.
- """
-
- if client is not self.client:
- self.logger.warning("Wrong client trying to return result. THIS SHOULD NOT HAPPEN. Dropping result %r", result)
- return
-
- if detach:
- self.detach(client)
-
- try:
- req = self.queue.pop(0)
- self.logger.debug("Dequeuing request %r", req)
- except IndexError:
- self.logger.warning("No caller. THIS SHOULD NOT HAPPEN. Dropping result %r", result)
- return
-
- assert isinstance(result, http_response) or isinstance(result, Exception)
-
- if isinstance(result, http_response):
- try:
- self.logger.debug("Returning result %r to caller", result)
- req.callback(result.body)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception, e:
- result = e
-
- if isinstance(result, Exception):
- try:
- self.logger.warning("Returning exception %r to caller: %s", result, result)
- req.errback(result)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception:
- self.logger.exception("Exception in exception callback, may have lost event chain")
-
- self.logger.debug("Queue: %r", self.queue)
-
- if self.queue:
- self.restart()
-
-## @var client_queues
-# Map of (host, port) tuples to http_queue objects.
-client_queues = {}
-
-def client(msg, url, callback, errback, content_type = default_content_type):
- """
- Open client HTTP connection, send a message, set up callbacks to
- handle response.
- """
-
- u = urlparse.urlparse(url)
-
- if (u.scheme not in ("", "http") or
- u.username is not None or
- u.password is not None or
- u.params != "" or
- u.query != "" or
- u.fragment != ""):
- raise rpki.exceptions.BadClientURL("Unusable URL %s" % url)
-
- logger.debug("Contacting %s", url)
-
- request = http_request(
- cmd = "POST",
- path = u.path,
- body = msg,
- callback = callback,
- errback = errback,
- Host = u.hostname,
- Content_Type = content_type)
-
- hostport = (u.hostname or "localhost", u.port or default_tcp_port)
-
- logger.debug("Created request %r for %s", request, addr_to_string(hostport))
- if hostport not in client_queues:
- client_queues[hostport] = http_queue(hostport)
- client_queues[hostport].request(request)
-
- # Defer connection attempt until after we've had time to process any
- # pending I/O events, in case connections have closed.
-
- logger.debug("Scheduling connection startup for %r", request)
- rpki.async.event_defer(client_queues[hostport].restart)
-
-def server(handlers, port, host = ""):
- """
- Run an HTTP server and wait (forever) for connections.
- """
-
- if not isinstance(handlers, (tuple, list)):
- handlers = (("/", handlers),)
-
- # Yes, this is sick. So is getaddrinfo() returning duplicate
- # records, which RedHat has the gall to claim is a feature.
- ai = []
- for af in supported_address_families(enable_ipv6_servers):
- try:
- if host:
- h = host
- elif have_ipv6 and af == socket.AF_INET6:
- h = "::"
- else:
- h = "0.0.0.0"
- for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM):
- if a not in ai:
- ai.append(a)
- except socket.gaierror:
- pass
-
- for a in ai:
- http_listener(addrinfo = a, handlers = handlers)
-
- rpki.async.event_loop()
diff --git a/rpki/left_right.py b/rpki/left_right.py
index 090de561..387e908f 100644
--- a/rpki/left_right.py
+++ b/rpki/left_right.py
@@ -25,19 +25,16 @@ import logging
import rpki.x509
import rpki.exceptions
-import rpki.http
import rpki.up_down
import rpki.relaxng
import rpki.sundial
import rpki.log
import rpki.publication
-import rpki.async
import rpki.rpkid_tasks
logger = logging.getLogger(__name__)
-
xmlns = rpki.relaxng.left_right.xmlns
nsmap = rpki.relaxng.left_right.nsmap
version = rpki.relaxng.left_right.version
@@ -62,6 +59,16 @@ tag_tenant = xmlns + "tenant"
tag_signing_cert = xmlns + "signing_cert"
tag_signing_cert_crl = xmlns + "signing_cert_crl"
+## @var content_type
+# Content type to use when sending left-right queries
+content_type = "application/x-rpki"
+
+## @var allowed_content_types
+# Content types we consider acceptable for incoming left-right
+# queries.
+
+allowed_content_types = (content_type,)
+
class cms_msg(rpki.x509.XML_CMS_object):
"""
diff --git a/rpki/publication.py b/rpki/publication.py
index 7939b9de..16824d05 100644
--- a/rpki/publication.py
+++ b/rpki/publication.py
@@ -39,6 +39,17 @@ tag_withdraw = rpki.relaxng.publication.xmlns + "withdraw"
tag_report_error = rpki.relaxng.publication.xmlns + "report_error"
+## @var content_type
+# Content type to use when sending left-right queries
+content_type = "application/x-rpki"
+
+## @var allowed_content_types
+# Content types we consider acceptable for incoming left-right
+# queries.
+
+allowed_content_types = (content_type,)
+
+
def raise_if_error(pdu):
"""
Raise an appropriate error if this is a <report_error/> PDU.
diff --git a/rpki/rpkic.py b/rpki/rpkic.py
index f5e77396..4b9ffedb 100644
--- a/rpki/rpkic.py
+++ b/rpki/rpkic.py
@@ -43,7 +43,6 @@ import rpki.relaxng
import rpki.exceptions
import rpki.left_right
import rpki.x509
-import rpki.async
import rpki.version
from lxml.etree import SubElement
diff --git a/rpki/rpkid.py b/rpki/rpkid.py
index c6b1001e..896fe0be 100644
--- a/rpki/rpkid.py
+++ b/rpki/rpkid.py
@@ -28,20 +28,27 @@ import random
import logging
import argparse
+import tornado.gen
+import tornado.web
+import tornado.ioloop
+import tornado.httputil
+import tornado.httpclient
+import tornado.httpserver
+
+from lxml.etree import Element, SubElement, tostring as ElementToString
+
import rpki.resource_set
import rpki.up_down
import rpki.left_right
import rpki.x509
-import rpki.http
import rpki.config
import rpki.exceptions
import rpki.relaxng
import rpki.log
-import rpki.async
import rpki.daemonize
+
import rpki.rpkid_tasks
-from lxml.etree import Element, SubElement, tostring as ElementToString
logger = logging.getLogger(__name__)
@@ -105,12 +112,10 @@ class main(object):
logger.info("Running in profile mode with output to %s", self.profile)
logger.debug("Initializing Django")
-
import django
django.setup()
logger.debug("Initializing rpkidb...")
-
global rpki # pylint: disable=W0602
import rpki.rpkidb # pylint: disable=W0621
@@ -127,41 +132,46 @@ class main(object):
self.http_server_host = self.cfg.get("server-host", "")
self.http_server_port = self.cfg.getint("server-port")
- self.publication_kludge_base = self.cfg.get("publication-kludge-base", "publication/")
-
self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True)
self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10),
self.cfg.getint("initial-delay-max", 120))
# Should be much longer in production
- self.cron_period = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-period", 120))
- self.cron_keepalive = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-keepalive", 0))
- if not self.cron_keepalive:
- self.cron_keepalive = self.cron_period * 4
- self.cron_timeout = None
+ self.cron_period = self.cfg.getint("cron-period", 120)
- self.start_cron()
+ if self.use_internal_cron:
+ logger.debug("Scheduling initial cron pass in %s seconds", self.initial_delay)
+ tornado.ioloop.IOLoop.current().spawn_callback(self.cron_loop)
- rpki.http.server(
- host = self.http_server_host,
- port = self.http_server_port,
- handlers = (("/left-right", self.left_right_handler),
- ("/up-down/", self.up_down_handler, rpki.up_down.allowed_content_types),
- ("/cronjob", self.cronjob_handler)))
+ rpkid = self
- def start_cron(self):
- """
- Start clock for rpkid's internal cron process.
- """
+ class LeftRightHandler(tornado.web.RequestHandler): # pylint: disable=W0223
+ @tornado.gen.coroutine
+ def post(self):
+ yield rpkid.left_right_handler(self)
+
+ class UpDownHandler(tornado.web.RequestHandler): # pylint: disable=W0223
+ @tornado.gen.coroutine
+ def post(self, tenant_handle, child_handle):
+ yield rpkid.up_down_handler(self, tenant_handle, child_handle)
+
+ class CronjobHandler(tornado.web.RequestHandler): # pylint: disable=W0223
+ @tornado.gen.coroutine
+ def post(self):
+ yield rpkid.cronjob_handler(self)
+
+ application = tornado.web.Application((
+ (r"/left-right", LeftRightHandler),
+ (r"/up-down/([-a-zA-Z0-9_]+)/([-a-zA-Z0-9_]+)", UpDownHandler),
+ (r"/cronjob", CronjobHandler)))
+
+ application.listen(
+ address = self.http_server_host,
+ port = self.http_server_port)
+
+ tornado.ioloop.IOLoop.current().start()
- if self.use_internal_cron:
- self.cron_timer = rpki.async.timer(handler = self.cron)
- when = rpki.sundial.now() + rpki.sundial.timedelta(seconds = self.initial_delay)
- logger.debug("Scheduling initial cron pass at %s", when)
- self.cron_timer.set(when)
- else:
- logger.debug("Not using internal clock, start_cron() call ignored")
@staticmethod
def _compose_left_right_query():
@@ -172,70 +182,86 @@ class main(object):
return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap,
type = "query", version = rpki.left_right.version)
- def irdb_query(self, q_msg, callback, errback):
+
+ @tornado.gen.coroutine
+ def irdb_query(self, q_msg):
"""
Perform an IRDB callback query.
"""
- try:
- q_tags = set(q_pdu.tag for q_pdu in q_msg)
+ q_tags = set(q_pdu.tag for q_pdu in q_msg)
- q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert)
+ q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert)
- def unwrap(r_der):
- try:
- r_cms = rpki.left_right.cms_msg(DER = r_der)
- r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert))
- self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url)
- #rpki.left_right.check_response(r_msg)
- if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg):
- raise rpki.exceptions.BadIRDBReply(
- "Unexpected response to IRDB query: %s" % r_cms.pretty_print_content())
- callback(r_msg)
- except Exception, e:
- errback(e)
+ http_client = tornado.httpclient.AsyncHTTPClient()
- rpki.http.client(
- url = self.irdb_url,
- msg = q_der,
- callback = unwrap,
- errback = errback)
+ http_request = tornado.httpclient.HTTPRequest(
+ url = self.irdb_url,
+ method = "POST",
+ body = q_der,
+ headers = { "Content-Type" : rpki.left_right.content_type })
- except Exception, e:
- errback(e)
+ http_response = yield http_client.fetch(http_request)
+
+ # Tornado already checked http_response.code for us
+
+ content_type = http_response.headers.get("Content-Type")
+
+ if content_type not in rpki.left_right.allowed_content_types:
+ raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.left_right.content_type, content_type))
+
+ r_der = http_response.body
+
+ r_cms = rpki.left_right.cms_msg(DER = r_der)
+ r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert))
+ self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url)
- def irdb_query_child_resources(self, tenant_handle, child_handle, callback, errback):
+ #rpki.left_right.check_response(r_msg)
+
+ if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg):
+ raise rpki.exceptions.BadIRDBReply("Unexpected response to IRDB query: %s" % r_cms.pretty_print_content())
+
+ raise tornado.gen.Return(r_msg)
+
+
+ @tornado.gen.coroutine
+ def irdb_query_child_resources(self, tenant_handle, child_handle):
"""
Ask IRDB about a child's resources.
"""
q_msg = self._compose_left_right_query()
- SubElement(q_msg, rpki.left_right.tag_list_resources,
- tenant_handle = tenant_handle, child_handle = child_handle)
+ SubElement(q_msg, rpki.left_right.tag_list_resources, tenant_handle = tenant_handle, child_handle = child_handle)
+
+ r_msg = yield self.irdb_query(q_msg)
- def done(r_msg):
- if len(r_msg) != 1:
- raise rpki.exceptions.BadIRDBReply(
- "Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content())
- callback(rpki.resource_set.resource_bag(
- asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")),
- v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")),
- v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")),
- valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until"))))
+ if len(r_msg) != 1:
+ raise rpki.exceptions.BadIRDBReply("Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content())
- self.irdb_query(q_msg, done, errback)
+ bag = rpki.resource_set.resource_bag(
+ asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")),
+ v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")),
+ v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")),
+ valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until")))
- def irdb_query_roa_requests(self, tenant_handle, callback, errback):
+ raise tornado.gen.Return(bag)
+
+
+ @tornado.gen.coroutine
+ def irdb_query_roa_requests(self, tenant_handle):
"""
Ask IRDB about self's ROA requests.
"""
q_msg = self._compose_left_right_query()
SubElement(q_msg, rpki.left_right.tag_list_roa_requests, tenant_handle = tenant_handle)
- self.irdb_query(q_msg, callback, errback)
+ r_msg = yield self.irdb_query(q_msg)
+ raise tornado.gen.Return(r_msg)
+
- def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles, callback, errback):
+ @tornado.gen.coroutine
+ def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles):
"""
Ask IRDB about self's ghostbuster record requests.
"""
@@ -244,16 +270,20 @@ class main(object):
for parent_handle in parent_handles:
SubElement(q_msg, rpki.left_right.tag_list_ghostbuster_requests,
tenant_handle = tenant_handle, parent_handle = parent_handle)
- self.irdb_query(q_msg, callback, errback)
+ r_msg = yield self.irdb_query(q_msg)
+ raise tornado.gen.Return(r_msg)
- def irdb_query_ee_certificate_requests(self, tenant_handle, callback, errback):
+ @tornado.gen.coroutine
+ def irdb_query_ee_certificate_requests(self, tenant_handle):
"""
Ask IRDB about self's EE certificate requests.
"""
q_msg = self._compose_left_right_query()
SubElement(q_msg, rpki.left_right.tag_list_ee_certificate_requests, tenant_handle = tenant_handle)
- self.irdb_query(q_msg, callback, errback)
+ r_msg = yield self.irdb_query(q_msg)
+ raise tornado.gen.Return(r_msg)
+
@property
def left_right_models(self):
@@ -273,6 +303,7 @@ class main(object):
rpki.left_right.tag_repository : rpki.rpkidb.models.Repository }
return self._left_right_models
+
@property
def left_right_trivial_handlers(self):
"""
@@ -287,6 +318,7 @@ class main(object):
rpki.left_right.tag_list_received_resources : self.handle_list_received_resources }
return self._left_right_trivial_handlers
+
def handle_list_published_objects(self, q_pdu, r_msg):
"""
<list_published_objects/> server.
@@ -317,6 +349,7 @@ class main(object):
SubElement(r_msg, rpki.left_right.tag_list_published_objects,
uri = c.uri, **kw).text = c.cert.get_Base64()
+
def handle_list_received_resources(self, q_pdu, r_msg):
"""
<list_received_resources/> server.
@@ -344,28 +377,28 @@ class main(object):
r_pdu.set("tag", msg_tag)
- def left_right_handler(self, query, path, cb):
+ @tornado.gen.coroutine
+ def left_right_handler(self, handler):
"""
- Process one left-right PDU.
+ Process one left-right message.
"""
- # This handles five persistent classes (self, bsc, parent, child,
- # repository) and two simple queries (list_published_objects and
- # list_received_resources). The former probably need to dispatch
- # via methods to the corresponding model classes; the latter
- # probably just become calls to ordinary methods of this
- # (rpki.rpkid.main) class.
- #
- # Need to clone logic from rpki.pubd.main.control_handler().
-
logger.debug("Entering left_right_handler()")
+ content_type = handler.request.headers["Content-Type"]
+ if content_type not in rpki.left_right.allowed_content_types:
+ handler.set_status(415, "No handler for Content-Type %s" % content_type)
+ handler.finish()
+ raise tornado.gen.Return
+
+ handler.set_header("Content-Type", rpki.left_right.content_type)
+
try:
- q_cms = rpki.left_right.cms_msg(DER = query)
+ q_cms = rpki.left_right.cms_msg(DER = handler.request.body)
q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert))
r_msg = Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap,
type = "reply", version = rpki.left_right.version)
- self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, path)
+ self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, handler.request.path)
assert q_msg.tag.startswith(rpki.left_right.xmlns)
assert all(q_pdu.tag.startswith(rpki.left_right.xmlns) for q_pdu in q_msg)
@@ -376,14 +409,37 @@ class main(object):
if q_msg.get("type") != "query":
raise rpki.exceptions.BadQuery("Message type is not query")
- def done():
- cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert))
+ for q_pdu in q_msg:
- def loop(iterator, q_pdu):
+ try:
+ action = q_pdu.get("action")
+ model = self.left_right_models.get(q_pdu.tag)
+
+ if q_pdu.tag in self.left_right_trivial_handlers:
+ self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg)
+
+ elif action in ("get", "list"):
+ for obj in model.objects.xml_list(q_pdu):
+ obj.xml_template.encode(obj, q_pdu, r_msg)
+
+ elif action == "destroy":
+ obj = model.objects.xml_get_for_delete(q_pdu)
+ yield obj.xml_pre_delete_hook(self)
+ obj.delete()
+ obj.xml_template.acknowledge(obj, q_pdu, r_msg)
- logger.debug("left_right_handler():loop(%r)", q_pdu)
+ elif action in ("create", "set"):
+ obj = model.objects.xml_get_or_create(q_pdu)
+ obj.xml_template.decode(obj, q_pdu)
+ obj.xml_pre_save_hook(q_pdu)
+ obj.save()
+ yield obj.xml_post_save_hook(self, q_pdu)
+ obj.xml_template.acknowledge(obj, q_pdu, r_msg)
- def fail(e):
+ else:
+ raise rpki.exceptions.BadQuery("Unrecognized action %r" % action)
+
+ except Exception, e:
if not isinstance(e, rpki.exceptions.NotFound):
logger.exception("Unhandled exception serving left-right PDU %r", q_pdu)
error_tenant_handle = q_pdu.get("tenant_handle")
@@ -394,102 +450,50 @@ class main(object):
r_pdu.set("tag", error_tag)
if error_tenant_handle is not None:
r_pdu.set("tenant_handle", error_tenant_handle)
- cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert))
+ break
- try:
- if q_pdu.tag in self.left_right_trivial_handlers:
- logger.debug("left_right_handler(): trivial handler")
- self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg)
- iterator()
-
- else:
- action = q_pdu.get("action")
- model = self.left_right_models[q_pdu.tag]
-
- logger.debug("left_right_handler(): action %s model %r", action, model)
-
- if action in ("get", "list"):
- logger.debug("left_right_handler(): get/list")
- for obj in model.objects.xml_list(q_pdu):
- logger.debug("left_right_handler(): get/list: encoding %r", obj)
- obj.xml_template.encode(obj, q_pdu, r_msg)
- iterator()
-
- elif action == "destroy":
- def destroy_cb():
- obj.delete()
- obj.xml_template.acknowledge(obj, q_pdu, r_msg)
- iterator()
- logger.debug("left_right_handler(): destroy")
- obj = model.objects.xml_get_for_delete(q_pdu)
- obj.xml_pre_delete_hook(self, destroy_cb, fail)
-
- elif action in ("create", "set"):
- def create_set_cb():
- obj.xml_template.acknowledge(obj, q_pdu, r_msg)
- iterator()
- logger.debug("left_right_handler(): create/set")
- obj = model.objects.xml_get_or_create(q_pdu)
- obj.xml_template.decode(obj, q_pdu)
- obj.xml_pre_save_hook(q_pdu)
- obj.save()
- obj.xml_post_save_hook(self, q_pdu, create_set_cb, fail)
-
- else:
- raise rpki.exceptions.BadQuery
-
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception, e:
- fail(e)
-
- rpki.async.iterator(q_msg, loop, done)
-
- except (rpki.async.ExitNow, SystemExit):
- raise
+ handler.set_status(200)
+ handler.finish(rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert))
+ logger.debug("Normal exit from left_right_handler()")
except Exception, e:
logger.exception("Unhandled exception serving left-right request")
- cb(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e))
+ handler.set_status(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e))
+ handler.finish()
- up_down_url_regexp = re.compile("/up-down/([-A-Z0-9_]+)/([-A-Z0-9_]+)$", re.I)
- def up_down_handler(self, q_der, path, cb):
+ @tornado.gen.coroutine
+ def up_down_handler(self, handler, tenant_handle, child_handle):
"""
Process one up-down PDU.
"""
- def done(r_der):
- cb(200, body = r_der)
+ logger.debug("Entering up_down_handler()")
+
+ content_type = handler.request.headers["Content-Type"]
+ if content_type not in rpki.up_down.allowed_content_types:
+ handler.set_status(415, "No handler for Content-Type %s" % content_type)
+ handler.finish()
+ raise tornado.gen.Return
try:
- match = self.up_down_url_regexp.search(path)
- if match is None:
- raise rpki.exceptions.BadContactURL("Bad URL path received in up_down_handler(): %s" % path)
- tenant_handle, child_handle = match.groups()
- try:
- child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle)
- except rpki.rpkidb.models.Child.DoesNotExist:
- raise rpki.exceptions.ChildNotFound("Could not find child %s of self %s in up_down_handler()" % (
- child_handle, tenant_handle))
- child.serve_up_down(self, q_der, done)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except (rpki.exceptions.ChildNotFound, rpki.exceptions.BadContactURL), e:
- logger.warning(str(e))
- cb(400, reason = str(e))
+ child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle)
+ q_der = handler.request.body
+ r_der = yield child.serve_up_down(self, q_der)
+ handler.set_header("Content-Type", rpki.up_down.content_type)
+ handler.set_status(200)
+ handler.finish(r_der)
+
+ except rpki.rpkidb.models.Child.DoesNotExist:
+ logger.info("Child %r of tenant %r not found", child_handle, tenant_handle)
+ handler.set_status(400, "Child %r not found" % child_handle)
+ handler.finish()
+
except Exception, e:
logger.exception("Unhandled exception processing up-down request")
- cb(400, reason = "Could not process PDU: %s" % e)
-
- def checkpoint(self, force = False):
- """
- Record that we were still alive when we got here, by resetting
- keepalive timer.
- """
+ handler.set_status(400, "Could not process PDU: %s" % e)
+ handler.finish()
- if force or self.cron_timeout is not None:
- self.cron_timeout = rpki.sundial.now() + self.cron_keepalive
def task_add(self, task):
"""
@@ -504,11 +508,10 @@ class main(object):
logger.debug("Task %r was already in the task queue", task)
return False
+
def task_next(self):
"""
- Pull next task from the task queue and put it the deferred event
- queue (we don't want to run it directly, as that could eventually
- blow out our call stack).
+ Schedule next task in the queue to be run.
"""
try:
@@ -516,77 +519,67 @@ class main(object):
except IndexError:
self.task_current = None
else:
- rpki.async.event_defer(self.task_current)
+ tornado.ioloop.IOLoop.current().add_callback(self.task_current)
+
def task_run(self):
"""
- Run first task on the task queue, unless one is running already.
+ Schedule first queued task unless a task is running already.
"""
if self.task_current is None:
self.task_next()
- def cron(self, cb = None):
+
+ @tornado.gen.coroutine
+ def cron_loop(self):
"""
- Periodic tasks.
+ Asynchronous infinite loop to drive cron cycle.
"""
- now = rpki.sundial.now()
+ assert self.use_internal_cron
+ yield tornado.gen.sleep(self.initial_delay)
+ while True:
+ yield self.cron_run()
+ yield tornado.gen.sleep(self.cron_period)
- logger.debug("Starting cron run")
- def done():
- self.cron_timeout = None
- logger.info("Finished cron run started at %s", now)
- if cb is not None:
- cb()
+ @tornado.gen.coroutine
+ def cron_run(self):
+ """
+ Periodic tasks.
+ """
- completion = rpki.rpkid_tasks.CompletionHandler(done)
+ now = rpki.sundial.now()
+ logger.debug("Starting cron run")
+ futures = []
try:
- selves = rpki.rpkidb.models.Tenant.objects.all()
- except Exception:
- logger.exception("Error pulling selves from SQL, maybe SQL server is down?")
+ tenants = rpki.rpkidb.models.Tenant.objects.all()
+ except:
+ logger.exception("Error pulling tenants from SQL, maybe SQL server is down?")
else:
- for s in selves:
- s.schedule_cron_tasks(self, completion)
- nothing_queued = completion.count == 0
-
- assert self.use_internal_cron or self.cron_timeout is None
-
- if self.cron_timeout is not None and self.cron_timeout < now:
- logger.warning("cron keepalive threshold %s has expired, breaking lock", self.cron_timeout)
- self.cron_timeout = None
-
- if self.use_internal_cron:
- when = now + self.cron_period
- logger.debug("Scheduling next cron run at %s", when)
- self.cron_timer.set(when)
-
- if self.cron_timeout is None:
- self.checkpoint(self.use_internal_cron)
- self.task_run()
-
- elif self.use_internal_cron:
- logger.warning("cron already running, keepalive will expire at %s", self.cron_timeout)
+ for tenant in tenants:
+ futures.extend(condition.wait() for condition in tenant.schedule_cron_tasks(self))
+ if futures:
+ yield futures
+ logger.info("Finished cron run started at %s", now)
- if nothing_queued:
- done()
- def cronjob_handler(self, query, path, cb):
+ @tornado.gen.coroutine
+ def cronjob_handler(self, handler):
"""
External trigger for periodic tasks. This is somewhat obsolete
now that we have internal timers, but the test framework still
uses it.
"""
- def done():
- cb(200, body = "OK")
-
if self.use_internal_cron:
- cb(500, reason = "Running cron internally")
+ handler.set_status(500, "Running cron internally")
else:
logger.debug("Starting externally triggered cron")
- self.cron(done)
+ yield self.cron()
+ handler.set_status(200)
+ handler.finish()
class publication_queue(object):
@@ -661,19 +654,16 @@ class publication_queue(object):
if self.replace:
self.uris[uri] = pdu
- def call_pubd(self, cb, eb):
- def loop(iterator, rid):
+ @tornado.gen.coroutine
+ def call_pubd(self):
+ for rid in self.repositories:
logger.debug("Calling pubd[%r]", self.repositories[rid])
- self.repositories[rid].call_pubd(self.rpkid, iterator, eb, self.msgs[rid], self.handlers)
- def done():
- self.clear()
- cb()
- rpki.async.iterator(self.repositories, loop, done)
+ yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers)
+ self.clear()
@property
def size(self):
return sum(len(self.msgs[rid]) for rid in self.repositories)
def empty(self):
- assert (not self.msgs) == (self.size == 0), "Assertion failure: not self.msgs: %r, self.size %r" % (not self.msgs, self.size)
return not self.msgs
diff --git a/rpki/rpkid_tasks.py b/rpki/rpkid_tasks.py
index 91fa787d..0a9c1654 100644
--- a/rpki/rpkid_tasks.py
+++ b/rpki/rpkid_tasks.py
@@ -22,9 +22,17 @@ because interactions with rpkid scheduler were getting too complicated.
"""
import logging
+
+import tornado.gen
+import tornado.web
+import tornado.locks
+import tornado.ioloop
+import tornado.httputil
+import tornado.httpclient
+import tornado.httpserver
+
import rpki.log
import rpki.rpkid
-import rpki.async
import rpki.up_down
import rpki.sundial
import rpki.publication
@@ -44,45 +52,6 @@ def queue_task(cls):
return cls
-class CompletionHandler(object):
- """
- Track one or more scheduled rpkid tasks and execute a callback when
- the last of them terminates.
- """
-
- ## @var debug
- # Debug logging.
-
- debug = False
-
- def __init__(self, cb):
- self.cb = cb
- self.tasks = set()
-
- def register(self, task):
- if self.debug:
- logger.debug("Completion handler %r registering task %r", self, task)
- self.tasks.add(task)
- task.register_completion(self.done)
-
- def done(self, task):
- try:
- self.tasks.remove(task)
- except KeyError:
- logger.warning("Completion handler %r called with unregistered task %r, blundering onwards", self, task)
- else:
- if self.debug:
- logger.debug("Completion handler %r called with registered task %r", self, task)
- if not self.tasks:
- if self.debug:
- logger.debug("Completion handler %r finished, calling %r", self, self.cb)
- self.cb()
-
- @property
- def count(self):
- return len(self.tasks)
-
-
class AbstractTask(object):
"""
Abstract base class for rpkid scheduler task objects. This just
@@ -95,53 +64,61 @@ class AbstractTask(object):
timeslice = rpki.sundial.timedelta(seconds = 15)
- def __init__(self, rpkid, s, description = None):
- self.rpkid = rpkid
- self.tenant = s
+ def __init__(self, rpkid, tenant, description = None):
+ self.rpkid = rpkid
+ self.tenant = tenant
self.description = description
- self.completions = []
- self.continuation = None
- self.due_date = None
+ self.resumed = tornado.locks.Condition()
+ self.completed = tornado.locks.Condition()
+ self.due_date = None
+ self.started = False
self.clear()
def __repr__(self):
return rpki.log.log_repr(self, self.description)
- def register_completion(self, completion):
- self.completions.append(completion)
-
def exit(self):
- while self.completions:
- self.completions.pop(0)(self)
- self.clear()
+ logger.debug("%r: Exiting", self)
self.due_date = None
+ self.started = False
+ self.clear()
+ self.completed.notify_all()
self.rpkid.task_next()
- def postpone(self, continuation):
- self.continuation = continuation
+ @tornado.gen.coroutine
+ def postpone(self):
+ logger.debug("%r: Postponed", self)
self.due_date = None
self.rpkid.task_add(self)
self.rpkid.task_next()
+ yield self.resumed.wait()
+ @tornado.gen.coroutine
def __call__(self):
- self.due_date = rpki.sundial.now() + self.timeslice
- if self.continuation is None:
- logger.debug("Running task %r", self)
- self.clear()
- self.start()
- else:
- logger.debug("Restarting task %r at %r", self, self.continuation)
- continuation = self.continuation
- self.continuation = None
- continuation()
+ try:
+ self.due_date = rpki.sundial.now() + self.timeslice
+ if self.started:
+ logger.debug("%r: Resuming", self)
+ self.resumed.notify()
+ else:
+ logger.debug("%r: Starting", self)
+ self.clear()
+ self.started = True
+ yield self.start()
+ except:
+ logger.exception("%r: Unhandled exception", self)
+ self.exit()
+ #
+ # Unclear whether we should re-raise the exception here or not,
+ # but re-raising it is probably safer until we know for sure.
+ #
+ raise
@property
def overdue(self):
return rpki.sundial.now() > self.due_date
- def __getattr__(self, name):
- return getattr(self.tenant, name)
-
+ @tornado.gen.coroutine
def start(self):
raise NotImplementedError
@@ -152,340 +129,207 @@ class AbstractTask(object):
@queue_task
class PollParentTask(AbstractTask):
"""
- Run the regular client poll cycle with each of this self's
+ Run the regular client poll cycle with each of this tenant's
parents, in turn.
"""
- def clear(self):
- logger.debug("PollParentTask.clear()")
- self.parent_iterator = None
- self.parent = None
- self.ca_map = None
- self.class_iterator = None
- self.started = False
-
+ @tornado.gen.coroutine
def start(self):
- logger.debug("PollParentTask.start()")
- self.rpkid.checkpoint()
- logger.debug("Self %s[%r] polling parents", self.tenant_handle, self)
- assert not self.started
- self.started = True
- rpki.async.iterator(self.parents.all(), self.parent_loop, self.exit)
-
- def parent_loop(self, parent_iterator, parent):
- logger.debug("PollParentTask.parent_loop()")
- self.parent_iterator = parent_iterator
- self.parent = parent
- parent.up_down_list_query(rpkid = self.rpkid, cb = self.got_list, eb = self.list_failed)
-
- def got_list(self, r_msg):
- logger.debug("PollParentTask.got_list()")
- self.ca_map = dict((ca.parent_resource_class, ca) for ca in self.parent.cas.all())
- self.rpkid.checkpoint()
- rpki.async.iterator(r_msg.getiterator(rpki.up_down.tag_class), self.class_loop, self.class_done)
-
- def list_failed(self, e):
- logger.debug("PollParentTask.list_failed()")
- logger.exception("Couldn't get resource class list from parent %r, skipping", self.parent)
- self.parent_iterator()
-
- def class_loop(self, class_iterator, rc):
- logger.debug("PollParentTask.class_loop()")
- self.rpkid.checkpoint()
- self.class_iterator = class_iterator
- try:
- ca = self.ca_map.pop(rc.get("class_name"))
- except KeyError:
- rpki.rpkidb.models.CA.create(rpkid = self.rpkid, parent = self.parent, rc = rc,
- cb = class_iterator, eb = self.class_create_failed)
- else:
- ca.check_for_updates(rpkid = self.rpkid, parent = self.parent, rc = rc, cb = class_iterator, eb = self.class_update_failed)
-
- def class_update_failed(self, e):
- logger.debug("PollParentTask.class_update_failed()")
- logger.exception("Couldn't update class, skipping")
- self.class_iterator()
-
- def class_create_failed(self, e):
- logger.debug("PollParentTask.class_create_failed()")
- logger.exception("Couldn't create class, skipping")
- self.class_iterator()
-
- def class_done(self):
- logger.debug("PollParentTask.class_done()")
- rpki.async.iterator(self.ca_map.values(), self.ca_loop, self.ca_done)
-
- def ca_loop(self, iterator, ca):
- logger.debug("PollParentTask.ca_loop()")
- self.rpkid.checkpoint()
- ca.destroy(self.parent, iterator)
-
- def ca_done(self):
- logger.debug("PollParentTask.ca_done()")
- self.rpkid.checkpoint()
- self.parent_iterator()
+ logger.debug("%r: Polling parents", self)
+
+ for parent in self.tenant.parents.all():
+ try:
+ logger.debug("%r: Executing list query", self)
+ r_msg = yield parent.up_down_list_query(rpkid = self.rpkid)
+ except:
+ logger.exception("%r: Couldn't get resource class list from parent %r, skipping", self, parent)
+ continue
+
+ logger.debug("%r: Parsing list response", self)
+
+ ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas.all())
+
+ for rc in r_msg.getiterator(rpki.up_down.tag_class):
+ try:
+ class_name = rc.get("class_name")
+ ca = ca_map.pop(class_name, None)
+ if ca is None:
+ logger.debug("%r: Creating new CA for resource class %r", self, class_name)
+ rpki.rpkidb.models.CA.create(rpkid = self.rpkid, parent = parent, rc = rc)
+ else:
+ logger.debug("%r: Checking updates for existing CA %r for resource class %r", self, ca, class_name)
+ yield ca.check_for_updates(rpkid = self.rpkid, parent = parent, rc = rc)
+ except:
+ logger.exception("Couldn't update resource class %r, skipping", class_name)
+
+ for ca, class_name in ca_map.iteritems():
+ logger.debug("%r: Destroying orphaned CA %r for resource class %r", self, ca, class_name)
+ yield ca.destroy(parent)
+
+ self.exit()
@queue_task
class UpdateChildrenTask(AbstractTask):
"""
- Check for updated IRDB data for all of this self's children and
+ Check for updated IRDB data for all of this tenant's children and
issue new certs as necessary. Must handle changes both in
resources and in expiration date.
"""
- def clear(self):
- self.now = None
- self.rsn = None
- self.publisher = None
- self.iterator = None
- self.child = None
- self.child_certs = None
- self.started = False
-
+ @tornado.gen.coroutine
def start(self):
- self.rpkid.checkpoint()
- logger.debug("Self %s[%r] updating children", self.tenant_handle, self)
- assert not self.started
- self.started = True
- self.now = rpki.sundial.now()
- self.rsn = self.now + rpki.sundial.timedelta(seconds = self.regen_margin)
- self.publisher = rpki.rpkid.publication_queue(self.rpkid)
- rpki.async.iterator(self.children.all(), self.loop, self.done)
-
- def loop(self, iterator, child):
- self.rpkid.checkpoint()
- self.iterator = iterator
- self.child = child
- self.child_certs = child.child_certs
- if self.overdue:
- self.publisher.call_pubd(lambda: self.postpone(self.do_child), self.publication_failed)
- else:
- self.do_child()
-
- def do_child(self):
- if self.child_certs:
- self.rpkid.irdb_query_child_resources(self.child.tenant.tenant_handle, self.child.child_handle,
- self.got_resources, self.lose)
- else:
- self.iterator()
-
- def lose(self, e):
- logger.exception("Couldn't update child %r, skipping", self.child)
- self.iterator()
-
- def got_resources(self, irdb_resources):
+ logger.debug("%r: Updating children", self)
+ now = rpki.sundial.now()
+ rsn = now + rpki.sundial.timedelta(seconds = self.tenant.regen_margin)
+ publisher = rpki.rpkid.publication_queue(self.rpkid)
+
+ for child in self.tenant.children.all():
+ try:
+ if self.overdue:
+ yield publisher.call_pubd()
+ yield self.postpone()
+
+ child_certs = list(child.child_certs.filter(ca_detail__state = "active"))
+
+ if child_certs:
+ irdb_resources = yield self.rpkid.irdb_query_child_resources(child.tenant.tenant_handle, child.child_handle)
+
+ for child_cert in child_certs:
+ ca_detail = child_cert.ca_detail
+ old_resources = child_cert.cert.get_3779resources()
+ new_resources = old_resources & irdb_resources & ca_detail.latest_ca_cert.get_3779resources()
+ old_aia = child_cert.cert.get_AIA()[0]
+ new_aia = ca_detail.ca_cert_uri
+
+ if new_resources.empty():
+ logger.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s", child.child_handle, child_cert.cert.gSKI())
+ child_cert.revoke(publisher = publisher)
+ ca_detail.generate_crl(publisher = publisher)
+ ca_detail.generate_manifest(publisher = publisher)
+
+ elif (old_resources != new_resources or old_aia != new_aia or (old_resources.valid_until < rsn and irdb_resources.valid_until > now and old_resources.valid_until != irdb_resources.valid_until)):
+ logger.debug("Need to reissue child %s certificate SKI %s", child.child_handle, child_cert.cert.gSKI())
+ if old_resources != new_resources:
+ logger.debug("Child %s SKI %s resources changed: old %s new %s", child.child_handle, child_cert.cert.gSKI(), old_resources, new_resources)
+ if old_resources.valid_until != irdb_resources.valid_until:
+ logger.debug("Child %s SKI %s validity changed: old %s new %s", child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until)
+
+ new_resources.valid_until = irdb_resources.valid_until
+ child_cert.reissue(ca_detail = ca_detail, resources = new_resources, publisher = publisher)
+
+ elif old_resources.valid_until < now:
+ logger.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s", child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until)
+ child_cert.delete()
+ publisher.queue(uri = child_cert.uri, old_obj = child_cert.cert, repository = ca_detail.ca.parent.repository)
+ ca_detail.generate_manifest(publisher = publisher)
+
+ except:
+ logger.exception("%r: Couldn't update child %r, skipping", self, child)
+
try:
- for child_cert in self.child_certs.filter(ca_detail__state = "active"):
- ca_detail = child_cert.ca_detail
- old_resources = child_cert.cert.get_3779resources()
- new_resources = old_resources & irdb_resources & ca_detail.latest_ca_cert.get_3779resources()
- old_aia = child_cert.cert.get_AIA()[0]
- new_aia = ca_detail.ca_cert_uri
-
- if new_resources.empty():
- logger.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s",
- self.child.child_handle, child_cert.cert.gSKI())
- child_cert.revoke(publisher = self.publisher)
- ca_detail.generate_crl(publisher = self.publisher)
- ca_detail.generate_manifest(publisher = self.publisher)
-
- elif (old_resources != new_resources or
- old_aia != new_aia or
- (old_resources.valid_until < self.rsn and
- irdb_resources.valid_until > self.now and
- old_resources.valid_until != irdb_resources.valid_until)):
-
- logger.debug("Need to reissue child %s certificate SKI %s",
- self.child.child_handle, child_cert.cert.gSKI())
- if old_resources != new_resources:
- logger.debug("Child %s SKI %s resources changed: old %s new %s",
- self.child.child_handle, child_cert.cert.gSKI(),
- old_resources, new_resources)
- if old_resources.valid_until != irdb_resources.valid_until:
- logger.debug("Child %s SKI %s validity changed: old %s new %s",
- self.child.child_handle, child_cert.cert.gSKI(),
- old_resources.valid_until, irdb_resources.valid_until)
-
- new_resources.valid_until = irdb_resources.valid_until
- child_cert.reissue(
- ca_detail = ca_detail,
- resources = new_resources,
- publisher = self.publisher)
-
- elif old_resources.valid_until < self.now:
- logger.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s",
- self.child.child_handle, child_cert.cert.gSKI(),
- old_resources.valid_until, irdb_resources.valid_until)
- child_cert.delete()
- self.publisher.queue(
- uri = child_cert.uri,
- old_obj = child_cert.cert,
- repository = ca_detail.ca.parent.repository)
- ca_detail.generate_manifest(publisher = self.publisher)
-
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception, e:
- self.rpkid.checkpoint()
- self.lose(e)
- else:
- self.rpkid.checkpoint()
- self.iterator()
-
- def done(self):
- self.rpkid.checkpoint()
- self.publisher.call_pubd(self.exit, self.publication_failed)
-
- def publication_failed(self, e):
- logger.exception("Couldn't publish for %s, skipping", self.tenant_handle)
- self.rpkid.checkpoint()
+ yield publisher.call_pubd()
+ except:
+ logger.exception("%r: Couldn't publish, skipping", self)
+
self.exit()
@queue_task
class UpdateROAsTask(AbstractTask):
"""
- Generate or update ROAs for this self.
+ Generate or update ROAs for this tenant.
"""
def clear(self):
- self.orphans = None
- self.updates = None
- self.publisher = None
+ self.publisher = None
self.ca_details = None
- self.count = None
- self.started = False
+ @tornado.gen.coroutine
def start(self):
- self.rpkid.checkpoint()
- logger.debug("Self %s[%r] updating ROAs", self.tenant_handle, self)
- assert not self.started
- self.started = True
- logger.debug("Issuing query for ROA requests")
- self.rpkid.irdb_query_roa_requests(self.tenant_handle, self.got_roa_requests, self.roa_requests_failed)
+ logger.debug("%r: Updating ROAs", self)
+
+ try:
+ r_msg = yield self.rpkid.irdb_query_roa_requests(self.tenant.tenant_handle)
+ except:
+ logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant.tenant_handle)
+ raise tornado.gen.Return
- def got_roa_requests(self, r_msg):
- self.rpkid.checkpoint()
- logger.debug("Received response to query for ROA requests")
+ logger.debug("%r: Received response to query for ROA requests: %r", self, r_msg)
roas = {}
seen = set()
- self.orphans = []
- self.updates = []
+ orphans = []
+ updates = []
self.publisher = rpki.rpkid.publication_queue(self.rpkid)
self.ca_details = set()
- logger.debug("UpdateROAsTask.got_roa_requests(): setup done, self.orphans %r", self.orphans)
- assert isinstance(self.orphans, list) # XXX
-
for roa in self.tenant.roas.all():
- logger.debug("UpdateROAsTask.got_roa_requests(): roa loop, self.orphans %r", self.orphans)
- assert isinstance(self.orphans, list) # XXX
k = (roa.asn, str(roa.ipv4), str(roa.ipv6))
if k not in roas:
roas[k] = roa
- elif (roa.roa is not None and
- roa.cert is not None and
- roa.ca_detail is not None and
- roa.ca_detail.state == "active" and
- (roas[k].roa is None or
- roas[k].cert is None or
- roas[k].ca_detail is None or
- roas[k].ca_detail.state != "active")):
- self.orphans.append(roas[k])
+ elif (roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active")):
+ orphans.append(roas[k])
roas[k] = roa
else:
- self.orphans.append(roa)
-
- logger.debug("UpdateROAsTask.got_roa_requests(): roa loop done, self.orphans %r", self.orphans)
- assert isinstance(self.orphans, list) # XXX
+ orphans.append(roa)
for r_pdu in r_msg:
- logger.debug("UpdateROAsTask.got_roa_requests(): r_pdu loop, self.orphans %r", self.orphans)
- assert isinstance(self.orphans, list)
k = (r_pdu.get("asn"), r_pdu.get("ipv4"), r_pdu.get("ipv6"))
if k in seen:
- logger.warning("Skipping duplicate ROA request %r", r_pdu)
+ logger.warning("%r: Skipping duplicate ROA request %r", self, r_pdu)
else:
seen.add(k)
roa = roas.pop(k, None)
if roa is None:
- roa = rpki.rpkidb.models.ROA(asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6"))
- roa.tenant = self.tenant
- logger.debug("Created new %r", roa)
+ roa = rpki.rpkidb.models.ROA(tenant = self.tenant, asn = long(r_pdu.get("asn")), ipv4 = r_pdu.get("ipv4"), ipv6 = r_pdu.get("ipv6"))
+ logger.debug("%r: Created new %r", self, roa)
else:
- logger.debug("Found existing %r", roa)
- self.updates.append(roa)
+ logger.debug("%r: Found existing %r", self, roa)
+ updates.append(roa)
- logger.debug("UpdateROAsTask.got_roa_requests(): r_pdu loop done, self.orphans %r", self.orphans)
- assert isinstance(self.orphans, list) # XXX
+ orphans.extend(roas.itervalues())
- self.orphans.extend(roas.itervalues())
+ while updates:
+ if self.overdue:
+ yield self.publish()
+ yield self.postpone()
+ roa = updates.pop(0)
+ try:
+ roa.update(publisher = self.publisher, fast = True)
+ self.ca_details.add(roa.ca_detail)
+ except rpki.exceptions.NoCoveringCertForROA:
+ logger.warning("%r: No covering certificate for %r, skipping", self, roa)
+ except:
+ logger.exception("%r: Could not update %r, skipping", self, roa)
- if self.overdue:
- self.postpone(self.begin_loop)
- else:
- self.begin_loop()
+ for roa in orphans:
+ try:
+ self.ca_details.add(roa.ca_detail)
+ roa.revoke(publisher = self.publisher, fast = True)
+ except:
+ logger.exception("%r: Could not revoke %r", self, roa)
- def begin_loop(self):
- self.count = 0
- rpki.async.iterator(self.updates, self.loop, self.done, pop_list = True)
+ yield self.publish()
- def loop(self, iterator, roa):
- self.rpkid.checkpoint()
- try:
- roa.update(publisher = self.publisher, fast = True)
- self.ca_details.add(roa.ca_detail)
- except (SystemExit, rpki.async.ExitNow):
- raise
- except rpki.exceptions.NoCoveringCertForROA:
- logger.warning("No covering certificate for %r, skipping", roa)
- except Exception:
- logger.exception("Could not update %r, skipping", roa)
- self.count += 1
- if self.overdue:
- self.publish(lambda: self.postpone(iterator))
- else:
- iterator()
-
- def publish(self, done):
+ self.exit()
+
+ @tornado.gen.coroutine
+ def publish(self):
if not self.publisher.empty():
for ca_detail in self.ca_details:
- logger.debug("Generating new CRL for %r", ca_detail)
+ logger.debug("%r: Generating new CRL for %r", self, ca_detail)
ca_detail.generate_crl(publisher = self.publisher)
- logger.debug("Generating new manifest for %r", ca_detail)
+ logger.debug("%r: Generating new manifest for %r", self, ca_detail)
ca_detail.generate_manifest(publisher = self.publisher)
+ yield self.publisher.call_pubd()
self.ca_details.clear()
- self.rpkid.checkpoint()
- self.publisher.call_pubd(done, self.publication_failed)
-
- def publication_failed(self, e):
- logger.exception("Couldn't publish for %s, skipping", self.tenant_handle)
- self.rpkid.checkpoint()
- self.exit()
-
- def done(self):
- for roa in self.orphans:
- try:
- self.ca_details.add(roa.ca_detail)
- roa.revoke(publisher = self.publisher, fast = True)
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception:
- logger.exception("Could not revoke %r", roa)
- self.rpkid.checkpoint()
- self.publish(self.exit)
-
- def roa_requests_failed(self, e):
- logger.exception("Could not fetch ROA requests for %s, skipping", self.tenant_handle)
- self.exit()
@queue_task
class UpdateGhostbustersTask(AbstractTask):
"""
- Generate or update Ghostbuster records for this self.
+ Generate or update Ghostbuster records for this tenant.
This was originally based on the ROA update code. It's possible
that both could benefit from refactoring, but at this point the
@@ -494,23 +338,13 @@ class UpdateGhostbustersTask(AbstractTask):
exceptionally silly.
"""
- def clear(self):
- self.started = False
-
+ @tornado.gen.coroutine
def start(self):
- self.rpkid.checkpoint()
- logger.debug("Self %s[%r] updating Ghostbuster records", self.tenant_handle, self)
- assert not self.started
- self.started = True
+ logger.debug("%r: Updating Ghostbuster records", self)
parent_handles = set(p.parent_handle for p in self.tenant.parents.all())
- self.rpkid.irdb_query_ghostbuster_requests(self.tenant_handle, parent_handles,
- self.got_ghostbuster_requests,
- self.ghostbuster_requests_failed)
-
- def got_ghostbuster_requests(self, r_msg):
try:
- self.rpkid.checkpoint()
+ r_msg = yield self.rpkid.irdb_query_ghostbuster_requests(self.tenant.tenant_handle, parent_handles)
ghostbusters = {}
orphans = []
@@ -529,22 +363,20 @@ class UpdateGhostbustersTask(AbstractTask):
try:
self.tenant.parents.get(parent_handle = r_pdu.get("parent_handle"))
except rpki.rpkidb.models.Parent.DoesNotExist:
- logger.warning("Unknown parent_handle %r in Ghostbuster request, skipping", r_pdu.get("parent_handle"))
+ logger.warning("%r: Unknown parent_handle %r in Ghostbuster request, skipping", self, r_pdu.get("parent_handle"))
continue
k = (r_pdu.get("parent_handle"), r_pdu.text)
if k in seen:
- logger.warning("Skipping duplicate Ghostbuster request %r", r_pdu)
+ logger.warning("%r: Skipping duplicate Ghostbuster request %r", self, r_pdu)
continue
seen.add(k)
- for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__parent_handle = r_pdu.get("parent_handle"),
- ca__parent__tenant = self.tenant, state = "active"):
+ for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__parent_handle = r_pdu.get("parent_handle"), ca__parent__tenant = self.tenant, state = "active"):
ghostbuster = ghostbusters.pop((ca_detail.pk, r_pdu.text), None)
if ghostbuster is None:
- ghostbuster = rpki.rpkidb.models.Ghostbuster(ca_detail = ca_detail, vcard = r_pdu.text)
- ghostbuster.tenant = self.tenant
- logger.debug("Created new %r for %r", ghostbuster, r_pdu.get("parent_handle"))
+ ghostbuster = rpki.rpkidb.models.Ghostbuster(tenant = self.tenant, ca_detail = ca_detail, vcard = r_pdu.text)
+ logger.debug("%r: Created new %r for %r", self, ghostbuster, r_pdu.get("parent_handle"))
else:
- logger.debug("Found existing %r for %s", ghostbuster, r_pdu.get("parent_handle"))
+ logger.debug("%r: Found existing %r for %s", self, ghostbuster, r_pdu.get("parent_handle"))
ghostbuster.update(publisher = publisher, fast = True)
ca_details.add(ca_detail)
@@ -557,22 +389,11 @@ class UpdateGhostbustersTask(AbstractTask):
ca_detail.generate_crl(publisher = publisher)
ca_detail.generate_manifest(publisher = publisher)
- self.rpkid.checkpoint()
- publisher.call_pubd(self.exit, self.publication_failed)
+ yield publisher.call_pubd()
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception:
- logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant_handle)
- self.exit()
-
- def publication_failed(self, e):
- logger.exception("Couldn't publish Ghostbuster updates for %s, skipping", self.tenant_handle)
- self.rpkid.checkpoint()
- self.exit()
+ except:
+ logger.exception("Could not update Ghostbuster records for %s, skipping", self.tenant.tenant_handle)
- def ghostbuster_requests_failed(self, e):
- logger.exception("Could not fetch Ghostbuster record requests for %s, skipping", self.tenant_handle)
self.exit()
@@ -585,22 +406,12 @@ class UpdateEECertificatesTask(AbstractTask):
so keeping it simple for initial version, we can optimize later.
"""
- def clear(self):
- self.started = False
-
+ @tornado.gen.coroutine
def start(self):
- self.rpkid.checkpoint()
- logger.debug("Self %s[%r] updating EE certificates", self.tenant_handle, self)
- assert not self.started
- self.started = True
- self.rpkid.irdb_query_ee_certificate_requests(self.tenant_handle,
- self.got_requests,
- self.get_requests_failed)
-
- def got_requests(self, r_msg):
+ logger.debug("%r: Updating EE certificates", self)
try:
- self.rpkid.checkpoint()
+ r_msg = yield self.rpkid.irdb_query_ee_certificate_requests(self.tenant.tenant_handle)
publisher = rpki.rpkid.publication_queue(self.rpkid)
@@ -621,28 +432,23 @@ class UpdateEECertificatesTask(AbstractTask):
v4 = rpki.resource_set.resource_set_ipv4(r_pdu.get("ipv4")),
v6 = rpki.resource_set.resource_set_ipv6(r_pdu.get("ipv6")),
valid_until = rpki.sundial.datetime.fromXMLtime(r_pdu.get("valid_until")))
- covering = self.find_covering_ca_details(resources)
+ covering = self.tenant.find_covering_ca_details(resources)
ca_details.update(covering)
for ee in ees:
if ee.ca_detail in covering:
- logger.debug("Updating existing EE certificate for %s %s",
- gski, resources)
- ee.reissue(
- resources = resources,
- publisher = publisher)
+ logger.debug("Updating existing EE certificate for %s %s", gski, resources)
+ ee.reissue(resources = resources, publisher = publisher)
covering.remove(ee.ca_detail)
else:
- logger.debug("Existing EE certificate for %s %s is no longer covered",
- gski, resources)
+ logger.debug("Existing EE certificate for %s %s is no longer covered", gski, resources)
ee.revoke(publisher = publisher)
subject_name = rpki.x509.X501DN.from_cn(r_pdu.get("cn"), r_pdu.get("sn"))
subject_key = rpki.x509.PKCS10(Base64 = r_pdu[0].text).getPublicKey()
for ca_detail in covering:
- logger.debug("No existing EE certificate for %s %s",
- gski, resources)
+ logger.debug("No existing EE certificate for %s %s", gski, resources)
rpki.rpkidb.models.EECertificate.create( # sic: class method, not Django manager method (for now, anyway)
ca_detail = ca_detail,
subject_name = subject_name,
@@ -661,29 +467,18 @@ class UpdateEECertificatesTask(AbstractTask):
ca_detail.generate_crl(publisher = publisher)
ca_detail.generate_manifest(publisher = publisher)
- self.rpkid.checkpoint()
- publisher.call_pubd(self.exit, self.publication_failed)
-
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception:
- logger.exception("Could not update EE certificates for %s, skipping", self.tenant_handle)
- self.exit()
+ yield publisher.call_pubd()
- def publication_failed(self, e):
- logger.exception("Couldn't publish EE certificate updates for %s, skipping", self.tenant_handle)
- self.rpkid.checkpoint()
- self.exit()
+ except:
+ logger.exception("Could not update EE certificates for %s, skipping", self.tenant.tenant_handle)
- def get_requests_failed(self, e):
- logger.exception("Could not fetch EE certificate requests for %s, skipping", self.tenant_handle)
self.exit()
@queue_task
class RegenerateCRLsAndManifestsTask(AbstractTask):
"""
- Generate new CRLs and manifests as necessary for all of this self's
+ Generate new CRLs and manifests as necessary for all of this tenant's
CAs. Extracting nextUpdate from a manifest is hard at the moment
due to implementation silliness, so for now we generate a new
manifest whenever we generate a new CRL
@@ -693,48 +488,33 @@ class RegenerateCRLsAndManifestsTask(AbstractTask):
database anyway.
"""
- def clear(self):
- self.started = False
-
+ @tornado.gen.coroutine
def start(self):
- self.rpkid.checkpoint()
- logger.debug("Self %s[%r] regenerating CRLs and manifests", self.tenant_handle, self)
- assert not self.started
- self.started = True
- now = rpki.sundial.now()
- crl_interval = rpki.sundial.timedelta(seconds = self.crl_interval)
- regen_margin = max(self.rpkid.cron_period * 2, crl_interval / 4)
- publisher = rpki.rpkid.publication_queue(self.rpkid)
+ logger.debug("%r: Regenerating CRLs and manifests", self)
- logger.debug("RegenerateCRLsAndManifestsTask: setup complete") # XXX
+ try:
+ now = rpki.sundial.now()
+ crl_interval = rpki.sundial.timedelta(seconds = self.tenant.crl_interval)
+ regen_margin = max(rpki.sundial.timedelta(seconds = self.rpkid.cron_period) * 2, crl_interval / 4)
+ publisher = rpki.rpkid.publication_queue(self.rpkid)
- for ca in rpki.rpkidb.models.CA.objects.filter(parent__tenant = self.tenant):
- logger.debug("RegenerateCRLsAndManifestsTask: checking CA %r", ca) # XXX
- try:
- for ca_detail in ca.ca_details.filter(state = "revoked"):
- if now > ca_detail.latest_crl.getNextUpdate():
- ca_detail.destroy(ca = ca, publisher = publisher)
- for ca_detail in ca.ca_details.filter(state__in = ("active", "deprecated")):
- if now + regen_margin > ca_detail.latest_crl.getNextUpdate():
- ca_detail.generate_crl(publisher = publisher)
- ca_detail.generate_manifest(publisher = publisher)
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception:
- logger.exception("Couldn't regenerate CRLs and manifests for CA %r, skipping", ca)
-
- logger.debug("RegenerateCRLsAndManifestsTask: CA loop complete") # XXX
-
- self.rpkid.checkpoint()
- publisher.call_pubd(self.done, self.lose)
-
- def done(self):
- logger.debug("RegenerateCRLsAndManifestsTask: publication complete") # XXX
- self.exit()
+ for ca in rpki.rpkidb.models.CA.objects.filter(parent__tenant = self.tenant):
+ try:
+ for ca_detail in ca.ca_details.filter(state = "revoked"):
+ if now > ca_detail.latest_crl.getNextUpdate():
+ ca_detail.destroy(ca = ca, publisher = publisher)
+ for ca_detail in ca.ca_details.filter(state__in = ("active", "deprecated")):
+ if now + regen_margin > ca_detail.latest_crl.getNextUpdate():
+ ca_detail.generate_crl(publisher = publisher)
+ ca_detail.generate_manifest(publisher = publisher)
+ except:
+ logger.exception("%r: Couldn't regenerate CRLs and manifests for CA %r, skipping", self, ca)
+
+ yield publisher.call_pubd()
+
+ except:
+ logger.exception("%r: Couldn't publish updated CRLs and manifests, skipping", self)
- def lose(self, e):
- logger.exception("Couldn't publish updated CRLs and manifests for self %r, skipping", self.tenant_handle)
- self.rpkid.checkpoint()
self.exit()
@@ -745,24 +525,17 @@ class CheckFailedPublication(AbstractTask):
to pubd being down or unreachable).
"""
- def clear(self):
- self.started = False
-
+ @tornado.gen.coroutine
def start(self):
- assert not self.started
- logger.debug("CheckFailedPublication starting")
- self.started = True
- publisher = rpki.rpkid.publication_queue(self.rpkid)
- for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"):
- ca_detail.check_failed_publication(publisher)
- self.rpkid.checkpoint()
- publisher.call_pubd(self.done, self.publication_failed)
-
- def publication_failed(self, e):
- logger.exception("Couldn't publish for %s, skipping", self.tenant_handle)
- self.rpkid.checkpoint()
- self.exit()
+ logger.debug("%r: Checking for failed publication actions", self)
+
+ try:
+ publisher = rpki.rpkid.publication_queue(self.rpkid)
+ for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"):
+ ca_detail.check_failed_publication(publisher)
+ yield publisher.call_pubd()
+
+ except:
+ logger.exception("%r: Couldn't run failed publications, skipping", self)
- def done(self):
- logger.debug("CheckFailedPublication finished")
self.exit()
diff --git a/rpki/rpkidb/models.py b/rpki/rpkidb/models.py
index 91e6e5c0..62deeb8b 100644
--- a/rpki/rpkidb/models.py
+++ b/rpki/rpkidb/models.py
@@ -7,6 +7,13 @@ from __future__ import unicode_literals
import logging
import base64
+import tornado.gen
+import tornado.web
+import tornado.ioloop
+import tornado.httputil
+import tornado.httpclient
+import tornado.httpserver
+
from django.db import models
import rpki.left_right
@@ -139,8 +146,8 @@ class XMLManager(models.Manager): # pylint: disable=W0232
Add a few methods which locate or create an object or objects
corresponding to the handles in an XML element, as appropriate.
- This assumes that models which use it have an "xml" class attribute
- holding an XMLTemplate object (above).
+ This assumes that models which use it have an "xml_template"
+ class attribute holding an XMLTemplate object (above).
"""
def xml_get_or_create(self, xml):
@@ -191,28 +198,32 @@ def xml_hooks(cls):
# Maybe inheritance from an abstract model would work here. Then
# again, maybe we could use this decorator to do something prettier
- # for the XMLTemplate setup. Whatever. Clean up once basic stuff
- # works again after transition from pre-Django SQL.
+ # for the XMLTemplate setup. Whatever. Gussie up later.
- def default_xml_post_save_hook(self, rpkid, q_pdu, cb, eb):
- logger.debug("default_xml_post_save_hook()")
- cb()
- def default_xml_pre_delete_hook(self, rpkid, cb, eb):
- logger.debug("default_xml_pre_delete_hook()")
- cb()
def default_xml_pre_save_hook(self, q_pdu):
logger.debug("default_xml_pre_save_hook()")
- pass # pylint: disable=W0107
- for name, method in (("xml_post_save_hook", default_xml_post_save_hook),
- ("xml_pre_delete_hook", default_xml_pre_delete_hook),
- ("xml_pre_save_hook", default_xml_pre_save_hook)):
+
+ @tornado.gen.coroutine
+ def default_xml_post_save_hook(self, rpkid, q_pdu):
+ logger.debug("default_xml_post_save_hook()")
+
+ @tornado.gen.coroutine
+ def default_xml_pre_delete_hook(self, rpkid):
+ logger.debug("default_xml_pre_delete_hook()")
+
+ for name, method in (("xml_pre_save_hook", default_xml_pre_save_hook),
+ ("xml_post_save_hook", default_xml_post_save_hook),
+ ("xml_pre_delete_hook", default_xml_pre_delete_hook)):
if not hasattr(cls, name):
setattr(cls, name, method)
return cls
-# Models
+# Models.
+#
+# There's far too much random code hanging off of model methods, relic
+# of the earlier implementation. Clean up as time permits.
@xml_hooks
class Tenant(models.Model):
@@ -230,14 +241,17 @@ class Tenant(models.Model):
booleans = ("use_hsm",),
elements = ("bpki_cert", "bpki_glue"))
+ @tornado.gen.coroutine
+ def xml_pre_delete_hook(self, rpkid):
+ yield [parent.destroy() for parent in self.parents.all()]
- def xml_pre_delete_hook(self, rpkid, cb, eb):
- def loop(iterator, parent):
- parent.destroy(iterator)
- rpki.async.iterator(self.parents.all(), loop, cb)
-
+ @tornado.gen.coroutine
+ def xml_post_save_hook(self, rpkid, q_pdu):
+ rekey = q_pdu.get("rekey")
+ revoke = q_pdu.get("revoke")
+ reissue = q_pdu.get("reissue")
+ revoke_forgotten = q_pdu.get("revoke_forgotten")
- def xml_post_save_hook(self, rpkid, q_pdu, cb, eb):
if q_pdu.get("clear_replay_protection"):
for parent in self.parents.all():
parent.clear_replay_protection()
@@ -245,51 +259,52 @@ class Tenant(models.Model):
child.clear_replay_protection()
for repository in self.repositories.all():
repository.clear_replay_protection()
- actions = []
- rekey = q_pdu.get("rekey")
- revoke = q_pdu.get("revoke")
- reissue = q_pdu.get("reissue")
- revoke_forgotten = q_pdu.get("revoke_forgotten")
+
+ futures = []
+
if rekey or revoke or reissue or revoke_forgotten:
for parent in self.parents.all():
if rekey:
- actions.append(parent.serve_rekey)
+ futures.append(parent.serve_rekey(rpkid))
if revoke:
- actions.append(parent.serve_revoke)
+ futures.append(parent.serve_revoke(rpkid))
if reissue:
- actions.append(parent.serve_reissue)
+ futures.append(parent.serve_reissue(rpkid))
if revoke_forgotten:
- actions.append(parent.serve_revoke_forgotten)
+ futures.append(parent.serve_revoke_forgotten(rpkid))
+
if q_pdu.get("publish_world_now"):
- actions.append(self.serve_publish_world_now)
+ futures.append(self.serve_publish_world_now(rpkid))
if q_pdu.get("run_now"):
- actions.append(self.serve_run_now)
- def loop(iterator, action):
- action(rpkid, iterator, eb)
- rpki.async.iterator(actions, loop, cb)
+ futures.append(self.serve_run_now(rpkid))
+
+ yield futures
- def serve_publish_world_now(self, rpkid, cb, eb):
+ @tornado.gen.coroutine
+ def serve_publish_world_now(self, rpkid):
publisher = rpki.rpkid.publication_queue(rpkid)
repositories = set()
objects = dict()
- def loop(iterator, parent):
+ for parent in self.parents.all():
+
repository = parent.repository
if repository.peer_contact_uri in repositories:
- return iterator()
+ continue
repositories.add(repository.peer_contact_uri)
q_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap,
type = "query", version = rpki.publication.version)
SubElement(q_msg, rpki.publication.tag_list, tag = "list")
- def list_handler(r_pdu):
- rpki.publication.raise_if_error(r_pdu)
- assert r_pdu.tag == rpki.publication.tag_list
- assert r_pdu.get("uri") not in objects
- objects[r_pdu.get("uri")] = (r_pdu.get("hash"), repository)
+ r_msg = yield repository.call_pubd(rpkid, q_msg, length_check = False)
- repository.call_pubd(rpkid, iterator, eb, q_msg, length_check = False, handlers = dict(list = list_handler))
+ for r_pdu in r_msg:
+ assert r_pdu.tag == rpki.publication.tag_list
+ if r_pdu.get("uri") in objects:
+ logger.warning("pubd reported multiple published copies of URI %r, this makes no sense, blundering onwards", r_pdu.get("uri"))
+ else:
+ objects[r_pdu.get("uri")] = (r_pdu.get("hash"), repository)
def reconcile(uri, obj, repository):
h, r = objects.pop(uri, (None, None))
@@ -297,43 +312,49 @@ class Tenant(models.Model):
assert r == repository
publisher.queue(uri = uri, new_obj = obj, old_hash = h, repository = repository)
- def done():
- for ca_detail in CADetail.objects.filter(ca__parent__tenant = self, state = "active"):
- repository = ca_detail.ca.parent.repository
- reconcile(uri = ca_detail.crl_uri, obj = ca_detail.latest_crl, repository = repository)
- reconcile(uri = ca_detail.manifest_uri, obj = ca_detail.latest_manifest, repository = repository)
- for c in ca_detail.child_certs.all():
- reconcile(uri = c.uri, obj = c.cert, repository = repository)
- for r in ca_detail.roas.filter(roa__isnull = False):
- reconcile(uri = r.uri, obj = r.roa, repository = repository)
- for g in ca_detail.ghostbusters.all():
- reconcile(uri = g.uri, obj = g.ghostbuster, repository = repository)
- for c in ca_detail.ee_certificates.all():
- reconcile(uri = c.uri, obj = c.cert, repository = repository)
- for u in objects:
- h, r = objects[u]
- publisher.queue(uri = u, old_hash = h, repository = r)
- publisher.call_pubd(cb, eb)
-
- rpki.async.iterator(self.parents.all(), loop, done)
-
-
- def serve_run_now(self, rpkid, cb, eb):
+ for ca_detail in CADetail.objects.filter(ca__parent__tenant = self, state = "active"):
+ repository = ca_detail.ca.parent.repository
+ reconcile(uri = ca_detail.crl_uri, obj = ca_detail.latest_crl, repository = repository)
+ reconcile(uri = ca_detail.manifest_uri, obj = ca_detail.latest_manifest, repository = repository)
+ for c in ca_detail.child_certs.all():
+ reconcile(uri = c.uri, obj = c.cert, repository = repository)
+ for r in ca_detail.roas.filter(roa__isnull = False):
+ reconcile(uri = r.uri, obj = r.roa, repository = repository)
+ for g in ca_detail.ghostbusters.all():
+ reconcile(uri = g.uri, obj = g.ghostbuster, repository = repository)
+ for c in ca_detail.ee_certificates.all():
+ reconcile(uri = c.uri, obj = c.cert, repository = repository)
+ for u in objects:
+ h, r = objects[u]
+ publisher.queue(uri = u, old_hash = h, repository = r)
+
+ yield publisher.call_pubd()
+
+
+ @tornado.gen.coroutine
+ def serve_run_now(self, rpkid):
logger.debug("Forced immediate run of periodic actions for tenant %s[%r]", self.tenant_handle, self)
- completion = rpki.rpkid_tasks.CompletionHandler(cb)
- self.schedule_cron_tasks(rpkid, completion)
- assert completion.count > 0
+ futures = [condition.wait() for condition in self.schedule_cron_tasks(rpkid)]
rpkid.task_run()
+ logger.debug("serve_run_now() futures: %r", futures)
+ assert futures
+ try:
+ yield futures
+ except:
+ logger.exception("serve_run_now() failed")
+ raise
+ else:
+ logger.debug("serve_run_now() done")
- def schedule_cron_tasks(self, rpkid, completion):
+ def schedule_cron_tasks(self, rpkid):
try:
tasks = self.cron_tasks
except AttributeError:
tasks = self.cron_tasks = tuple(task(rpkid, self) for task in rpki.rpkid_tasks.task_classes)
for task in tasks:
rpkid.task_add(task)
- completion.register(task)
+ yield task.completed # Plain old Python generator yield, this is not a coroutine
def find_covering_ca_details(self, resources):
@@ -403,10 +424,10 @@ class Repository(models.Model):
elements = ("bpki_cert", "bpki_glue"))
- def xml_post_save_hook(self, rpkid, q_pdu, cb, eb):
+ @tornado.gen.coroutine
+ def xml_post_save_hook(self, rpkid, q_pdu):
if q_pdu.get("clear_replay_protection"):
self.clear_replay_protection()
- cb()
def clear_replay_protection(self):
@@ -414,7 +435,8 @@ class Repository(models.Model):
self.save()
- def call_pubd(self, rpkid, callback, errback, q_msg, handlers = {}, length_check = True): # pylint: disable=W0102
+ @tornado.gen.coroutine
+ def call_pubd(self, rpkid, q_msg, handlers = {}, length_check = True): # pylint: disable=W0102
"""
Send a message to publication daemon and return the response.
@@ -428,47 +450,46 @@ class Repository(models.Model):
handler value of False suppresses calling of the default handler.
"""
- try:
- if len(q_msg) == 0:
- return callback()
-
- for q_pdu in q_msg:
- logger.info("Sending %r to pubd", q_pdu)
-
- bsc = self.bsc
- q_der = rpki.publication.cms_msg().wrap(q_msg, bsc.private_key_id, bsc.signing_cert, bsc.signing_cert_crl)
- bpki_ta_path = (rpkid.bpki_ta, self.tenant.bpki_cert, self.tenant.bpki_glue, self.bpki_cert, self.bpki_glue)
-
- def done(r_der):
- try:
- logger.debug("Received response from pubd")
- r_cms = rpki.publication.cms_msg(DER = r_der)
- r_msg = r_cms.unwrap(bpki_ta_path)
- r_cms.check_replay_sql(self, self.peer_contact_uri)
- for r_pdu in r_msg:
- handler = handlers.get(r_pdu.get("tag"), rpki.publication.raise_if_error)
- if handler:
- logger.debug("Calling pubd handler %r", handler)
- handler(r_pdu)
- if length_check and len(q_msg) != len(r_msg):
- raise rpki.exceptions.BadPublicationReply("Wrong number of response PDUs from pubd: sent %r, got %r" % (q_msg, r_msg))
- callback()
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception, e:
- errback(e)
-
- logger.debug("Sending request to pubd")
- rpki.http.client(
- url = self.peer_contact_uri,
- msg = q_der,
- callback = done,
- errback = errback)
-
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception, e:
- errback(e)
+ if len(q_msg) == 0:
+ raise tornado.gen.Return
+
+ for q_pdu in q_msg:
+ logger.info("Sending %r to pubd", q_pdu)
+
+ q_der = rpki.publication.cms_msg().wrap(q_msg, self.bsc.private_key_id, self.bsc.signing_cert, self.bsc.signing_cert_crl)
+
+ http_client = tornado.httpclient.AsyncHTTPClient()
+
+ http_request = tornado.httpclient.HTTPRequest(
+ url = self.peer_contact_uri,
+ method = "POST",
+ body = q_der,
+ headers = { "Content-Type" : rpki.publication.content_type })
+
+ http_response = yield http_client.fetch(http_request)
+
+ # Tornado already checked http_response.code for us
+
+ content_type = http_response.headers.get("Content-Type")
+
+ if content_type not in rpki.publication.allowed_content_types:
+ raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.publication.content_type, content_type))
+
+ r_der = http_response.body
+ r_cms = rpki.publication.cms_msg(DER = r_der)
+ r_msg = r_cms.unwrap((rpkid.bpki_ta, self.tenant.bpki_cert, self.tenant.bpki_glue, self.bpki_cert, self.bpki_glue))
+ r_cms.check_replay_sql(self, self.peer_contact_uri)
+
+ for r_pdu in r_msg:
+ handler = handlers.get(r_pdu.get("tag"), rpki.publication.raise_if_error)
+ if handler:
+ logger.debug("Calling pubd handler %r", handler)
+ handler(r_pdu)
+
+ if length_check and len(q_msg) != len(r_msg):
+ raise rpki.exceptions.BadPublicationReply("Wrong number of response PDUs from pubd: sent %r, got %r" % (q_msg, r_msg))
+
+ raise tornado.gen.Return(r_msg)
@xml_hooks
@@ -496,51 +517,44 @@ class Parent(models.Model):
elements = ("bpki_cert", "bpki_glue"))
- def xml_pre_delete_hook(self, rpkid, cb, eb):
- self.destroy(rpkid, cb, delete_parent = False)
-
+ @tornado.gen.coroutine
+ def xml_pre_delete_hook(self, rpkid):
+ self.destroy(rpkid, delete_parent = False)
- def xml_post_save_hook(self, rpkid, q_pdu, cb, eb):
+ @tornado.gen.coroutine
+ def xml_post_save_hook(self, rpkid, q_pdu):
if q_pdu.get("clear_replay_protection"):
self.clear_replay_protection()
- actions = []
+ futures = []
if q_pdu.get("rekey"):
- actions.append(self.serve_rekey)
+ futures.append(self.serve_rekey(rpkid))
if q_pdu.get("revoke"):
- actions.append(self.serve_revoke)
+ futures.append(self.serve_revoke(rpkid))
if q_pdu.get("reissue"):
- actions.append(self.serve_reissue)
+ futures.append(self.serve_reissue(rpkid))
if q_pdu.get("revoke_forgotten"):
- actions.append(self.serve_revoke_forgotten)
- def loop(iterator, action):
- action(iterator, eb)
- rpki.async.iterator(actions, loop, cb)
+ futures.append(self.serve_revoke_forgotten(rpkid))
+ yield futures
+ @tornado.gen.coroutine
+ def serve_rekey(self, rpkid):
+ yield [ca.rekey() for ca in self.cas.all()]
- def serve_rekey(self, rpkid, cb, eb):
- def loop(iterator, ca):
- ca.rekey(iterator, eb)
- rpki.async.iterator(self.cas.all(), loop, cb)
-
-
- def serve_revoke(self, rpkid, cb, eb):
- def loop(iterator, ca):
- ca.revoke(cb = iterator, eb = eb)
- rpki.async.iterator(self.cas.all(), loop, cb)
-
-
- def serve_reissue(self, rpkid, cb, eb):
- def loop(iterator, ca):
- ca.reissue(cb = iterator, eb = eb)
- rpki.async.iterator(self.cas.all(), loop, cb)
+ @tornado.gen.coroutine
+ def serve_revoke(self, rpkid):
+ yield [ca.revoke() for ca in self.cas.all()]
+ @tornado.gen.coroutine
+ def serve_reissue(self, rpkid):
+ yield [ca.reissue() for ca in self.cas.all()]
def clear_replay_protection(self):
self.last_cms_timestamp = None
self.save()
- def get_skis(self, rpkid, cb, eb):
+ @tornado.gen.coroutine
+ def get_skis(self, rpkid):
"""
Fetch SKIs that this parent thinks we have. In theory this should
agree with our own database, but in practice stuff can happen, so
@@ -550,28 +564,32 @@ class Parent(models.Model):
set of SKIs as value.
"""
- def done(r_msg):
- cb(dict((rc.get("class_name"),
- set(rpki.x509.X509(Base64 = c.text).gSKI()
- for c in rc.getiterator(rpki.up_down.tag_certificate)))
- for rc in r_msg.getiterator(rpki.up_down.tag_class)))
- self.up_down_list_query(rpkid = rpkid, cb = done, eb = eb)
+ r_msg = yield self.up_down_list_query(rpkid = rpkid)
+
+ ski_map = {}
+
+ for rc in r_msg.getiterator(rpki.up_down.tag_class):
+ skis = set()
+ for c in rc.getiterator(rpki.up_down.tag_certificate):
+ skis.add(rpki.x509.X509(Base64 = c.text).gSKI())
+ ski_map[rc.get("class_name")] = skis
+ raise tornado.gen.Return(ski_map)
- def revoke_skis(self, rpkid, rc_name, skis_to_revoke, cb, eb):
+
+ @tornado.gen.coroutine
+ def revoke_skis(self, rpkid, rc_name, skis_to_revoke):
"""
Revoke a set of SKIs within a particular resource class.
"""
- def loop(iterator, ski):
- def revoked(r_pdu):
- iterator()
+ for ski in skis_to_revoke:
logger.debug("Asking parent %r to revoke class %r, SKI %s", self, rc_name, ski)
- self.up_down_revoke_query(rpkid = rpkid, class_name = rc_name, ski = ski, cb = revoked, eb = eb)
- rpki.async.iterator(skis_to_revoke, loop, cb)
+ yield self.up_down_revoke_query(rpkid = rpkid, class_name = rc_name, ski = ski)
- def serve_revoke_forgotten(self, rpkid, cb, eb):
+ @tornado.gen.coroutine
+ def serve_revoke_forgotten(self, rpkid):
"""
Handle a left-right revoke_forgotten action for this parent.
@@ -585,36 +603,24 @@ class Parent(models.Model):
require an explicit trigger.
"""
- def got_skis(skis_from_parent):
- def loop(iterator, item):
- rc_name, skis_to_revoke = item
- if rc_name in ca_map:
- for ca_detail in ca_map[rc_name].issue_response_candidate_ca_details:
- skis_to_revoke.discard(ca_detail.latest_ca_cert.gSKI())
- self.revoke_skis(rpkid, rc_name, skis_to_revoke, iterator, eb)
- ca_map = dict((ca.parent_resource_class, ca) for ca in self.cas.all())
- rpki.async.iterator(skis_from_parent.items(), loop, cb)
- self.get_skis(rpkid, got_skis, eb)
+ skis_from_parent = yield self.get_skis(rpkid)
+ for rc_name, skis_to_revoke in skis_from_parent.iteritems():
+ for ca_detail in CADetail.objects.filter(ca__parent = self).exclude(state = "revoked"):
+ skis_to_revoke.discard(ca_detail.latest_ca_cert.gSKI())
+ yield self.revoke_skis(rpkid, rc_name, skis_to_revoke)
- def destroy(self, rpkid, cb, delete_parent = True):
+ @tornado.gen.coroutine
+ def destroy(self, rpkid, delete_parent = True):
"""
Delete all the CA stuff under this parent, and perhaps the parent
itself.
"""
- def loop(iterator, ca):
- ca.destroy(self, iterator)
- def revoke():
- self.serve_revoke_forgotten(rpkid, done, fail)
- def fail(e):
- logger.warning("Trouble getting parent to revoke certificates, blundering onwards: %s", e)
- done()
- def done():
- if delete_parent:
- self.delete()
- cb()
- rpki.async.iterator(self.cas, loop, revoke)
+ yield [ca.destroy(self) for ca in self.cas()]
+ yield self.serve_revoke_forgotten(rpkid)
+ if delete_parent:
+ self.delete()
def _compose_up_down_query(self, query_type):
@@ -622,12 +628,15 @@ class Parent(models.Model):
sender = self.sender_name, recipient = self.recipient_name, type = query_type)
- def up_down_list_query(self, rpkid, cb, eb):
+ @tornado.gen.coroutine
+ def up_down_list_query(self, rpkid):
q_msg = self._compose_up_down_query("list")
- self.query_up_down(rpkid, q_msg, cb, eb)
+ r_msg = yield self.query_up_down(rpkid, q_msg)
+ raise tornado.gen.Return(r_msg)
- def up_down_issue_query(self, rpkid, ca, ca_detail, cb, eb):
+ @tornado.gen.coroutine
+ def up_down_issue_query(self, rpkid, ca, ca_detail):
logger.debug("Parent.up_down_issue_query(): caRepository %r rpkiManifest %r rpkiNotify %r",
ca.sia_uri, ca_detail.manifest_uri, ca.parent.repository.rrdp_notification_uri)
pkcs10 = rpki.x509.PKCS10.create(
@@ -639,16 +648,19 @@ class Parent(models.Model):
q_msg = self._compose_up_down_query("issue")
q_pdu = SubElement(q_msg, rpki.up_down.tag_request, class_name = ca.parent_resource_class)
q_pdu.text = pkcs10.get_Base64()
- self.query_up_down(rpkid, q_msg, cb, eb)
+ r_msg = yield self.query_up_down(rpkid, q_msg)
+ raise tornado.gen.Return(r_msg)
-
- def up_down_revoke_query(self, rpkid, class_name, ski, cb, eb):
+ @tornado.gen.coroutine
+ def up_down_revoke_query(self, rpkid, class_name, ski):
q_msg = self._compose_up_down_query("revoke")
SubElement(q_msg, rpki.up_down.tag_key, class_name = class_name, ski = ski)
- self.query_up_down(rpkid, q_msg, cb, eb)
+ r_msg = yield self.query_up_down(rpkid, q_msg)
+ raise tornado.gen.Return(r_msg)
- def query_up_down(self, rpkid, q_msg, cb, eb):
+ @tornado.gen.coroutine
+ def query_up_down(self, rpkid, q_msg):
if self.bsc is None:
raise rpki.exceptions.BSCNotFound("Could not find BSC")
@@ -656,37 +668,32 @@ class Parent(models.Model):
if self.bsc.signing_cert is None:
raise rpki.exceptions.BSCNotReady("BSC %r is not yet usable" % self.bsc.bsc_handle)
- q_der = rpki.up_down.cms_msg().wrap(q_msg,
- self.bsc.private_key_id,
- self.bsc.signing_cert,
- self.bsc.signing_cert_crl)
-
- def unwrap(r_der):
- try:
- r_cms = rpki.up_down.cms_msg(DER = r_der)
- r_msg = r_cms.unwrap((rpkid.bpki_ta,
- self.tenant.bpki_cert,
- self.tenant.bpki_glue,
- self.bpki_cert,
- self.bpki_glue))
- r_cms.check_replay_sql(self, self.peer_contact_uri)
- rpki.up_down.check_response(r_msg, q_msg.get("type"))
-
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception, e:
- eb(e)
- else:
- cb(r_msg)
+ q_der = rpki.up_down.cms_msg().wrap(q_msg, self.bsc.private_key_id, self.bsc.signing_cert, self.bsc.signing_cert_crl)
+
+ http_client = tornado.httpclient.AsyncHTTPClient()
+
+ http_request = tornado.httpclient.HTTPRequest(
+ url = self.peer_contact_uri,
+ method = "POST",
+ body = q_der,
+ headers = { "Content-Type" : rpki.up_down.content_type })
+
+ http_response = yield http_client.fetch(http_request)
- logger.debug("query_up_down(): type(q_der) %r", type(q_der)) # XXX
+ # Tornado already checked http_response.code for us
- rpki.http.client(
- msg = q_der,
- url = self.peer_contact_uri,
- callback = unwrap,
- errback = eb,
- content_type = rpki.up_down.content_type)
+ content_type = http_response.headers.get("Content-Type")
+
+ if content_type not in rpki.up_down.allowed_content_types:
+ raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.up_down.content_type, content_type))
+
+ r_der = http_response.body
+ r_cms = rpki.up_down.cms_msg(DER = r_der)
+ r_msg = r_cms.unwrap((rpkid.bpki_ta, self.tenant.bpki_cert, self.tenant.bpki_glue, self.bpki_cert, self.bpki_glue))
+ r_cms.check_replay_sql(self, self.peer_contact_uri)
+ rpki.up_down.check_response(r_msg, q_msg.get("type"))
+
+ raise tornado.gen.Return(r_msg)
def construct_sia_uri(self, rc):
@@ -736,7 +743,8 @@ class CA(models.Model):
#def issue_response_candidate_ca_details(self): return self.ca_details.exclude(state = "revoked")
- def check_for_updates(self, rpkid, parent, rc, cb, eb):
+ @tornado.gen.coroutine
+ def check_for_updates(self, rpkid, parent, rc):
"""
Parent has signaled continued existance of a resource class we
already knew about, so we need to check for an updated
@@ -747,74 +755,86 @@ class CA(models.Model):
logger.debug("check_for_updates()")
sia_uri = parent.construct_sia_uri(rc)
sia_uri_changed = self.sia_uri != sia_uri
+
if sia_uri_changed:
logger.debug("SIA changed: was %s now %s", self.sia_uri, sia_uri)
self.sia_uri = sia_uri
+
class_name = rc.get("class_name")
+
rc_resources = rpki.resource_set.resource_bag(
rc.get("resource_set_as"),
rc.get("resource_set_ipv4"),
rc.get("resource_set_ipv6"),
rc.get("resource_set_notafter"))
+
cert_map = {}
+
for c in rc.getiterator(rpki.up_down.tag_certificate):
x = rpki.x509.X509(Base64 = c.text)
u = rpki.up_down.multi_uri(c.get("cert_url")).rsync()
cert_map[x.gSKI()] = (x, u)
- def loop(iterator, ca_detail):
+
+ ca_details = self.ca_details.exclude(state = "revoked")
+
+ if not ca_details:
+ logger.warning("Existing resource class %s to %s from %s with no certificates, rekeying",
+ class_name, parent.tenant.tenant_handle, parent.parent_handle)
+ yield self.rekey(rpkid)
+ raise tornado.gen.Return
+
+ for ca_detail in ca_details:
+
rc_cert, rc_cert_uri = cert_map.pop(ca_detail.public_key.gSKI(), (None, None))
+
if rc_cert is None:
logger.warning("SKI %s in resource class %s is in database but missing from list_response to %s from %s, "
"maybe parent certificate went away?",
ca_detail.public_key.gSKI(), class_name, parent.tenant.tenant_handle, parent.parent_handle)
publisher = rpki.rpkid.publication_queue(rpkid)
ca_detail.destroy(ca = ca_detail.ca, publisher = publisher)
- return publisher.call_pubd(iterator, eb)
+ yield publisher.call_pubd()
+ continue
+
if ca_detail.state == "active" and ca_detail.ca_cert_uri != rc_cert_uri:
logger.debug("AIA changed: was %s now %s", ca_detail.ca_cert_uri, rc_cert_uri)
ca_detail.ca_cert_uri = rc_cert_uri
ca_detail.save()
+
if ca_detail.state not in ("pending", "active"):
- return iterator()
+ continue
+
if ca_detail.state == "pending":
current_resources = rpki.resource_set.resource_bag()
else:
current_resources = ca_detail.latest_ca_cert.get_3779resources()
+
if (ca_detail.state == "pending" or
sia_uri_changed or
ca_detail.latest_ca_cert != rc_cert or
ca_detail.latest_ca_cert.getNotAfter() != rc_resources.valid_until or
current_resources.undersized(rc_resources) or
current_resources.oversized(rc_resources)):
- return ca_detail.update(
+
+ yield ca_detail.update(
rpkid = rpkid,
parent = parent,
ca = self,
rc = rc,
sia_uri_changed = sia_uri_changed,
- old_resources = current_resources,
- callback = iterator,
- errback = eb)
- iterator()
- def done():
- if cert_map:
- logger.warning("Unknown certificate SKI%s %s in resource class %s in list_response to %s from %s, maybe you want to \"revoke_forgotten\"?",
- "" if len(cert_map) == 1 else "s", ", ".join(cert_map), class_name, parent.tenant.tenant_handle, parent.parent_handle)
- cb()
- ca_details = self.ca_details.exclude(state = "revoked")
- if ca_details:
- rpki.async.iterator(ca_details, loop, done)
- else:
- logger.warning("Existing resource class %s to %s from %s with no certificates, rekeying",
- class_name, parent.tenant.tenant_handle, parent.parent_handle)
- self.rekey(rpkid, cb, eb)
+ old_resources = current_resources)
+
+ if cert_map:
+ logger.warning("Unknown certificate SKI%s %s in resource class %s in list_response to %s from %s, maybe you want to \"revoke_forgotten\"?",
+ "" if len(cert_map) == 1 else "s", ", ".join(cert_map), class_name, parent.tenant.tenant_handle, parent.parent_handle)
# Called from exactly one place, in rpki.rpkid_tasks.PollParentTask.class_loop().
# Might want to refactor.
@classmethod
- def create(cls, rpkid, parent, rc, cb, eb):
+ @tornado.gen.coroutine
+ def create(cls, rpkid, parent, rc):
"""
Parent has signaled existance of a new resource class, so we need
to create and set up a corresponding CA object.
@@ -823,22 +843,26 @@ class CA(models.Model):
self = cls.objects.create(parent = parent,
parent_resource_class = rc.get("class_name"),
sia_uri = parent.construct_sia_uri(rc))
+
ca_detail = CADetail.create(self)
- def done(r_msg):
- c = r_msg[0][0]
- logger.debug("CA %r received certificate %s", self, c.get("cert_url"))
- ca_detail.activate(
- rpkid = rpkid,
- ca = self,
- cert = rpki.x509.X509(Base64 = c.text),
- uri = c.get("cert_url"),
- callback = cb,
- errback = eb)
+
logger.debug("Sending issue request to %r from %r", parent, self.create)
- parent.up_down_issue_query(rpkid = rpkid, ca = self, ca_detail = ca_detail, cb = done, eb = eb)
+ r_msg = yield parent.up_down_issue_query(rpkid = rpkid, ca = self, ca_detail = ca_detail)
+
+ c = r_msg[0][0]
+
+ logger.debug("CA %r received certificate %s", self, c.get("cert_url"))
- def destroy(self, rpkid, parent, callback):
+ yield ca_detail.activate(
+ rpkid = rpkid,
+ ca = self,
+ cert = rpki.x509.X509(Base64 = c.text),
+ uri = c.get("cert_url"))
+
+
+ @tornado.gen.coroutine
+ def destroy(self, rpkid, parent):
"""
The list of current resource classes received from parent does not
include the class corresponding to this CA, so we need to delete
@@ -850,17 +874,20 @@ class CA(models.Model):
CA, then finally delete this CA itself.
"""
- def lose(e):
- logger.exception("Could not delete CA %r, skipping", self)
- callback()
- def done():
- logger.debug("Deleting %r", self)
- self.delete()
- callback()
publisher = rpki.rpkid.publication_queue(rpkid)
+
for ca_detail in self.ca_details.all():
ca_detail.destroy(ca = self, publisher = publisher, allow_failure = True)
- publisher.call_pubd(done, lose)
+
+ try:
+ yield publisher.call_pubd()
+
+ except:
+ logger.exception("Could not delete CA %r, skipping", self)
+
+ else:
+ logger.debug("Deleting %r", self)
+ self.delete()
def next_serial_number(self):
@@ -893,7 +920,8 @@ class CA(models.Model):
return self.last_crl_sn
- def rekey(self, rpkid, cb, eb):
+ @tornado.gen.coroutine
+ def rekey(self, rpkid):
"""
Initiate a rekey operation for this CA. Generate a new keypair.
Request cert from parent using new keypair. Mark result as our
@@ -908,44 +936,46 @@ class CA(models.Model):
new_detail = CADetail.create(ca = self) # sic: class method, not manager function (for now, anyway)
- def done(r_msg):
- c = r_msg[0][0]
- logger.debug("CA %r received certificate %s", self, c.get("cert_url"))
- new_detail.activate(
- rpkid = rpkid,
- ca = self,
- cert = rpki.x509.X509(Base64 = c.text),
- uri = c.get("cert_url"),
- predecessor = old_detail,
- callback = cb,
- errback = eb)
-
logger.debug("Sending issue request to %r from %r", self.parent, self.rekey)
- self.parent.up_down_issue_query(rpkid = rpkid, ca = self, ca_detail = new_detail, cb = done, eb = eb)
+
+ r_msg = yield self.parent.up_down_issue_query(rpkid = rpkid, ca = self, ca_detail = new_detail)
+
+ c = r_msg[0][0]
+
+ logger.debug("CA %r received certificate %s", self, c.get("cert_url"))
+
+ yield new_detail.activate(
+ rpkid = rpkid,
+ ca = self,
+ cert = rpki.x509.X509(Base64 = c.text),
+ uri = c.get("cert_url"),
+ predecessor = old_detail)
- def revoke(self, cb, eb, revoke_all = False):
+ @tornado.gen.coroutine
+  def revoke(self, rpkid, revoke_all = False):
     """
     Revoke deprecated ca_detail objects associated with this CA, or
     all ca_details associated with this CA if revoke_all is set.
     """
-    def loop(iterator, ca_detail):
-      ca_detail.revoke(cb = iterator, eb = eb)
-    rpki.async.iterator(self.ca_details.all() if revoke_all else self.ca_details.filter(state = "deprecated"),
-                        loop, cb)
+    if revoke_all:
+      ca_details = self.ca_details.all()
+    else:
+      ca_details = self.ca_details.filter(state = "deprecated")
+
+    yield [ca_detail.revoke(rpkid) for ca_detail in ca_details]
- def reissue(self, cb, eb):
+ @tornado.gen.coroutine
+  def reissue(self, rpkid):
     """
     Reissue all current certificates issued by this CA.
     """
     ca_detail = self.ca_details.get(state = "active")
     if ca_detail:
-      ca_detail.reissue(cb, eb)
-    else:
-      cb()
+      yield ca_detail.reissue(rpkid)
class CADetail(models.Model):
@@ -1014,7 +1044,8 @@ class CADetail(models.Model):
return target.asn <= me.asn and target.v4 <= me.v4 and target.v6 <= me.v6
- def activate(self, rpkid, ca, cert, uri, callback, errback, predecessor = None):
+ @tornado.gen.coroutine
+ def activate(self, rpkid, ca, cert, uri, predecessor = None):
"""
Activate this ca_detail.
"""
@@ -1027,6 +1058,7 @@ class CADetail(models.Model):
self.generate_crl(publisher = publisher)
self.generate_manifest(publisher = publisher)
self.save()
+
if predecessor is not None:
predecessor.state = "deprecated"
predecessor.save()
@@ -1038,7 +1070,8 @@ class CADetail(models.Model):
ghostbuster.regenerate(publisher = publisher)
predecessor.generate_crl(publisher = publisher)
predecessor.generate_manifest(publisher = publisher)
- publisher.call_pubd(callback, errback)
+
+ yield publisher.call_pubd()
def destroy(self, ca, publisher, allow_failure = False):
@@ -1068,7 +1101,9 @@ class CADetail(models.Model):
logger.debug("Deleting %r", self)
self.delete()
- def revoke(self, rpkid, cb, eb):
+
+ @tornado.gen.coroutine
+ def revoke(self, rpkid):
"""
Request revocation of all certificates whose SKI matches the key
for this ca_detail.
@@ -1091,84 +1126,109 @@ class CADetail(models.Model):
time has passed.
"""
- ca = self.ca
- parent = ca.parent
- class_name = ca.parent_resource_class
gski = self.latest_ca_cert.gSKI()
- def parent_revoked(r_msg):
- if r_msg[0].get("class_name") != class_name:
- raise rpki.exceptions.ResourceClassMismatch
- if r_msg[0].get("ski") != gski:
- raise rpki.exceptions.SKIMismatch
- logger.debug("Parent revoked %s, starting cleanup", gski)
- crl_interval = rpki.sundial.timedelta(seconds = parent.tenant.crl_interval)
- nextUpdate = rpki.sundial.now()
- if self.latest_manifest is not None:
- self.latest_manifest.extract_if_needed()
- nextUpdate = nextUpdate.later(self.latest_manifest.getNextUpdate())
- if self.latest_crl is not None:
- nextUpdate = nextUpdate.later(self.latest_crl.getNextUpdate())
- publisher = rpki.rpkid.publication_queue(rpkid)
- for child_cert in self.child_certs.all():
- nextUpdate = nextUpdate.later(child_cert.cert.getNotAfter())
- child_cert.revoke(publisher = publisher)
- for roa in self.roas.all():
- nextUpdate = nextUpdate.later(roa.cert.getNotAfter())
- roa.revoke(publisher = publisher)
- for ghostbuster in self.ghostbusters.all():
- nextUpdate = nextUpdate.later(ghostbuster.cert.getNotAfter())
- ghostbuster.revoke(publisher = publisher)
- nextUpdate += crl_interval
- self.generate_crl(publisher = publisher, nextUpdate = nextUpdate)
- self.generate_manifest(publisher = publisher, nextUpdate = nextUpdate)
- self.private_key_id = None
- self.manifest_private_key_id = None
- self.manifest_public_key = None
- self.latest_manifest_cert = None
- self.state = "revoked"
- self.save()
- publisher.call_pubd(cb, eb)
logger.debug("Asking parent to revoke CA certificate %s", gski)
- parent.up_down_revoke_query(rpkid = rpkid, class_name = class_name, ski = gski, cb = parent_revoked, eb = eb)
+ r_msg = yield self.ca.parent.up_down_revoke_query(rpkid = rpkid, class_name = self.ca.parent_resource_class, ski = gski)
+
+ if r_msg[0].get("class_name") != self.ca.parent_resource_class:
+ raise rpki.exceptions.ResourceClassMismatch
+
+ if r_msg[0].get("ski") != gski:
+ raise rpki.exceptions.SKIMismatch
+
+ logger.debug("Parent revoked %s, starting cleanup", gski)
+
+ crl_interval = rpki.sundial.timedelta(seconds = self.ca.parent.tenant.crl_interval)
+
+ nextUpdate = rpki.sundial.now()
+
+ if self.latest_manifest is not None:
+ self.latest_manifest.extract_if_needed()
+ nextUpdate = nextUpdate.later(self.latest_manifest.getNextUpdate())
- def update(self, rpkid, parent, ca, rc, sia_uri_changed, old_resources, callback, errback):
+ if self.latest_crl is not None:
+ nextUpdate = nextUpdate.later(self.latest_crl.getNextUpdate())
+
+ publisher = rpki.rpkid.publication_queue(rpkid)
+
+ for child_cert in self.child_certs.all():
+ nextUpdate = nextUpdate.later(child_cert.cert.getNotAfter())
+ child_cert.revoke(publisher = publisher)
+
+ for roa in self.roas.all():
+ nextUpdate = nextUpdate.later(roa.cert.getNotAfter())
+ roa.revoke(publisher = publisher)
+
+ for ghostbuster in self.ghostbusters.all():
+ nextUpdate = nextUpdate.later(ghostbuster.cert.getNotAfter())
+ ghostbuster.revoke(publisher = publisher)
+
+ nextUpdate += crl_interval
+
+ self.generate_crl(publisher = publisher, nextUpdate = nextUpdate)
+ self.generate_manifest(publisher = publisher, nextUpdate = nextUpdate)
+ self.private_key_id = None
+ self.manifest_private_key_id = None
+ self.manifest_public_key = None
+ self.latest_manifest_cert = None
+ self.state = "revoked"
+ self.save()
+
+ yield publisher.call_pubd()
+
+
+ @tornado.gen.coroutine
+ def update(self, rpkid, parent, ca, rc, sia_uri_changed, old_resources):
"""
Need to get a new certificate for this ca_detail and perhaps frob
children of this ca_detail.
"""
- def issued(r_msg):
- c = r_msg[0][0]
- cert = rpki.x509.X509(Base64 = c.text)
- cert_url = c.get("cert_url")
- logger.debug("CA %r received certificate %s", self, cert_url)
- if self.state == "pending":
- return self.activate(rpkid = rpkid, ca = ca, cert = cert, uri = cert_url, callback = callback, errback = errback)
- validity_changed = self.latest_ca_cert is None or self.latest_ca_cert.getNotAfter() != cert.getNotAfter()
- publisher = rpki.rpkid.publication_queue(rpkid)
- if self.latest_ca_cert != cert:
- self.latest_ca_cert = cert
- self.save()
- self.generate_manifest_cert()
- self.generate_crl(publisher = publisher)
- self.generate_manifest(publisher = publisher)
- new_resources = self.latest_ca_cert.get_3779resources()
- if sia_uri_changed or old_resources.oversized(new_resources):
- for child_cert in self.child_certs.all():
- child_resources = child_cert.cert.get_3779resources()
- if sia_uri_changed or child_resources.oversized(new_resources):
- child_cert.reissue(ca_detail = self, resources = child_resources & new_resources, publisher = publisher)
- if sia_uri_changed or validity_changed or old_resources.oversized(new_resources):
- for roa in self.roas.all():
- roa.update(publisher = publisher, fast = True)
- if sia_uri_changed or validity_changed:
- for ghostbuster in self.ghostbusters.all():
- ghostbuster.update(publisher = publisher, fast = True)
- publisher.call_pubd(callback, errback)
logger.debug("Sending issue request to %r from %r", parent, self.update)
- parent.up_down_issue_query(rpkid = rpkid, ca = ca, ca_detail = self, cb = issued, eb = errback)
+
+ r_msg = yield parent.up_down_issue_query(rpkid = rpkid, ca = ca, ca_detail = self)
+
+ c = r_msg[0][0]
+
+ cert = rpki.x509.X509(Base64 = c.text)
+ cert_url = c.get("cert_url")
+
+ logger.debug("CA %r received certificate %s", self, cert_url)
+
+ if self.state == "pending":
+ yield self.activate(rpkid = rpkid, ca = ca, cert = cert, uri = cert_url)
+ raise tornado.gen.Return
+
+ validity_changed = self.latest_ca_cert is None or self.latest_ca_cert.getNotAfter() != cert.getNotAfter()
+
+ publisher = rpki.rpkid.publication_queue(rpkid)
+
+ if self.latest_ca_cert != cert:
+ self.latest_ca_cert = cert
+ self.save()
+ self.generate_manifest_cert()
+ self.generate_crl(publisher = publisher)
+ self.generate_manifest(publisher = publisher)
+
+ new_resources = self.latest_ca_cert.get_3779resources()
+
+ if sia_uri_changed or old_resources.oversized(new_resources):
+ for child_cert in self.child_certs.all():
+ child_resources = child_cert.cert.get_3779resources()
+ if sia_uri_changed or child_resources.oversized(new_resources):
+ child_cert.reissue(ca_detail = self, resources = child_resources & new_resources, publisher = publisher)
+
+ if sia_uri_changed or validity_changed or old_resources.oversized(new_resources):
+ for roa in self.roas.all():
+ roa.update(publisher = publisher, fast = True)
+
+ if sia_uri_changed or validity_changed:
+ for ghostbuster in self.ghostbusters.all():
+ ghostbuster.update(publisher = publisher, fast = True)
+
+ yield publisher.call_pubd()
@classmethod
@@ -1179,9 +1239,13 @@ class CADetail(models.Model):
cer_keypair = rpki.x509.RSA.generate()
mft_keypair = rpki.x509.RSA.generate()
- return cls.objects.create(ca = ca, state = "pending",
- private_key_id = cer_keypair, public_key = cer_keypair.get_public(),
- manifest_private_key_id = mft_keypair, manifest_public_key = mft_keypair.get_public())
+ return cls.objects.create(
+ ca = ca,
+ state = "pending",
+ private_key_id = cer_keypair,
+ public_key = cer_keypair.get_public(),
+ manifest_private_key_id = mft_keypair,
+ manifest_public_key = mft_keypair.get_public())
def issue_ee(self, ca, resources, subject_key, sia,
@@ -1361,7 +1425,8 @@ class CADetail(models.Model):
self.save()
- def reissue(self, rpkid, cb, eb):
+ @tornado.gen.coroutine
+ def reissue(self, rpkid):
"""
Reissue all current certificates issued by this ca_detail.
"""
@@ -1381,7 +1446,7 @@ class CADetail(models.Model):
self.generate_crl(publisher = publisher)
self.generate_manifest(publisher = publisher)
self.save()
- publisher.call_pubd(cb, eb)
+ yield publisher.call_pubd()
def check_failed_publication(self, publisher, check_all = True):
@@ -1476,27 +1541,27 @@ class Child(models.Model):
elements = ("bpki_cert", "bpki_glue"))
- def xml_pre_delete_hook(self, rpkid, cb, eb):
+ @tornado.gen.coroutine
+ def xml_pre_delete_hook(self, rpkid):
publisher = rpki.rpkid.publication_queue(rpkid)
for child_cert in self.child_certs.all():
child_cert.revoke(publisher = publisher, generate_crl_and_manifest = True)
- publisher.call_pubd(cb, eb)
+ yield publisher.call_pubd()
- def xml_post_save_hook(self, rpkid, q_pdu, cb, eb):
+ @tornado.gen.coroutine
+ def xml_post_save_hook(self, rpkid, q_pdu):
if q_pdu.get("clear_replay_protection"):
self.clear_replay_protection()
if q_pdu.get("reissue"):
- self.serve_reissue(rpkid, cb, eb)
- else:
- cb()
+ yield self.serve_reissue(rpkid)
- def serve_reissue(self, rpkid, cb, eb):
+ def serve_reissue(self, rpkid):
publisher = rpki.rpkid.publication_queue(rpkid)
for child_cert in self.child_certs.all():
child_cert.reissue(child_cert.ca_detail, publisher, force = True)
- publisher.call_pubd(cb, eb)
+ yield publisher.call_pubd()
def clear_replay_protection(self):
@@ -1504,82 +1569,40 @@ class Child(models.Model):
self.save()
- def up_down_handle_list(self, rpkid, q_msg, r_msg, callback, errback):
- def got_resources(irdb_resources):
- if irdb_resources.valid_until < rpki.sundial.now():
- logger.debug("Child %s's resources expired %s", self.child_handle, irdb_resources.valid_until)
- else:
- for ca_detail in CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"):
- resources = ca_detail.latest_ca_cert.get_3779resources() & irdb_resources
- if resources.empty():
- logger.debug("No overlap between received resources and what child %s should get ([%s], [%s])",
- self.child_handle, ca_detail.latest_ca_cert.get_3779resources(), irdb_resources)
- continue
- rc = SubElement(r_msg, rpki.up_down.tag_class,
- class_name = ca_detail.ca.parent_resource_class,
- cert_url = ca_detail.ca_cert_uri,
- resource_set_as = str(resources.asn),
- resource_set_ipv4 = str(resources.v4),
- resource_set_ipv6 = str(resources.v6),
- resource_set_notafter = str(resources.valid_until))
- for child_cert in self.child_certs.filter(ca_detail = ca_detail):
- c = SubElement(rc, rpki.up_down.tag_certificate, cert_url = child_cert.uri)
- c.text = child_cert.cert.get_Base64()
- SubElement(rc, rpki.up_down.tag_issuer).text = ca_detail.latest_ca_cert.get_Base64()
- callback()
- rpkid.irdb_query_child_resources(self.tenant.tenant_handle, self.child_handle, got_resources, errback)
-
-
- def up_down_handle_issue(self, rpkid, q_msg, r_msg, callback, errback):
-
- def got_resources(irdb_resources):
-
- def done():
+ @tornado.gen.coroutine
+ def up_down_handle_list(self, rpkid, q_msg, r_msg):
+
+ irdb_resources = yield rpkid.irdb_query_child_resources(self.tenant.tenant_handle, self.child_handle)
+
+ if irdb_resources.valid_until < rpki.sundial.now():
+ logger.debug("Child %s's resources expired %s", self.child_handle, irdb_resources.valid_until)
+
+ else:
+
+ for ca_detail in CADetail.objects.filter(ca__parent__tenant = self.tenant, state = "active"):
+ resources = ca_detail.latest_ca_cert.get_3779resources() & irdb_resources
+
+ if resources.empty():
+ logger.debug("No overlap between received resources and what child %s should get ([%s], [%s])",
+ self.child_handle, ca_detail.latest_ca_cert.get_3779resources(), irdb_resources)
+ continue
+
rc = SubElement(r_msg, rpki.up_down.tag_class,
- class_name = class_name,
+ class_name = ca_detail.ca.parent_resource_class,
cert_url = ca_detail.ca_cert_uri,
resource_set_as = str(resources.asn),
resource_set_ipv4 = str(resources.v4),
resource_set_ipv6 = str(resources.v6),
resource_set_notafter = str(resources.valid_until))
- c = SubElement(rc, rpki.up_down.tag_certificate, cert_url = child_cert.uri)
- c.text = child_cert.cert.get_Base64()
- SubElement(rc, rpki.up_down.tag_issuer).text = ca_detail.latest_ca_cert.get_Base64()
- callback()
-
- if irdb_resources.valid_until < rpki.sundial.now():
- raise rpki.exceptions.IRDBExpired("IRDB entry for child %s expired %s" % (
- self.child_handle, irdb_resources.valid_until))
-
- resources = irdb_resources & ca_detail.latest_ca_cert.get_3779resources()
- resources.valid_until = irdb_resources.valid_until
- req_key = pkcs10.getPublicKey()
- req_sia = pkcs10.get_SIA()
-
- # Generate new cert or regenerate old one if necessary
- publisher = rpki.rpkid.publication_queue(rpkid)
-
- try:
- child_cert = self.child_certs.get(ca_detail = ca_detail, ski = req_key.get_SKI())
-
- except ChildCert.DoesNotExist:
- child_cert = ca_detail.issue(
- ca = ca_detail.ca,
- child = self,
- subject_key = req_key,
- sia = req_sia,
- resources = resources,
- publisher = publisher)
+ for child_cert in self.child_certs.filter(ca_detail = ca_detail):
+ c = SubElement(rc, rpki.up_down.tag_certificate, cert_url = child_cert.uri)
+ c.text = child_cert.cert.get_Base64()
+ SubElement(rc, rpki.up_down.tag_issuer).text = ca_detail.latest_ca_cert.get_Base64()
- else:
- child_cert = child_cert.reissue(
- ca_detail = ca_detail,
- sia = req_sia,
- resources = resources,
- publisher = publisher)
- publisher.call_pubd(done, errback)
+ @tornado.gen.coroutine
+ def up_down_handle_issue(self, rpkid, q_msg, r_msg):
req = q_msg[0]
assert req.tag == rpki.up_down.tag_request
@@ -1591,24 +1614,60 @@ class Child(models.Model):
class_name = req.get("class_name")
pkcs10 = rpki.x509.PKCS10(Base64 = req.text)
+ pkcs10.check_valid_request_ca()
+ ca_detail = CADetail.objects.get(ca__parent__tenant = self.tenant, state = "active",
+ ca__parent_resource_class = class_name)
- # XXX
- logger.debug("Child.up_down_handle_issue(): PKCS #10 %s", pkcs10.get_Base64())
- sia = pkcs10.get_SIA()
- logger.debug("Child.up_down_handle_issue(): PKCS #10 SIA %r (%r, %r, %r, %r) %r",
- type(sia), type(sia[0]), type(sia[1]), type(sia[2]), type(sia[3]), sia)
+ irdb_resources = yield rpkid.irdb_query_child_resources(self.tenant.tenant_handle, self.child_handle)
- pkcs10.check_valid_request_ca()
- ca_detail = CADetail.objects.get(ca__parent__tenant = self.tenant,
- ca__parent_resource_class = class_name,
- state = "active")
- rpkid.irdb_query_child_resources(self.tenant.tenant_handle, self.child_handle, got_resources, errback)
+ if irdb_resources.valid_until < rpki.sundial.now():
+ raise rpki.exceptions.IRDBExpired("IRDB entry for child %s expired %s" % (
+ self.child_handle, irdb_resources.valid_until))
+
+ resources = irdb_resources & ca_detail.latest_ca_cert.get_3779resources()
+ resources.valid_until = irdb_resources.valid_until
+ req_key = pkcs10.getPublicKey()
+ req_sia = pkcs10.get_SIA()
+
+ # Generate new cert or regenerate old one if necessary
+
+ publisher = rpki.rpkid.publication_queue(rpkid)
+
+ try:
+ child_cert = self.child_certs.get(ca_detail = ca_detail, ski = req_key.get_SKI())
+ except ChildCert.DoesNotExist:
+ child_cert = ca_detail.issue(
+ ca = ca_detail.ca,
+ child = self,
+ subject_key = req_key,
+ sia = req_sia,
+ resources = resources,
+ publisher = publisher)
- def up_down_handle_revoke(self, rpkid, q_msg, r_msg, callback, errback):
- def done():
- SubElement(r_msg, key.tag, class_name = class_name, ski = key.get("ski"))
- callback()
+ else:
+ child_cert = child_cert.reissue(
+ ca_detail = ca_detail,
+ sia = req_sia,
+ resources = resources,
+ publisher = publisher)
+
+ yield publisher.call_pubd()
+
+ rc = SubElement(r_msg, rpki.up_down.tag_class,
+ class_name = class_name,
+ cert_url = ca_detail.ca_cert_uri,
+ resource_set_as = str(resources.asn),
+ resource_set_ipv4 = str(resources.v4),
+ resource_set_ipv6 = str(resources.v6),
+ resource_set_notafter = str(resources.valid_until))
+ c = SubElement(rc, rpki.up_down.tag_certificate, cert_url = child_cert.uri)
+ c.text = child_cert.cert.get_Base64()
+ SubElement(rc, rpki.up_down.tag_issuer).text = ca_detail.latest_ca_cert.get_Base64()
+
+
+ @tornado.gen.coroutine
+ def up_down_handle_revoke(self, rpkid, q_msg, r_msg):
key = q_msg[0]
assert key.tag == rpki.up_down.tag_key
class_name = key.get("class_name")
@@ -1618,37 +1677,27 @@ class Child(models.Model):
ca_detail__ca__parent_resource_class = class_name,
ski = ski):
child_cert.revoke(publisher = publisher)
- publisher.call_pubd(done, errback)
+ yield publisher.call_pubd()
+ SubElement(r_msg, key.tag, class_name = class_name, ski = key.get("ski"))
- def serve_up_down(self, rpkid, q_der, callback):
+ @tornado.gen.coroutine
+ def serve_up_down(self, rpkid, q_der):
"""
Outer layer of server handling for one up-down PDU from this child.
"""
- def done():
- callback(rpki.up_down.cms_msg().wrap(r_msg,
- self.bsc.private_key_id,
- self.bsc.signing_cert,
- self.bsc.signing_cert_crl))
-
- def lose(e):
- logger.exception("Unhandled exception serving child %r", self)
- rpki.up_down.generate_error_response_from_exception(r_msg, e, q_type)
- done()
-
if self.bsc is None:
raise rpki.exceptions.BSCNotFound("Could not find BSC")
+
q_cms = rpki.up_down.cms_msg(DER = q_der)
- q_msg = q_cms.unwrap((rpkid.bpki_ta,
- self.tenant.bpki_cert,
- self.tenant.bpki_glue,
- self.bpki_cert,
- self.bpki_glue))
+ q_msg = q_cms.unwrap((rpkid.bpki_ta, self.tenant.bpki_cert, self.tenant.bpki_glue, self.bpki_cert, self.bpki_glue))
q_cms.check_replay_sql(self, "child", self.child_handle)
q_type = q_msg.get("type")
+
logger.info("Serving %s query from child %s [sender %s, recipient %s]",
q_type, self.child_handle, q_msg.get("sender"), q_msg.get("recipient"))
+
if rpki.up_down.enforce_strict_up_down_xml_sender and q_msg.get("sender") != self.child_handle:
raise rpki.exceptions.BadSender("Unexpected XML sender %s" % q_msg.get("sender"))
@@ -1656,12 +1705,14 @@ class Child(models.Model):
sender = q_msg.get("recipient"), recipient = q_msg.get("sender"), type = q_type + "_response")
try:
- getattr(self, "up_down_handle_" + q_type)(rpkid, q_msg, r_msg, done, lose)
- except (rpki.async.ExitNow, SystemExit):
- raise
+ yield getattr(self, "up_down_handle_" + q_type)(rpkid, q_msg, r_msg)
+
except Exception, e:
- lose(e)
+ logger.exception("Unhandled exception serving child %r", self)
+ rpki.up_down.generate_error_response_from_exception(r_msg, e, q_type)
+ r_der = rpki.up_down.cms_msg().wrap(r_msg, self.bsc.private_key_id, self.bsc.signing_cert, self.bsc.signing_cert_crl)
+ raise tornado.gen.Return(r_der)
class ChildCert(models.Model):
cert = CertificateField()
@@ -1840,8 +1891,7 @@ class EECertificate(models.Model):
cn = cn,
sn = sn,
eku = eku)
- self = cls(ca_detail = ca_detail, cert = cert, ski = subject_key.get_SKI())
- self.tenant = ca_detail.ca.parent.tenant
+ self = cls(tenant = ca_detail.ca.parent.tenant, ca_detail = ca_detail, cert = cert, ski = subject_key.get_SKI())
publisher.queue(
uri = self.uri,
new_obj = self.cert,
diff --git a/rpki/up_down.py b/rpki/up_down.py
index 1303647e..90965e45 100644
--- a/rpki/up_down.py
+++ b/rpki/up_down.py
@@ -38,8 +38,8 @@ version = "1"
## @var content_type
# MIME content type to use when sending up-down queries.
-#content_type = "application/rpki-updown"
-content_type = "application/x-rpki"
+content_type = "application/rpki-updown"
+#content_type = "application/x-rpki"
## @var allowed_content_types
# MIME content types which we consider acceptable for incoming up-down
diff --git a/rpki/x509.py b/rpki/x509.py
index 1be2f9a3..16981d06 100644
--- a/rpki/x509.py
+++ b/rpki/x509.py
@@ -47,7 +47,6 @@ import rpki.resource_set
import rpki.oids
import rpki.sundial
import rpki.log
-import rpki.async
import rpki.relaxng
logger = logging.getLogger(__name__)
@@ -1510,9 +1509,7 @@ class CMS_object(DER_object):
try:
cms = self.get_POW()
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception:
+ except:
if self.print_on_der_error:
logger.debug("Problem parsing DER CMS message, might not really be DER: %r",
self.get_DER())
@@ -1593,9 +1590,7 @@ class CMS_object(DER_object):
try:
content = cms.verify(store)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception:
+ except:
if self.dump_on_verify_failure:
if self.dump_using_dumpasn1:
dbg = self.dumpasn1()
@@ -1623,9 +1618,7 @@ class CMS_object(DER_object):
try:
cms = self.get_POW()
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception:
+ except:
raise rpki.exceptions.UnparsableCMSDER
if cms.eContentType() != self.econtent_oid: