# $Id$
#
# Copyright (C) 2013--2014 Dragon Research Labs ("DRL")
# Portions copyright (C) 2009--2012 Internet Systems Consortium ("ISC")
# Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN")
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notices and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND DRL, ISC, AND ARIN DISCLAIM ALL
# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL,
# ISC, OR ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""
RPKI CA engine.
"""
import os
import re
import time
import random
import logging
import argparse
import rpki.resource_set
import rpki.up_down
import rpki.left_right
import rpki.x509
import rpki.http
import rpki.config
import rpki.exceptions
import rpki.relaxng
import rpki.sundial
import rpki.publication
import rpki.log
import rpki.async
import rpki.daemonize
import rpki.rpkid_tasks
from lxml.etree import Element, SubElement, tostring as ElementToString
logger = logging.getLogger(__name__)
class main(object):
"""
Main program for rpkid.
"""
def __init__(self):
os.environ.update(TZ = "UTC",
DJANGO_SETTINGS_MODULE = "rpki.django_settings.rpkid")
time.tzset()
self.irdbd_cms_timestamp = None
self.irbe_cms_timestamp = None
self.task_current = None
self.task_queue = []
parser = argparse.ArgumentParser(description = __doc__)
parser.add_argument("-c", "--config",
help = "override default location of configuration file")
parser.add_argument("-f", "--foreground", action = "store_true",
help = "do not daemonize")
parser.add_argument("--pidfile",
help = "override default location of pid file")
parser.add_argument("--profile",
help = "enable profiling, saving data to PROFILE")
rpki.log.argparse_setup(parser)
args = parser.parse_args()
self.profile = args.profile
rpki.log.init("rpkid", args)
self.cfg = rpki.config.parser(set_filename = args.config, section = "rpkid")
self.cfg.set_global_flags()
if not args.foreground:
rpki.daemonize.daemon(pidfile = args.pidfile)
if self.profile:
import cProfile
prof = cProfile.Profile()
try:
prof.runcall(self.main)
finally:
prof.dump_stats(self.profile)
logger.info("Dumped profile data to %s", self.profile)
else:
self.main()
def main(self):
startup_msg = self.cfg.get("startup-message", "")
if startup_msg:
logger.info(startup_msg)
if self.profile:
logger.info("Running in profile mode with output to %s", self.profile)
logger.debug("Initializing Django")
import django
django.setup()
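# rpki.rpkidb can only be imported after django.setup() has run: its
# models need Django's app registry, configured via the
# DJANGO_SETTINGS_MODULE value we set in __init__().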
logger.debug("Initializing rpkidb...")
global rpki # pylint: disable=W0602
import rpki.rpkidb # pylint: disable=W0621
logger.debug("Initializing rpkidb...done")
self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta"))
self.irdb_cert = rpki.x509.X509(Auto_update = self.cfg.get("irdb-cert"))
self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert"))
self.rpkid_cert = rpki.x509.X509(Auto_update = self.cfg.get("rpkid-cert"))
self.rpkid_key = rpki.x509.RSA( Auto_update = self.cfg.get("rpkid-key"))
self.irdb_url = self.cfg.get("irdb-url")
self.http_server_host = self.cfg.get("server-host", "")
self.http_server_port = self.cfg.getint("server-port")
self.publication_kludge_base = self.cfg.get("publication-kludge-base", "publication/")
self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True)
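# The first cron pass is delayed by a random interval, presumably to
# stagger startup when several instances come up at about the same time.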
self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10),
self.cfg.getint("initial-delay-max", 120))
# Should be much longer in production
self.cron_period = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-period", 120))
self.cron_keepalive = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-keepalive", 0))
if not self.cron_keepalive:
self.cron_keepalive = self.cron_period * 4
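# cron_timeout doubles as a lock flag and a keepalive deadline:
# checkpoint() pushes the deadline forward while a cron run is active,
# and cron() clears it (or breaks a stale lock) when appropriate.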
self.cron_timeout = None
self.start_cron()
rpki.http.server(
host = self.http_server_host,
port = self.http_server_port,
handlers = (("/left-right", self.left_right_handler),
("/up-down/", self.up_down_handler, rpki.up_down.allowed_content_types),
("/cronjob", self.cronjob_handler)))
def start_cron(self):
"""
Start the clock for rpkid's internal cron process.
"""
if self.use_internal_cron:
self.cron_timer = rpki.async.timer(handler = self.cron)
when = rpki.sundial.now() + rpki.sundial.timedelta(seconds = self.initial_delay)
logger.debug("Scheduling initial cron pass at %s", when)
self.cron_timer.set(when)
else:
logger.debug("Not using internal clock, start_cron() call ignored")
@staticmethod
def _compose_left_right_query():
"""
Compose top level element of a left-right query to irdbd.
"""
return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap,
type = "query", version = rpki.left_right.version)
def irdb_query(self, q_msg, callback, errback):
"""
Perform an IRDB callback query.
"""
try:
q_tags = set(q_pdu.tag for q_pdu in q_msg)
q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert)
def unwrap(r_der):
try:
r_cms = rpki.left_right.cms_msg(DER = r_der)
r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert))
self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url)
#rpki.left_right.check_response(r_msg)
if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg):
raise rpki.exceptions.BadIRDBReply(
"Unexpected response to IRDB query: %s" % r_cms.pretty_print_content())
callback(r_msg)
except Exception as e:
errback(e)
rpki.http.client(
url = self.irdb_url,
msg = q_der,
callback = unwrap,
errback = errback)
except Exception as e:
errback(e)
def irdb_query_child_resources(self, tenant_handle, child_handle, callback, errback):
"""
Ask IRDB about a child's resources.
"""
q_msg = self._compose_left_right_query()
SubElement(q_msg, rpki.left_right.tag_list_resources,
tenant_handle = tenant_handle, child_handle = child_handle)
def done(r_msg):
if len(r_msg) != 1:
raise rpki.exceptions.BadIRDBReply(
"Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content())
callback(rpki.resource_set.resource_bag(
asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")),
v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")),
v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")),
valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until"))))
self.irdb_query(q_msg, done, errback)
def irdb_query_roa_requests(self, tenant_handle, callback, errback):
"""
Ask IRDB about a tenant's ROA requests.
"""
q_msg = self._compose_left_right_query()
SubElement(q_msg, rpki.left_right.tag_list_roa_requests, tenant_handle = tenant_handle)
self.irdb_query(q_msg, callback, errback)
def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles, callback, errback):
"""
Ask IRDB about a tenant's ghostbuster record requests.
"""
q_msg = self._compose_left_right_query()
for parent_handle in parent_handles:
SubElement(q_msg, rpki.left_right.tag_list_ghostbuster_requests,
tenant_handle = tenant_handle, parent_handle = parent_handle)
self.irdb_query(q_msg, callback, errback)
def irdb_query_ee_certificate_requests(self, tenant_handle, callback, errback):
"""
Ask IRDB about a tenant's EE certificate requests.
"""
q_msg = self._compose_left_right_query()
SubElement(q_msg, rpki.left_right.tag_list_ee_certificate_requests, tenant_handle = tenant_handle)
self.irdb_query(q_msg, callback, errback)
@property
def left_right_models(self):
"""
Map element tag to rpkidb model.
"""
try:
return self._left_right_models
except AttributeError:
import rpki.rpkidb.models # pylint: disable=W0621
self._left_right_models = {
rpki.left_right.tag_tenant : rpki.rpkidb.models.Tenant,
rpki.left_right.tag_bsc : rpki.rpkidb.models.BSC,
rpki.left_right.tag_parent : rpki.rpkidb.models.Parent,
rpki.left_right.tag_child : rpki.rpkidb.models.Child,
rpki.left_right.tag_repository : rpki.rpkidb.models.Repository }
return self._left_right_models
@property
def left_right_trivial_handlers(self):
"""
Map element tag to bound handler methods for trivial PDU types.
"""
try:
return self._left_right_trivial_handlers
except AttributeError:
self._left_right_trivial_handlers = {
rpki.left_right.tag_list_published_objects : self.handle_list_published_objects,
rpki.left_right.tag_list_received_resources : self.handle_list_received_resources }
return self._left_right_trivial_handlers
def handle_list_published_objects(self, q_pdu, r_msg):
"""
Serve one left-right list_published_objects query.
"""
tenant_handle = q_pdu.get("tenant_handle")
msg_tag = q_pdu.get("tag")
kw = dict(tenant_handle = tenant_handle)
if msg_tag is not None:
kw.update(tag = msg_tag)
for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle, state = "active"):
SubElement(r_msg, rpki.left_right.tag_list_published_objects,
uri = ca_detail.crl_uri, **kw).text = ca_detail.latest_crl.get_Base64()
SubElement(r_msg, rpki.left_right.tag_list_published_objects,
uri = ca_detail.manifest_uri, **kw).text = ca_detail.latest_manifest.get_Base64()
for c in ca_detail.child_certs.all():
SubElement(r_msg, rpki.left_right.tag_list_published_objects,
uri = c.uri, child_handle = c.child.child_handle, **kw).text = c.cert.get_Base64()
for r in ca_detail.roas.filter(roa__isnull = False):
SubElement(r_msg, rpki.left_right.tag_list_published_objects,
uri = r.uri, **kw).text = r.roa.get_Base64()
for g in ca_detail.ghostbusters.all():
SubElement(r_msg, rpki.left_right.tag_list_published_objects,
uri = g.uri, **kw).text = g.ghostbuster.get_Base64()
for c in ca_detail.ee_certificates.all():
SubElement(r_msg, rpki.left_right.tag_list_published_objects,
uri = c.uri, **kw).text = c.cert.get_Base64()
def handle_list_received_resources(self, q_pdu, r_msg):
"""
Serve one left-right list_received_resources query.
"""
logger.debug(".handle_list_received_resources() %s", ElementToString(q_pdu))
tenant_handle = q_pdu.get("tenant_handle")
msg_tag = q_pdu.get("tag")
for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle,
state = "active", latest_ca_cert__isnull = False):
cert = ca_detail.latest_ca_cert
resources = cert.get_3779resources()
r_pdu = SubElement(r_msg, rpki.left_right.tag_list_received_resources,
tenant_handle = tenant_handle,
parent_handle = ca_detail.ca.parent.parent_handle,
uri = ca_detail.ca_cert_uri,
notBefore = str(cert.getNotBefore()),
notAfter = str(cert.getNotAfter()),
sia_uri = cert.get_sia_directory_uri(),
aia_uri = cert.get_aia_uri(),
asn = str(resources.asn),
ipv4 = str(resources.v4),
ipv6 = str(resources.v6))
if msg_tag is not None:
r_pdu.set("tag", msg_tag)
def left_right_handler(self, query, path, cb):
"""
Process one left-right PDU.
"""
# This handles five persistent classes (tenant, bsc, parent, child,
# repository) and two simple queries (list_published_objects and
# list_received_resources). The former probably need to dispatch
# via methods to the corresponding model classes; the latter
# probably just become calls to ordinary methods of this
# (rpki.rpkid.main) class.
#
# Need to clone logic from rpki.pubd.main.control_handler().
logger.debug("Entering left_right_handler()")
try:
q_cms = rpki.left_right.cms_msg(DER = query)
q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert))
r_msg = Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap,
type = "reply", version = rpki.left_right.version)
self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, path)
assert q_msg.tag.startswith(rpki.left_right.xmlns)
assert all(q_pdu.tag.startswith(rpki.left_right.xmlns) for q_pdu in q_msg)
if q_msg.get("version") != rpki.left_right.version:
raise rpki.exceptions.BadQuery("Unrecognized protocol version")
if q_msg.get("type") != "query":
raise rpki.exceptions.BadQuery("Message type is not query")
def done():
cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert))
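# Query PDUs are processed one at a time via rpki.async.iterator().
# fail() converts an exception into a <report_error/> PDU and returns
# the reply immediately instead of advancing to the remaining PDUs.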
def loop(iterator, q_pdu):
logger.debug("left_right_handler():loop(%r)", q_pdu)
def fail(e):
if not isinstance(e, rpki.exceptions.NotFound):
logger.exception("Unhandled exception serving left-right PDU %r", q_pdu)
error_tenant_handle = q_pdu.get("tenant_handle")
error_tag = q_pdu.get("tag")
r_pdu = SubElement(r_msg, rpki.left_right.tag_report_error, error_code = e.__class__.__name__)
r_pdu.text = str(e)
if error_tag is not None:
r_pdu.set("tag", error_tag)
if error_tenant_handle is not None:
r_pdu.set("tenant_handle", error_tenant_handle)
cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert))
try:
if q_pdu.tag in self.left_right_trivial_handlers:
logger.debug("left_right_handler(): trivial handler")
self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg)
iterator()
else:
action = q_pdu.get("action")
model = self.left_right_models[q_pdu.tag]
logger.debug("left_right_handler(): action %s model %r", action, model)
if action in ("get", "list"):
logger.debug("left_right_handler(): get/list")
for obj in model.objects.xml_list(q_pdu):
logger.debug("left_right_handler(): get/list: encoding %r", obj)
obj.xml_template.encode(obj, q_pdu, r_msg)
iterator()
elif action == "destroy":
def destroy_cb():
obj.delete()
obj.xml_template.acknowledge(obj, q_pdu, r_msg)
iterator()
logger.debug("left_right_handler(): destroy")
obj = model.objects.xml_get_for_delete(q_pdu)
obj.xml_pre_delete_hook(self, destroy_cb, fail)
elif action in ("create", "set"):
def create_set_cb():
obj.xml_template.acknowledge(obj, q_pdu, r_msg)
iterator()
logger.debug("left_right_handler(): create/set")
obj = model.objects.xml_get_or_create(q_pdu)
obj.xml_template.decode(obj, q_pdu)
obj.xml_pre_save_hook(q_pdu)
obj.save()
obj.xml_post_save_hook(self, q_pdu, create_set_cb, fail)
else:
raise rpki.exceptions.BadQuery("Unrecognized action %r" % action)
except (rpki.async.ExitNow, SystemExit):
raise
except Exception as e:
fail(e)
rpki.async.iterator(q_msg, loop, done)
except (rpki.async.ExitNow, SystemExit):
raise
except Exception as e:
logger.exception("Unhandled exception serving left-right request")
cb(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e))
up_down_url_regexp = re.compile("/up-down/([-A-Z0-9_]+)/([-A-Z0-9_]+)$", re.I)
def up_down_handler(self, q_der, path, cb):
"""
Process one up-down PDU.
"""
def done(r_der):
cb(200, body = r_der)
try:
match = self.up_down_url_regexp.search(path)
if match is None:
raise rpki.exceptions.BadContactURL("Bad URL path received in up_down_handler(): %s" % path)
tenant_handle, child_handle = match.groups()
try:
child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle)
except rpki.rpkidb.models.Child.DoesNotExist:
raise rpki.exceptions.ChildNotFound("Could not find child %s of self %s in up_down_handler()" % (
child_handle, tenant_handle))
child.serve_up_down(self, q_der, done)
except (rpki.async.ExitNow, SystemExit):
raise
except (rpki.exceptions.ChildNotFound, rpki.exceptions.BadContactURL) as e:
logger.warning(str(e))
cb(400, reason = str(e))
except Exception as e:
logger.exception("Unhandled exception processing up-down request")
cb(400, reason = "Could not process PDU: %s" % e)
def checkpoint(self, force = False):
"""
Record that we were still alive when we got here, by resetting the
keepalive timer.
"""
if force or self.cron_timeout is not None:
self.cron_timeout = rpki.sundial.now() + self.cron_keepalive
def task_add(self, task):
"""
Add a task to the scheduler task queue, unless it's already queued.
"""
if task not in self.task_queue:
logger.debug("Adding %r to task queue", task)
self.task_queue.append(task)
return True
else:
logger.debug("Task %r was already in the task queue", task)
return False
def task_next(self):
"""
Pull the next task from the task queue and put it on the deferred event
queue (we don't want to run it directly, as that could eventually
blow out our call stack).
"""
try:
self.task_current = self.task_queue.pop(0)
except IndexError:
self.task_current = None
else:
rpki.async.event_defer(self.task_current)
def task_run(self):
"""
Run first task on the task queue, unless one is running already.
"""
if self.task_current is None:
self.task_next()
def cron(self, cb = None):
"""
Periodic tasks.
"""
now = rpki.sundial.now()
logger.debug("Starting cron run")
def done():
self.cron_timeout = None
logger.info("Finished cron run started at %s", now)
if cb is not None:
cb()
completion = rpki.rpkid_tasks.CompletionHandler(done)
try:
selves = rpki.rpkidb.models.Tenant.objects.all()
except Exception:
logger.exception("Error pulling selves from SQL, maybe SQL server is down?")
else:
for s in selves:
s.schedule_cron_tasks(self, completion)
nothing_queued = completion.count == 0
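# A cron_timeout in the past means a previous run died without
# clearing its lock, so break the lock; a cron_timeout in the future
# means a run is still in progress, so just reschedule and wait.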
assert self.use_internal_cron or self.cron_timeout is None
if self.cron_timeout is not None and self.cron_timeout < now:
logger.warning("cron keepalive threshold %s has expired, breaking lock", self.cron_timeout)
self.cron_timeout = None
if self.use_internal_cron:
when = now + self.cron_period
logger.debug("Scheduling next cron run at %s", when)
self.cron_timer.set(when)
if self.cron_timeout is None:
self.checkpoint(self.use_internal_cron)
self.task_run()
elif self.use_internal_cron:
logger.warning("cron already running, keepalive will expire at %s", self.cron_timeout)
if nothing_queued:
done()
def cronjob_handler(self, query, path, cb):
"""
External trigger for periodic tasks. This is somewhat obsolete
now that we have internal timers, but the test framework still
uses it.
"""
def done():
cb(200, body = "OK")
if self.use_internal_cron:
cb(500, reason = "Running cron internally")
else:
logger.debug("Starting externally triggered cron")
self.cron(done)
class publication_queue(object):
"""
Utility to simplify publication from within rpkid.
General idea here is to accumulate a collection of objects to be
published, in one or more repositories, each potentially with its
own completion callback. Eventually we want to publish everything
we've accumulated, at which point we need to iterate over the
collection and do repository.call_pubd() for each repository.
"""
replace = True
def __init__(self, rpkid):
self.rpkid = rpkid
self.clear()
def clear(self):
self.repositories = {}
self.msgs = {}
self.handlers = {}
if self.replace:
self.uris = {}
def queue(self, uri, repository, handler = None,
old_obj = None, new_obj = None, old_hash = None):
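# Allowed combinations: new_obj alone publishes a new object; old_obj
# (or old_hash) alone withdraws one; old plus new replaces. old_obj
# and old_hash are mutually exclusive ways of naming the object that
# pubd currently has published.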
assert old_obj is not None or new_obj is not None or old_hash is not None
assert old_obj is None or old_hash is None
assert old_obj is None or isinstance(old_obj, rpki.x509.uri_dispatch(uri))
assert new_obj is None or isinstance(new_obj, rpki.x509.uri_dispatch(uri))
logger.debug("Queuing publication action: uri %s, old %r, new %r, hash %s",
uri, old_obj, new_obj, old_hash)
# id(repository) may need to change to repository.peer_contact_uri
# once we convert from our custom SQL cache to Django ORM.
rid = id(repository)
if rid not in self.repositories:
self.repositories[rid] = repository
self.msgs[rid] = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap,
type = "query", version = rpki.publication.version)
if self.replace and uri in self.uris:
logger.debug("Removing publication duplicate %r", self.uris[uri])
old_pdu = self.uris.pop(uri)
self.msgs[rid].remove(old_pdu)
pdu_hash = old_pdu.get("hash")
elif old_hash is not None:
pdu_hash = old_hash
elif old_obj is None:
pdu_hash = None
else:
pdu_hash = rpki.x509.sha256(old_obj.get_DER()).encode("hex")
if new_obj is None:
pdu = SubElement(self.msgs[rid], rpki.publication.tag_withdraw, uri = uri, hash = pdu_hash)
else:
pdu = SubElement(self.msgs[rid], rpki.publication.tag_publish, uri = uri)
pdu.text = new_obj.get_Base64()
if pdu_hash is not None:
pdu.set("hash", pdu_hash)
if handler is not None:
tag = str(id(pdu))
self.handlers[tag] = handler
pdu.set("tag", tag)
if self.replace:
self.uris[uri] = pdu
def call_pubd(self, cb, eb):
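# Send the accumulated message for each repository to its pubd in
# turn, then clear the queue and invoke the caller's callback.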
def loop(iterator, rid):
logger.debug("Calling pubd[%r]", self.repositories[rid])
self.repositories[rid].call_pubd(self.rpkid, iterator, eb, self.msgs[rid], self.handlers)
def done():
self.clear()
cb()
rpki.async.iterator(self.repositories, loop, done)
@property
def size(self):
return sum(len(self.msgs[rid]) for rid in self.repositories)
def empty(self):
assert (not self.msgs) == (self.size == 0), "Assertion failure: not self.msgs: %r, self.size %r" % (not self.msgs, self.size)
return not self.msgs