diff options
Diffstat (limited to 'rpki/rpkid.py')
-rw-r--r-- | rpki/rpkid.py | 2990 |
1 files changed, 644 insertions, 2346 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py index 628209af..e647fe12 100644 --- a/rpki/rpkid.py +++ b/rpki/rpkid.py @@ -22,2470 +22,768 @@ RPKI CA engine. """ import os -import re import time import random -import base64 import logging +import weakref import argparse +import urlparse + +import tornado.gen +import tornado.web +import tornado.locks +import tornado.ioloop +import tornado.queues +import tornado.httputil +import tornado.httpclient +import tornado.httpserver + +from lxml.etree import Element, SubElement, tostring as ElementToString + import rpki.resource_set import rpki.up_down import rpki.left_right import rpki.x509 -import rpki.sql -import rpki.http import rpki.config import rpki.exceptions import rpki.relaxng import rpki.log -import rpki.async import rpki.daemonize -import rpki.rpkid_tasks - -logger = logging.getLogger(__name__) - -class main(object): - """ - Main program for rpkid. - """ - - def __init__(self): - - os.environ["TZ"] = "UTC" - time.tzset() - - self.irdbd_cms_timestamp = None - self.irbe_cms_timestamp = None - self.task_current = None - self.task_queue = [] - - parser = argparse.ArgumentParser(description = __doc__) - parser.add_argument("-c", "--config", - help = "override default location of configuration file") - parser.add_argument("-f", "--foreground", action = "store_true", - help = "do not daemonize") - parser.add_argument("--pidfile", - help = "override default location of pid file") - parser.add_argument("--profile", - help = "enable profiling, saving data to PROFILE") - rpki.log.argparse_setup(parser) - args = parser.parse_args() - - self.profile = args.profile - - rpki.log.init("rpkid", args) - - self.cfg = rpki.config.parser(args.config, "rpkid") - self.cfg.set_global_flags() - - if not args.foreground: - rpki.daemonize.daemon(pidfile = args.pidfile) - - if self.profile: - import cProfile - prof = cProfile.Profile() - try: - prof.runcall(self.main) - finally: - prof.dump_stats(self.profile) - logger.info("Dumped profile data to %s", self.profile) - else: - self.main() - - def main(self): - - startup_msg = self.cfg.get("startup-message", "") - if startup_msg: - logger.info(startup_msg) - - if self.profile: - logger.info("Running in profile mode with output to %s", self.profile) - - self.sql = rpki.sql.session(self.cfg) - - self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta")) - self.irdb_cert = rpki.x509.X509(Auto_update = self.cfg.get("irdb-cert")) - self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert")) - self.rpkid_cert = rpki.x509.X509(Auto_update = self.cfg.get("rpkid-cert")) - self.rpkid_key = rpki.x509.RSA( Auto_update = self.cfg.get("rpkid-key")) - - self.irdb_url = self.cfg.get("irdb-url") - - self.http_server_host = self.cfg.get("server-host", "") - self.http_server_port = self.cfg.getint("server-port") - - self.publication_kludge_base = self.cfg.get("publication-kludge-base", "publication/") - - # Icky hack to let Iain do some testing quickly, should go away - # once we sort out whether we can make this change permanent. - # - # OK, the stuff to add router certificate support makes enough - # other changes that we're going to need a migration program in - # any case, so might as well throw the switch here too, or at - # least find out if it (still) works as expected. - - self.merge_publication_directories = self.cfg.getboolean("merge_publication_directories", - True) - - self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True) - - self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10), - self.cfg.getint("initial-delay-max", 120)) - - # Should be much longer in production - self.cron_period = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-period", 120)) - self.cron_keepalive = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-keepalive", 0)) - if not self.cron_keepalive: - self.cron_keepalive = self.cron_period * 4 - self.cron_timeout = None - - self.start_cron() - - rpki.http.server( - host = self.http_server_host, - port = self.http_server_port, - handlers = (("/left-right", self.left_right_handler), - ("/up-down/", self.up_down_handler, rpki.up_down.allowed_content_types), - ("/cronjob", self.cronjob_handler))) - - def start_cron(self): - """ - Start clock for rpkid's internal cron process. - """ - - if self.use_internal_cron: - self.cron_timer = rpki.async.timer(handler = self.cron) - when = rpki.sundial.now() + rpki.sundial.timedelta(seconds = self.initial_delay) - logger.debug("Scheduling initial cron pass at %s", when) - self.cron_timer.set(when) - else: - logger.debug("Not using internal clock, start_cron() call ignored") - - def irdb_query(self, callback, errback, *q_pdus, **kwargs): - """ - Perform an IRDB callback query. - """ - - try: - q_types = tuple(type(q_pdu) for q_pdu in q_pdus) - - expected_pdu_count = kwargs.pop("expected_pdu_count", None) - assert len(kwargs) == 0 - - q_msg = rpki.left_right.msg.query() - q_msg.extend(q_pdus) - q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert) - - def unwrap(r_der): - try: - r_cms = rpki.left_right.cms_msg(DER = r_der) - r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert)) - self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url) - if not r_msg.is_reply() or not all(type(r_pdu) in q_types for r_pdu in r_msg): - raise rpki.exceptions.BadIRDBReply( - "Unexpected response to IRDB query: %s" % r_cms.pretty_print_content()) - if expected_pdu_count is not None and len(r_msg) != expected_pdu_count: - assert isinstance(expected_pdu_count, (int, long)) - raise rpki.exceptions.BadIRDBReply( - "Expected exactly %d PDU%s from IRDB: %s" % ( - expected_pdu_count, "" if expected_pdu_count == 1 else "s", - r_cms.pretty_print_content())) - callback(r_msg) - except Exception, e: - errback(e) - - rpki.http.client( - url = self.irdb_url, - msg = q_der, - callback = unwrap, - errback = errback) - - except Exception, e: - errback(e) - - - def irdb_query_child_resources(self, self_handle, child_handle, callback, errback): - """ - Ask IRDB about a child's resources. - """ - - q_pdu = rpki.left_right.list_resources_elt() - q_pdu.self_handle = self_handle - q_pdu.child_handle = child_handle - - def done(r_msg): - callback(rpki.resource_set.resource_bag( - asn = r_msg[0].asn, - v4 = r_msg[0].ipv4, - v6 = r_msg[0].ipv6, - valid_until = r_msg[0].valid_until)) - - self.irdb_query(done, errback, q_pdu, expected_pdu_count = 1) - - def irdb_query_roa_requests(self, self_handle, callback, errback): - """ - Ask IRDB about self's ROA requests. - """ - - q_pdu = rpki.left_right.list_roa_requests_elt() - q_pdu.self_handle = self_handle - - self.irdb_query(callback, errback, q_pdu) - - def irdb_query_ghostbuster_requests(self, self_handle, parent_handles, callback, errback): - """ - Ask IRDB about self's ghostbuster record requests. - """ - - q_pdus = [] - - for parent_handle in parent_handles: - q_pdu = rpki.left_right.list_ghostbuster_requests_elt() - q_pdu.self_handle = self_handle - q_pdu.parent_handle = parent_handle - q_pdus.append(q_pdu) - - self.irdb_query(callback, errback, *q_pdus) - - def irdb_query_ee_certificate_requests(self, self_handle, callback, errback): - """ - Ask IRDB about self's EE certificate requests. - """ - - q_pdu = rpki.left_right.list_ee_certificate_requests_elt() - q_pdu.self_handle = self_handle - - self.irdb_query(callback, errback, q_pdu) - - def left_right_handler(self, query, path, cb): - """ - Process one left-right PDU. - """ - - def done(r_msg): - reply = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert) - self.sql.sweep() - cb(200, body = reply) - - try: - q_cms = rpki.left_right.cms_msg(DER = query) - q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert)) - self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, path) - if not q_msg.is_query(): - raise rpki.exceptions.BadQuery("Message type is not query") - q_msg.serve_top_level(self, done) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - logger.exception("Unhandled exception serving left-right request") - cb(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e)) - - up_down_url_regexp = re.compile("/up-down/([-A-Z0-9_]+)/([-A-Z0-9_]+)$", re.I) - - def up_down_handler(self, query, path, cb): - """ - Process one up-down PDU. - """ - - def done(reply): - self.sql.sweep() - cb(200, body = reply) - - try: - match = self.up_down_url_regexp.search(path) - if match is None: - raise rpki.exceptions.BadContactURL("Bad URL path received in up_down_handler(): %s" % path) - self_handle, child_handle = match.groups() - child = rpki.left_right.child_elt.sql_fetch_where1(self, - "self.self_handle = %s AND child.child_handle = %s AND child.self_id = self.self_id", - (self_handle, child_handle), - "self") - if child is None: - raise rpki.exceptions.ChildNotFound("Could not find child %s of self %s in up_down_handler()" % (child_handle, self_handle)) - child.serve_up_down(query, done) - except (rpki.async.ExitNow, SystemExit): - raise - except (rpki.exceptions.ChildNotFound, rpki.exceptions.BadContactURL), e: - logger.warning(str(e)) - cb(400, reason = str(e)) - except Exception, e: - logger.exception("Unhandled exception processing up-down request") - cb(400, reason = "Could not process PDU: %s" % e) - - def checkpoint(self, force = False): - """ - Record that we were still alive when we got here, by resetting - keepalive timer. - """ - if force or self.cron_timeout is not None: - self.cron_timeout = rpki.sundial.now() + self.cron_keepalive - - def task_add(self, task): - """ - Add a task to the scheduler task queue, unless it's already queued. - """ - if task not in self.task_queue: - logger.debug("Adding %r to task queue", task) - self.task_queue.append(task) - return True - else: - logger.debug("Task %r was already in the task queue", task) - return False - - def task_next(self): - """ - Pull next task from the task queue and put it the deferred event - queue (we don't want to run it directly, as that could eventually - blow out our call stack). - """ - try: - self.task_current = self.task_queue.pop(0) - except IndexError: - self.task_current = None - else: - rpki.async.event_defer(self.task_current) - - def task_run(self): - """ - Run first task on the task queue, unless one is running already. - """ - if self.task_current is None: - self.task_next() - - def cron(self, cb = None): - """ - Periodic tasks. - """ - - now = rpki.sundial.now() - - logger.debug("Starting cron run") - - def done(): - self.sql.sweep() - self.cron_timeout = None - logger.info("Finished cron run started at %s", now) - if cb is not None: - cb() - - completion = rpki.rpkid_tasks.CompletionHandler(done) - try: - selves = rpki.left_right.self_elt.sql_fetch_all(self) - except Exception: - logger.exception("Error pulling self_elts from SQL, maybe SQL server is down?") - else: - for s in selves: - s.schedule_cron_tasks(completion) - nothing_queued = completion.count == 0 - - assert self.use_internal_cron or self.cron_timeout is None - - if self.cron_timeout is not None and self.cron_timeout < now: - logger.warning("cron keepalive threshold %s has expired, breaking lock", self.cron_timeout) - self.cron_timeout = None - - if self.use_internal_cron: - when = now + self.cron_period - logger.debug("Scheduling next cron run at %s", when) - self.cron_timer.set(when) - - if self.cron_timeout is None: - self.checkpoint(self.use_internal_cron) - self.task_run() - - elif self.use_internal_cron: - logger.warning("cron already running, keepalive will expire at %s", self.cron_timeout) - - if nothing_queued: - done() - - def cronjob_handler(self, query, path, cb): - """ - External trigger for periodic tasks. This is somewhat obsolete - now that we have internal timers, but the test framework still - uses it. - """ - - def done(): - cb(200, body = "OK") - - if self.use_internal_cron: - cb(500, reason = "Running cron internally") - else: - logger.debug("Starting externally triggered cron") - self.cron(done) - -class ca_obj(rpki.sql.sql_persistent): - """ - Internal CA object. - """ - - sql_template = rpki.sql.template( - "ca", - "ca_id", - "last_crl_sn", - ("next_crl_update", rpki.sundial.datetime), - "last_issued_sn", - "last_manifest_sn", - ("next_manifest_update", rpki.sundial.datetime), - "sia_uri", - "parent_id", - "parent_resource_class") - - last_crl_sn = 0 - last_issued_sn = 0 - last_manifest_sn = 0 - - def __repr__(self): - return rpki.log.log_repr(self, repr(self.parent), self.parent_resource_class) - - @property - @rpki.sql.cache_reference - def parent(self): - """ - Fetch parent object to which this CA object links. - """ - return rpki.left_right.parent_elt.sql_fetch(self.gctx, self.parent_id) - - @property - def ca_details(self): - """ - Fetch all ca_detail objects that link to this CA object. - """ - return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s", (self.ca_id,)) - - @property - def pending_ca_details(self): - """ - Fetch the pending ca_details for this CA, if any. - """ - return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state = 'pending'", (self.ca_id,)) - - @property - def active_ca_detail(self): - """ - Fetch the active ca_detail for this CA, if any. - """ - return ca_detail_obj.sql_fetch_where1(self.gctx, "ca_id = %s AND state = 'active'", (self.ca_id,)) - - @property - def deprecated_ca_details(self): - """ - Fetch deprecated ca_details for this CA, if any. - """ - return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state = 'deprecated'", (self.ca_id,)) - - @property - def active_or_deprecated_ca_details(self): - """ - Fetch active and deprecated ca_details for this CA, if any. - """ - return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND (state = 'active' OR state = 'deprecated')", (self.ca_id,)) - - @property - def revoked_ca_details(self): - """ - Fetch revoked ca_details for this CA, if any. - """ - return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state = 'revoked'", (self.ca_id,)) - - @property - def issue_response_candidate_ca_details(self): - """ - Fetch ca_details which are candidates for consideration when - processing an up-down issue_response PDU. - """ - #return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND latest_ca_cert IS NOT NULL AND state != 'revoked'", (self.ca_id,)) - return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state != 'revoked'", (self.ca_id,)) - - def construct_sia_uri(self, parent, rc): - """ - Construct the sia_uri value for this CA given configured - information and the parent's up-down protocol list_response PDU. - """ - - sia_uri = rc.suggested_sia_head and rc.suggested_sia_head.rsync() - if not sia_uri or not sia_uri.startswith(parent.sia_base): - sia_uri = parent.sia_base - if not sia_uri.endswith("/"): - raise rpki.exceptions.BadURISyntax("SIA URI must end with a slash: %s" % sia_uri) - # With luck this can go away sometime soon. - if self.gctx.merge_publication_directories: - return sia_uri - else: - return sia_uri + str(self.ca_id) + "/" - - def check_for_updates(self, parent, rc, cb, eb): - """ - Parent has signaled continued existance of a resource class we - already knew about, so we need to check for an updated - certificate, changes in resource coverage, revocation and reissue - with the same key, etc. - """ - - sia_uri = self.construct_sia_uri(parent, rc) - sia_uri_changed = self.sia_uri != sia_uri - if sia_uri_changed: - logger.debug("SIA changed: was %s now %s", self.sia_uri, sia_uri) - self.sia_uri = sia_uri - self.sql_mark_dirty() - - rc_resources = rc.to_resource_bag() - cert_map = dict((c.cert.get_SKI(), c) for c in rc.certs) - - def loop(iterator, ca_detail): - - self.gctx.checkpoint() - - rc_cert = cert_map.pop(ca_detail.public_key.get_SKI(), None) - - if rc_cert is None: - - logger.warning("SKI %s in resource class %s is in database but missing from list_response to %s from %s, maybe parent certificate went away?", - ca_detail.public_key.gSKI(), rc.class_name, parent.self.self_handle, parent.parent_handle) - publisher = publication_queue() - ca_detail.delete(ca = ca_detail.ca, publisher = publisher) - return publisher.call_pubd(iterator, eb) - - else: - - if ca_detail.state == "active" and ca_detail.ca_cert_uri != rc_cert.cert_url.rsync(): - logger.debug("AIA changed: was %s now %s", ca_detail.ca_cert_uri, rc_cert.cert_url.rsync()) - ca_detail.ca_cert_uri = rc_cert.cert_url.rsync() - ca_detail.sql_mark_dirty() - - if ca_detail.state in ("pending", "active"): - - if ca_detail.state == "pending": - current_resources = rpki.resource_set.resource_bag() - else: - current_resources = ca_detail.latest_ca_cert.get_3779resources() - - if (ca_detail.state == "pending" or - sia_uri_changed or - ca_detail.latest_ca_cert != rc_cert.cert or - ca_detail.latest_ca_cert.getNotAfter() != rc_resources.valid_until or - current_resources.undersized(rc_resources) or - current_resources.oversized(rc_resources)): - return ca_detail.update( - parent = parent, - ca = self, - rc = rc, - sia_uri_changed = sia_uri_changed, - old_resources = current_resources, - callback = iterator, - errback = eb) - - iterator() - - def done(): - if cert_map: - logger.warning("Unknown certificate SKI%s %s in resource class %s in list_response to %s from %s, maybe you want to \"revoke_forgotten\"?", - "" if len(cert_map) == 1 else "s", - ", ".join(c.cert.gSKI() for c in cert_map.values()), - rc.class_name, parent.self.self_handle, parent.parent_handle) - self.gctx.sql.sweep() - self.gctx.checkpoint() - cb() - - ca_details = self.issue_response_candidate_ca_details - - if True: - skis_parent = set(x.cert.gSKI() - for x in cert_map.itervalues()) - skis_me = set(x.latest_ca_cert.gSKI() - for x in ca_details - if x.latest_ca_cert is not None) - for ski in skis_parent & skis_me: - logger.debug("Parent %s agrees that %s has SKI %s in resource class %s", - parent.parent_handle, parent.self.self_handle, ski, rc.class_name) - for ski in skis_parent - skis_me: - logger.debug("Parent %s thinks %s has SKI %s in resource class %s but I don't think so", - parent.parent_handle, parent.self.self_handle, ski, rc.class_name) - for ski in skis_me - skis_parent: - logger.debug("I think %s has SKI %s in resource class %s but parent %s doesn't think so", - parent.self.self_handle, ski, rc.class_name, parent.parent_handle) - - if ca_details: - rpki.async.iterator(ca_details, loop, done) - else: - logger.warning("Existing resource class %s to %s from %s with no certificates, rekeying", - rc.class_name, parent.self.self_handle, parent.parent_handle) - self.gctx.checkpoint() - self.rekey(cb, eb) - - @classmethod - def create(cls, parent, rc, cb, eb): - """ - Parent has signaled existance of a new resource class, so we need - to create and set up a corresponding CA object. - """ - - self = cls() - self.gctx = parent.gctx - self.parent_id = parent.parent_id - self.parent_resource_class = rc.class_name - self.sql_store() - try: - self.sia_uri = self.construct_sia_uri(parent, rc) - except rpki.exceptions.BadURISyntax: - self.sql_delete() - raise - ca_detail = ca_detail_obj.create(self) - - def done(issue_response): - c = issue_response.payload.classes[0].certs[0] - logger.debug("CA %r received certificate %s", self, c.cert_url) - ca_detail.activate( - ca = self, - cert = c.cert, - uri = c.cert_url, - callback = cb, - errback = eb) - - logger.debug("Sending issue request to %r from %r", parent, self.create) - rpki.up_down.issue_pdu.query(parent, self, ca_detail, done, eb) - - def delete(self, parent, callback): - """ - The list of current resource classes received from parent does not - include the class corresponding to this CA, so we need to delete - it (and its little dog too...). - - All certs published by this CA are now invalid, so need to - withdraw them, the CRL, and the manifest from the repository, - delete all child_cert and ca_detail records associated with this - CA, then finally delete this CA itself. - """ - - def lose(e): - logger.exception("Could not delete CA %r, skipping", self) - callback() - - def done(): - logger.debug("Deleting %r", self) - self.sql_delete() - callback() - - publisher = publication_queue() - for ca_detail in self.ca_details: - ca_detail.delete(ca = self, publisher = publisher, allow_failure = True) - publisher.call_pubd(done, lose) - - def next_serial_number(self): - """ - Allocate a certificate serial number. - """ - self.last_issued_sn += 1 - self.sql_mark_dirty() - return self.last_issued_sn - - def next_manifest_number(self): - """ - Allocate a manifest serial number. - """ - self.last_manifest_sn += 1 - self.sql_mark_dirty() - return self.last_manifest_sn - - def next_crl_number(self): - """ - Allocate a CRL serial number. - """ - self.last_crl_sn += 1 - self.sql_mark_dirty() - return self.last_crl_sn - - def rekey(self, cb, eb): - """ - Initiate a rekey operation for this ca. Generate a new keypair. - Request cert from parent using new keypair. Mark result as our - active ca_detail. Reissue all child certs issued by this ca using - the new ca_detail. - """ - - parent = self.parent - old_detail = self.active_ca_detail - new_detail = ca_detail_obj.create(self) - - def done(issue_response): - c = issue_response.payload.classes[0].certs[0] - logger.debug("CA %r received certificate %s", self, c.cert_url) - new_detail.activate( - ca = self, - cert = c.cert, - uri = c.cert_url, - predecessor = old_detail, - callback = cb, - errback = eb) - - logger.debug("Sending issue request to %r from %r", parent, self.rekey) - rpki.up_down.issue_pdu.query(parent, self, new_detail, done, eb) - - def revoke(self, cb, eb, revoke_all = False): - """ - Revoke deprecated ca_detail objects associated with this CA, or - all ca_details associated with this CA if revoke_all is set. - """ - - def loop(iterator, ca_detail): - ca_detail.revoke(cb = iterator, eb = eb) - - ca_details = self.ca_details if revoke_all else self.deprecated_ca_details - - rpki.async.iterator(ca_details, loop, cb) - - def reissue(self, cb, eb): - """ - Reissue all current certificates issued by this CA. - """ - - ca_detail = self.active_ca_detail - if ca_detail: - ca_detail.reissue(cb, eb) - else: - cb() - -class ca_detail_obj(rpki.sql.sql_persistent): - """ - Internal CA detail object. - """ - - sql_template = rpki.sql.template( - "ca_detail", - "ca_detail_id", - ("private_key_id", rpki.x509.RSA), - ("public_key", rpki.x509.PublicKey), - ("latest_ca_cert", rpki.x509.X509), - ("manifest_private_key_id", rpki.x509.RSA), - ("manifest_public_key", rpki.x509.PublicKey), - ("latest_manifest_cert", rpki.x509.X509), - ("latest_manifest", rpki.x509.SignedManifest), - ("latest_crl", rpki.x509.CRL), - ("crl_published", rpki.sundial.datetime), - ("manifest_published", rpki.sundial.datetime), - "state", - "ca_cert_uri", - "ca_id") - - crl_published = None - manifest_published = None - latest_ca_cert = None - latest_crl = None - latest_manifest = None - ca_cert_uri = None - - def __repr__(self): - return rpki.log.log_repr(self, repr(self.ca), self.state, self.ca_cert_uri) - - def sql_decode(self, vals): - """ - Extra assertions for SQL decode of a ca_detail_obj. - """ - rpki.sql.sql_persistent.sql_decode(self, vals) - assert self.public_key is None or self.private_key_id is None or self.public_key.get_DER() == self.private_key_id.get_public_DER() - assert self.manifest_public_key is None or self.manifest_private_key_id is None or self.manifest_public_key.get_DER() == self.manifest_private_key_id.get_public_DER() - - @property - @rpki.sql.cache_reference - def ca(self): - """ - Fetch CA object to which this ca_detail links. - """ - return ca_obj.sql_fetch(self.gctx, self.ca_id) - - def fetch_child_certs(self, child = None, ski = None, unique = False, unpublished = None): - """ - Fetch all child_cert objects that link to this ca_detail. - """ - return rpki.rpkid.child_cert_obj.fetch(self.gctx, child, self, ski, unique, unpublished) - - @property - def child_certs(self): - """ - Fetch all child_cert objects that link to this ca_detail. - """ - return self.fetch_child_certs() - - def unpublished_child_certs(self, when): - """ - Fetch all unpublished child_cert objects linked to this ca_detail - with attempted publication dates older than when. - """ - return self.fetch_child_certs(unpublished = when) - - @property - def revoked_certs(self): - """ - Fetch all revoked_cert objects that link to this ca_detail. - """ - return revoked_cert_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,)) - - @property - def roas(self): - """ - Fetch all ROA objects that link to this ca_detail. - """ - return rpki.rpkid.roa_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,)) - - def unpublished_roas(self, when): - """ - Fetch all unpublished ROA objects linked to this ca_detail with - attempted publication dates older than when. - """ - return rpki.rpkid.roa_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s AND published IS NOT NULL and published < %s", (self.ca_detail_id, when)) - - @property - def ghostbusters(self): - """ - Fetch all Ghostbuster objects that link to this ca_detail. - """ - return rpki.rpkid.ghostbuster_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,)) - - @property - def ee_certificates(self): - """ - Fetch all EE certificate objects that link to this ca_detail. - """ - return rpki.rpkid.ee_cert_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,)) - - def unpublished_ghostbusters(self, when): - """ - Fetch all unpublished Ghostbusters objects linked to this - ca_detail with attempted publication dates older than when. - """ - return rpki.rpkid.ghostbuster_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s AND published IS NOT NULL and published < %s", (self.ca_detail_id, when)) - - @property - def crl_uri(self): - """ - Return publication URI for this ca_detail's CRL. - """ - return self.ca.sia_uri + self.crl_uri_tail - - @property - def crl_uri_tail(self): - """ - Return tail (filename portion) of publication URI for this ca_detail's CRL. - """ - return self.public_key.gSKI() + ".crl" - - @property - def manifest_uri(self): - """ - Return publication URI for this ca_detail's manifest. - """ - return self.ca.sia_uri + self.public_key.gSKI() + ".mft" - - def has_expired(self): - """ - Return whether this ca_detail's certificate has expired. - """ - return self.latest_ca_cert.getNotAfter() <= rpki.sundial.now() - - def covers(self, target): - """ - Test whether this ca-detail covers a given set of resources. - """ - - assert not target.asn.inherit and not target.v4.inherit and not target.v6.inherit - me = self.latest_ca_cert.get_3779resources() - return target.asn <= me.asn and target.v4 <= me.v4 and target.v6 <= me.v6 - - def activate(self, ca, cert, uri, callback, errback, predecessor = None): - """ - Activate this ca_detail. - """ - - publisher = publication_queue() - - self.latest_ca_cert = cert - self.ca_cert_uri = uri.rsync() - self.generate_manifest_cert() - self.state = "active" - self.generate_crl(publisher = publisher) - self.generate_manifest(publisher = publisher) - self.sql_store() - - if predecessor is not None: - predecessor.state = "deprecated" - predecessor.sql_store() - for child_cert in predecessor.child_certs: - child_cert.reissue(ca_detail = self, publisher = publisher) - for roa in predecessor.roas: - roa.regenerate(publisher = publisher) - for ghostbuster in predecessor.ghostbusters: - ghostbuster.regenerate(publisher = publisher) - predecessor.generate_crl(publisher = publisher) - predecessor.generate_manifest(publisher = publisher) - - publisher.call_pubd(callback, errback) - - def delete(self, ca, publisher, allow_failure = False): - """ - Delete this ca_detail and all of the certs it issued. - - If allow_failure is true, we clean up as much as we can but don't - raise an exception. - """ - - repository = ca.parent.repository - handler = False if allow_failure else None - for child_cert in self.child_certs: - publisher.withdraw(cls = rpki.publication.certificate_elt, - uri = child_cert.uri, - obj = child_cert.cert, - repository = repository, - handler = handler) - child_cert.sql_mark_deleted() - for roa in self.roas: - roa.revoke(publisher = publisher, allow_failure = allow_failure, fast = True) - for ghostbuster in self.ghostbusters: - ghostbuster.revoke(publisher = publisher, allow_failure = allow_failure, fast = True) - try: - latest_manifest = self.latest_manifest - except AttributeError: - latest_manifest = None - if latest_manifest is not None: - publisher.withdraw(cls = rpki.publication.manifest_elt, - uri = self.manifest_uri, - obj = self.latest_manifest, - repository = repository, - handler = handler) - try: - latest_crl = self.latest_crl - except AttributeError: - latest_crl = None - if latest_crl is not None: - publisher.withdraw(cls = rpki.publication.crl_elt, - uri = self.crl_uri, - obj = self.latest_crl, - repository = repository, - handler = handler) - self.gctx.sql.sweep() - for cert in self.revoked_certs: # + self.child_certs - logger.debug("Deleting %r", cert) - cert.sql_delete() - logger.debug("Deleting %r", self) - self.sql_delete() - - def revoke(self, cb, eb): - """ - Request revocation of all certificates whose SKI matches the key - for this ca_detail. - - Tasks: - - - Request revocation of old keypair by parent. - - - Revoke all child certs issued by the old keypair. - - - Generate a final CRL, signed with the old keypair, listing all - the revoked certs, with a next CRL time after the last cert or - CRL signed by the old keypair will have expired. - - - Generate a corresponding final manifest. - - - Destroy old keypairs. - - - Leave final CRL and manifest in place until their nextupdate - time has passed. - """ - - ca = self.ca - parent = ca.parent - - def parent_revoked(r_msg): - - if r_msg.payload.ski != self.latest_ca_cert.gSKI(): - raise rpki.exceptions.SKIMismatch - - logger.debug("Parent revoked %s, starting cleanup", self.latest_ca_cert.gSKI()) - - crl_interval = rpki.sundial.timedelta(seconds = parent.self.crl_interval) - - nextUpdate = rpki.sundial.now() - - if self.latest_manifest is not None: - self.latest_manifest.extract_if_needed() - nextUpdate = nextUpdate.later(self.latest_manifest.getNextUpdate()) - - if self.latest_crl is not None: - nextUpdate = nextUpdate.later(self.latest_crl.getNextUpdate()) - - publisher = publication_queue() - - for child_cert in self.child_certs: - nextUpdate = nextUpdate.later(child_cert.cert.getNotAfter()) - child_cert.revoke(publisher = publisher) - - for roa in self.roas: - nextUpdate = nextUpdate.later(roa.cert.getNotAfter()) - roa.revoke(publisher = publisher) - - for ghostbuster in self.ghostbusters: - nextUpdate = nextUpdate.later(ghostbuster.cert.getNotAfter()) - ghostbuster.revoke(publisher = publisher) - - nextUpdate += crl_interval - self.generate_crl(publisher = publisher, nextUpdate = nextUpdate) - self.generate_manifest(publisher = publisher, nextUpdate = nextUpdate) - self.private_key_id = None - self.manifest_private_key_id = None - self.manifest_public_key = None - self.latest_manifest_cert = None - self.state = "revoked" - self.sql_mark_dirty() - publisher.call_pubd(cb, eb) - - logger.debug("Asking parent to revoke CA certificate %s", self.latest_ca_cert.gSKI()) - rpki.up_down.revoke_pdu.query(ca, self.latest_ca_cert.gSKI(), parent_revoked, eb) - def update(self, parent, ca, rc, sia_uri_changed, old_resources, callback, errback): - """ - Need to get a new certificate for this ca_detail and perhaps frob - children of this ca_detail. - """ - - def issued(issue_response): - c = issue_response.payload.classes[0].certs[0] - logger.debug("CA %r received certificate %s", self, c.cert_url) - - if self.state == "pending": - return self.activate( - ca = ca, - cert = c.cert, - uri = c.cert_url, - callback = callback, - errback = errback) - - validity_changed = self.latest_ca_cert is None or self.latest_ca_cert.getNotAfter() != c.cert.getNotAfter() - - publisher = publication_queue() - - if self.latest_ca_cert != c.cert: - self.latest_ca_cert = c.cert - self.sql_mark_dirty() - self.generate_manifest_cert() - self.generate_crl(publisher = publisher) - self.generate_manifest(publisher = publisher) - - new_resources = self.latest_ca_cert.get_3779resources() - - if sia_uri_changed or old_resources.oversized(new_resources): - for child_cert in self.child_certs: - child_resources = child_cert.cert.get_3779resources() - if sia_uri_changed or child_resources.oversized(new_resources): - child_cert.reissue( - ca_detail = self, - resources = child_resources & new_resources, - publisher = publisher) - - if sia_uri_changed or validity_changed or old_resources.oversized(new_resources): - for roa in self.roas: - roa.update(publisher = publisher, fast = True) - - if sia_uri_changed or validity_changed: - for ghostbuster in self.ghostbusters: - ghostbuster.update(publisher = publisher, fast = True) - - publisher.call_pubd(callback, errback) - - logger.debug("Sending issue request to %r from %r", parent, self.update) - rpki.up_down.issue_pdu.query(parent, ca, self, issued, errback) - - @classmethod - def create(cls, ca): - """ - Create a new ca_detail object for a specified CA. - """ - self = cls() - self.gctx = ca.gctx - self.ca_id = ca.ca_id - self.state = "pending" - - self.private_key_id = rpki.x509.RSA.generate() - self.public_key = self.private_key_id.get_public() - - self.manifest_private_key_id = rpki.x509.RSA.generate() - self.manifest_public_key = self.manifest_private_key_id.get_public() - - self.sql_store() - return self - - def issue_ee(self, ca, resources, subject_key, sia, - cn = None, sn = None, notAfter = None, eku = None): - """ - Issue a new EE certificate. - """ - - if notAfter is None: - notAfter = self.latest_ca_cert.getNotAfter() - - return self.latest_ca_cert.issue( - keypair = self.private_key_id, - subject_key = subject_key, - serial = ca.next_serial_number(), - sia = sia, - aia = self.ca_cert_uri, - crldp = self.crl_uri, - resources = resources, - notAfter = notAfter, - is_ca = False, - cn = cn, - sn = sn, - eku = eku) - - def generate_manifest_cert(self): - """ - Generate a new manifest certificate for this ca_detail. - """ - - resources = rpki.resource_set.resource_bag.from_inheritance() - self.latest_manifest_cert = self.issue_ee( - ca = self.ca, - resources = resources, - subject_key = self.manifest_public_key, - sia = (None, None, self.manifest_uri)) - - def issue(self, ca, child, subject_key, sia, resources, publisher, child_cert = None): - """ - Issue a new certificate to a child. Optional child_cert argument - specifies an existing child_cert object to update in place; if not - specified, we create a new one. Returns the child_cert object - containing the newly issued cert. - """ - - self.check_failed_publication(publisher) - - assert child_cert is None or child_cert.child_id == child.child_id - - cert = self.latest_ca_cert.issue( - keypair = self.private_key_id, - subject_key = subject_key, - serial = ca.next_serial_number(), - aia = self.ca_cert_uri, - crldp = self.crl_uri, - sia = sia, - resources = resources, - notAfter = resources.valid_until) - - if child_cert is None: - child_cert = rpki.rpkid.child_cert_obj( - gctx = child.gctx, - child_id = child.child_id, - ca_detail_id = self.ca_detail_id, - cert = cert) - logger.debug("Created new child_cert %r", child_cert) - else: - child_cert.cert = cert - del child_cert.ca_detail - child_cert.ca_detail_id = self.ca_detail_id - logger.debug("Reusing existing child_cert %r", child_cert) - - child_cert.ski = cert.get_SKI() - child_cert.published = rpki.sundial.now() - child_cert.sql_store() - publisher.publish( - cls = rpki.publication.certificate_elt, - uri = child_cert.uri, - obj = child_cert.cert, - repository = ca.parent.repository, - handler = child_cert.published_callback) - self.generate_manifest(publisher = publisher) - return child_cert - - def generate_crl(self, publisher, nextUpdate = None): - """ - Generate a new CRL for this ca_detail. At the moment this is - unconditional, that is, it is up to the caller to decide whether a - new CRL is needed. - """ - - self.check_failed_publication(publisher) - - ca = self.ca - parent = ca.parent - crl_interval = rpki.sundial.timedelta(seconds = parent.self.crl_interval) - now = rpki.sundial.now() - - if nextUpdate is None: - nextUpdate = now + crl_interval - - certlist = [] - for revoked_cert in self.revoked_certs: - if now > revoked_cert.expires + crl_interval: - revoked_cert.sql_delete() - else: - certlist.append((revoked_cert.serial, revoked_cert.revoked)) - certlist.sort() - - self.latest_crl = rpki.x509.CRL.generate( - keypair = self.private_key_id, - issuer = self.latest_ca_cert, - serial = ca.next_crl_number(), - thisUpdate = now, - nextUpdate = nextUpdate, - revokedCertificates = certlist) - - self.crl_published = rpki.sundial.now() - self.sql_mark_dirty() - publisher.publish( - cls = rpki.publication.crl_elt, - uri = self.crl_uri, - obj = self.latest_crl, - repository = parent.repository, - handler = self.crl_published_callback) - - def crl_published_callback(self, pdu): - """ - Check result of CRL publication. - """ - pdu.raise_if_error() - self.crl_published = None - self.sql_mark_dirty() - - def generate_manifest(self, publisher, nextUpdate = None): - """ - Generate a new manifest for this ca_detail. - """ - - self.check_failed_publication(publisher) - - ca = self.ca - parent = ca.parent - crl_interval = rpki.sundial.timedelta(seconds = parent.self.crl_interval) - now = rpki.sundial.now() - uri = self.manifest_uri - - if nextUpdate is None: - nextUpdate = now + crl_interval - - if (self.latest_manifest_cert is None or - (self.latest_manifest_cert.getNotAfter() < nextUpdate and - self.latest_manifest_cert.getNotAfter() < self.latest_ca_cert.getNotAfter())): - logger.debug("Generating EE certificate for %s", uri) - self.generate_manifest_cert() - logger.debug("Latest CA cert notAfter %s, new %s EE notAfter %s", - self.latest_ca_cert.getNotAfter(), uri, self.latest_manifest_cert.getNotAfter()) - - logger.debug("Constructing manifest object list for %s", uri) - objs = [(self.crl_uri_tail, self.latest_crl)] - objs.extend((c.uri_tail, c.cert) for c in self.child_certs) - objs.extend((r.uri_tail, r.roa) for r in self.roas if r.roa is not None) - objs.extend((g.uri_tail, g.ghostbuster) for g in self.ghostbusters) - objs.extend((e.uri_tail, e.cert) for e in self.ee_certificates) - - logger.debug("Building manifest object %s", uri) - self.latest_manifest = rpki.x509.SignedManifest.build( - serial = ca.next_manifest_number(), - thisUpdate = now, - nextUpdate = nextUpdate, - names_and_objs = objs, - keypair = self.manifest_private_key_id, - certs = self.latest_manifest_cert) - - logger.debug("Manifest generation took %s", rpki.sundial.now() - now) - - self.manifest_published = rpki.sundial.now() - self.sql_mark_dirty() - publisher.publish(cls = rpki.publication.manifest_elt, - uri = uri, - obj = self.latest_manifest, - repository = parent.repository, - handler = self.manifest_published_callback) - - def manifest_published_callback(self, pdu): - """ - Check result of manifest publication. - """ - pdu.raise_if_error() - self.manifest_published = None - self.sql_mark_dirty() - - def reissue(self, cb, eb): - """ - Reissue all current certificates issued by this ca_detail. - """ +import rpki.rpkid_tasks - publisher = publication_queue() - self.check_failed_publication(publisher) - for roa in self.roas: - roa.regenerate(publisher, fast = True) - for ghostbuster in self.ghostbusters: - ghostbuster.regenerate(publisher, fast = True) - for ee_certificate in self.ee_certificates: - ee_certificate.reissue(publisher, force = True) - for child_cert in self.child_certs: - child_cert.reissue(self, publisher, force = True) - self.gctx.sql.sweep() - self.generate_manifest_cert() - self.sql_mark_dirty() - self.generate_crl(publisher = publisher) - self.generate_manifest(publisher = publisher) - self.gctx.sql.sweep() - publisher.call_pubd(cb, eb) - - def check_failed_publication(self, publisher, check_all = True): - """ - Check for failed publication of objects issued by this ca_detail. - - All publishable objects have timestamp fields recording time of - last attempted publication, and callback methods which clear these - timestamps once publication has succeeded. Our task here is to - look for objects issued by this ca_detail which have timestamps - set (indicating that they have not been published) and for which - the timestamps are not very recent (for some definition of very - recent -- intent is to allow a bit of slack in case pubd is just - being slow). In such cases, we want to retry publication. - - As an optimization, we can probably skip checking other products - if manifest and CRL have been published, thus saving ourselves - several complex SQL queries. Not sure yet whether this - optimization is worthwhile. - - For the moment we check everything without optimization, because - it simplifies testing. - - For the moment our definition of staleness is hardwired; this - should become configurable. - """ - logger.debug("Checking for failed publication for %r", self) - - stale = rpki.sundial.now() - rpki.sundial.timedelta(seconds = 60) - repository = self.ca.parent.repository - - if self.latest_crl is not None and \ - self.crl_published is not None and \ - self.crl_published < stale: - logger.debug("Retrying publication for %s", self.crl_uri) - publisher.publish(cls = rpki.publication.crl_elt, - uri = self.crl_uri, - obj = self.latest_crl, - repository = repository, - handler = self.crl_published_callback) - - if self.latest_manifest is not None and \ - self.manifest_published is not None and \ - self.manifest_published < stale: - logger.debug("Retrying publication for %s", self.manifest_uri) - publisher.publish(cls = rpki.publication.manifest_elt, - uri = self.manifest_uri, - obj = self.latest_manifest, - repository = repository, - handler = self.manifest_published_callback) - - if not check_all: - return - - # Might also be able to return here if manifest and CRL are up to - # date, but let's avoid premature optimization - - for child_cert in self.unpublished_child_certs(stale): - logger.debug("Retrying publication for %s", child_cert) - publisher.publish( - cls = rpki.publication.certificate_elt, - uri = child_cert.uri, - obj = child_cert.cert, - repository = repository, - handler = child_cert.published_callback) - - for roa in self.unpublished_roas(stale): - logger.debug("Retrying publication for %s", roa) - publisher.publish( - cls = rpki.publication.roa_elt, - uri = roa.uri, - obj = roa.roa, - repository = repository, - handler = roa.published_callback) - - for ghostbuster in self.unpublished_ghostbusters(stale): - logger.debug("Retrying publication for %s", ghostbuster) - publisher.publish( - cls = rpki.publication.ghostbuster_elt, - uri = ghostbuster.uri, - obj = ghostbuster.ghostbuster, - repository = repository, - handler = ghostbuster.published_callback) - -class child_cert_obj(rpki.sql.sql_persistent): - """ - Certificate that has been issued to a child. - """ - - sql_template = rpki.sql.template( - "child_cert", - "child_cert_id", - ("cert", rpki.x509.X509), - "child_id", - "ca_detail_id", - "ski", - ("published", rpki.sundial.datetime)) - - def __repr__(self): - args = [self] - try: - args.append(self.uri) - except: # pylint: disable=W0702 - pass - return rpki.log.log_repr(*args) - - def __init__(self, gctx = None, child_id = None, ca_detail_id = None, cert = None): - """ - Initialize a child_cert_obj. - """ - rpki.sql.sql_persistent.__init__(self) - self.gctx = gctx - self.child_id = child_id - self.ca_detail_id = ca_detail_id - self.cert = cert - self.published = None - if child_id or ca_detail_id or cert: - self.sql_mark_dirty() - - @property - @rpki.sql.cache_reference - def child(self): - """ - Fetch child object to which this child_cert object links. - """ - return rpki.left_right.child_elt.sql_fetch(self.gctx, self.child_id) - - @property - @rpki.sql.cache_reference - def ca_detail(self): - """ - Fetch ca_detail object to which this child_cert object links. - """ - return ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) +logger = logging.getLogger(__name__) - @ca_detail.deleter - def ca_detail(self): - try: - del self._ca_detail - except AttributeError: - pass - @property - def uri_tail(self): +class main(object): """ - Return the tail (filename) portion of the URI for this child_cert. + Main program for rpkid. """ - return self.cert.gSKI() + ".cer" - @property - def uri(self): - """ - Return the publication URI for this child_cert. - """ - return self.ca_detail.ca.sia_uri + self.uri_tail + def __init__(self): - def revoke(self, publisher, generate_crl_and_manifest = True): - """ - Revoke a child cert. - """ + os.environ.update(TZ = "UTC", + DJANGO_SETTINGS_MODULE = "rpki.django_settings.rpkid") + time.tzset() - ca_detail = self.ca_detail - ca = ca_detail.ca - logger.debug("Revoking %r %r", self, self.uri) - revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail) - publisher.withdraw( - cls = rpki.publication.certificate_elt, - uri = self.uri, - obj = self.cert, - repository = ca.parent.repository) - self.gctx.sql.sweep() - self.sql_delete() - if generate_crl_and_manifest: - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - - def reissue(self, ca_detail, publisher, resources = None, sia = None, force = False): - """ - Reissue an existing child cert, reusing the public key. If the - child cert we would generate is identical to the one we already - have, we just return the one we already have. If we have to - revoke the old child cert when generating the new one, we have to - generate a new child_cert_obj, so calling code that needs the - updated child_cert_obj must use the return value from this method. - """ + self.irdbd_cms_timestamp = None + self.irbe_cms_timestamp = None - ca = ca_detail.ca - child = self.child + self.task_queue = tornado.queues.Queue() + self.task_ready = set() - old_resources = self.cert.get_3779resources() - old_sia = self.cert.get_SIA() - old_aia = self.cert.get_AIA()[0] - old_ca_detail = self.ca_detail + self.http_client_serialize = weakref.WeakValueDictionary() - needed = False + self.cfg = rpki.config.argparser(section = "rpkid", doc = __doc__) + self.cfg.add_boolean_argument("--foreground", + default = False, + help = "whether to daemonize") + self.cfg.add_argument("--pidfile", + default = os.path.join(rpki.daemonize.default_pid_directory, + "rpkid.pid"), + help = "override default location of pid file") + self.cfg.add_argument("--profile", + default = "", + help = "enable profiling, saving data to PROFILE") + self.cfg.add_logging_arguments() + args = self.cfg.argparser.parse_args() - if resources is None: - resources = old_resources + self.cfg.configure_logging(args = args, ident = "rpkid") - if sia is None: - sia = old_sia + self.profile = args.profile - assert resources.valid_until is not None and old_resources.valid_until is not None + try: + self.cfg.set_global_flags() - if resources.asn != old_resources.asn or resources.v4 != old_resources.v4 or resources.v6 != old_resources.v6: - logger.debug("Resources changed for %r: old %s new %s", self, old_resources, resources) - needed = True + if not args.foreground: + rpki.daemonize.daemon(pidfile = args.pidfile) - if resources.valid_until != old_resources.valid_until: - logger.debug("Validity changed for %r: old %s new %s", - self, old_resources.valid_until, resources.valid_until) - needed = True + if self.profile: + import cProfile + prof = cProfile.Profile() + try: + prof.runcall(self.main) + finally: + prof.dump_stats(self.profile) + logger.info("Dumped profile data to %s", self.profile) + else: + self.main() + except: + logger.exception("Unandled exception in rpki.rpkid.main()") + sys.exit(1) - if sia != old_sia: - logger.debug("SIA changed for %r: old %r new %r", self, old_sia, sia) - needed = True - if ca_detail != old_ca_detail: - logger.debug("Issuer changed for %r: old %r new %r", self, old_ca_detail, ca_detail) - needed = True + def main(self): - if ca_detail.ca_cert_uri != old_aia: - logger.debug("AIA changed for %r: old %r new %r", self, old_aia, ca_detail.ca_cert_uri) - needed = True + startup_msg = self.cfg.get("startup-message", "") + if startup_msg: + logger.info(startup_msg) - must_revoke = old_resources.oversized(resources) or old_resources.valid_until > resources.valid_until - if must_revoke: - logger.debug("Must revoke any existing cert(s) for %r", self) - needed = True + if self.profile: + logger.info("Running in profile mode with output to %s", self.profile) - if not needed and force: - logger.debug("No change needed for %r, forcing reissuance anyway", self) - needed = True + logger.debug("Initializing Django") + import django + django.setup() - if not needed: - logger.debug("No change to %r", self) - return self + logger.debug("Initializing rpkidb...") + global rpki # pylint: disable=W0602 + import rpki.rpkidb # pylint: disable=W0621 - if must_revoke: - for x in child.fetch_child_certs(ca_detail = ca_detail, ski = self.ski): - logger.debug("Revoking child_cert %r", x) - x.revoke(publisher = publisher) - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) + logger.debug("Initializing rpkidb...done") - child_cert = ca_detail.issue( - ca = ca, - child = child, - subject_key = self.cert.getPublicKey(), - sia = sia, - resources = resources, - child_cert = None if must_revoke else self, - publisher = publisher) + self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta")) + self.irdb_cert = rpki.x509.X509(Auto_update = self.cfg.get("irdb-cert")) + self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert")) + self.rpkid_cert = rpki.x509.X509(Auto_update = self.cfg.get("rpkid-cert")) + self.rpkid_key = rpki.x509.RSA( Auto_update = self.cfg.get("rpkid-key")) - logger.debug("New child_cert %r uri %s", child_cert, child_cert.uri) + self.irdb_url = self.cfg.get("irdb-url") - return child_cert + self.http_server_host = self.cfg.get("server-host", "") + self.http_server_port = self.cfg.getint("server-port") - @classmethod - def fetch(cls, gctx = None, child = None, ca_detail = None, ski = None, unique = False, unpublished = None): - """ - Fetch all child_cert objects matching a particular set of - parameters. This is a wrapper to consolidate various queries that - would otherwise be inline SQL WHERE expressions. In most cases - code calls this indirectly, through methods in other classes. - """ + self.http_client_timeout = self.cfg.getint("http-client-timeout", 900) - args = [] - where = [] + self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True) - if child: - where.append("child_id = %s") - args.append(child.child_id) + self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10), + self.cfg.getint("initial-delay-max", 120)) - if ca_detail: - where.append("ca_detail_id = %s") - args.append(ca_detail.ca_detail_id) + self.cron_period = self.cfg.getint("cron-period", 1800) - if ski: - where.append("ski = %s") - args.append(ski) + if self.use_internal_cron: + logger.debug("Scheduling initial cron pass in %s seconds", self.initial_delay) + tornado.ioloop.IOLoop.current().spawn_callback(self.cron_loop) - if unpublished is not None: - where.append("published IS NOT NULL AND published < %s") - args.append(unpublished) + logger.debug("Scheduling task loop") + tornado.ioloop.IOLoop.current().spawn_callback(self.task_loop) - where = " AND ".join(where) + rpkid = self - gctx = gctx or (child and child.gctx) or (ca_detail and ca_detail.gctx) or None + class LeftRightHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self): + yield rpkid.left_right_handler(self) - if unique: - return cls.sql_fetch_where1(gctx, where, args) - else: - return cls.sql_fetch_where(gctx, where, args) + class UpDownHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self, tenant_handle, child_handle): # pylint: disable=W0221 + yield rpkid.up_down_handler(self, tenant_handle, child_handle) - def published_callback(self, pdu): - """ - Publication callback: check result and mark published. - """ - pdu.raise_if_error() - self.published = None - self.sql_mark_dirty() - -class revoked_cert_obj(rpki.sql.sql_persistent): - """ - Tombstone for a revoked certificate. - """ - - sql_template = rpki.sql.template( - "revoked_cert", - "revoked_cert_id", - "serial", - "ca_detail_id", - ("revoked", rpki.sundial.datetime), - ("expires", rpki.sundial.datetime)) - - def __repr__(self): - return rpki.log.log_repr(self, repr(self.ca_detail), self.serial, self.revoked) - - def __init__(self, gctx = None, serial = None, revoked = None, expires = None, ca_detail_id = None): - """ - Initialize a revoked_cert_obj. - """ - rpki.sql.sql_persistent.__init__(self) - self.gctx = gctx - self.serial = serial - self.revoked = revoked - self.expires = expires - self.ca_detail_id = ca_detail_id - if serial or revoked or expires or ca_detail_id: - self.sql_mark_dirty() - - @property - @rpki.sql.cache_reference - def ca_detail(self): - """ - Fetch ca_detail object to which this revoked_cert_obj links. - """ - return ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) + class CronjobHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self): + yield rpkid.cronjob_handler(self) - @classmethod - def revoke(cls, cert, ca_detail): - """ - Revoke a certificate. - """ - return cls( - serial = cert.getSerial(), - expires = cert.getNotAfter(), - revoked = rpki.sundial.now(), - gctx = ca_detail.gctx, - ca_detail_id = ca_detail.ca_detail_id) - -class roa_obj(rpki.sql.sql_persistent): - """ - Route Origin Authorization. - """ - - sql_template = rpki.sql.template( - "roa", - "roa_id", - "ca_detail_id", - "self_id", - "asn", - ("roa", rpki.x509.ROA), - ("cert", rpki.x509.X509), - ("published", rpki.sundial.datetime)) - - ca_detail_id = None - cert = None - roa = None - published = None - - @property - @rpki.sql.cache_reference - def self(self): - """ - Fetch self object to which this roa_obj links. - """ - return rpki.left_right.self_elt.sql_fetch(self.gctx, self.self_id) + application = tornado.web.Application(( + (r"/left-right", LeftRightHandler), + (r"/up-down/([-a-zA-Z0-9_]+)/([-a-zA-Z0-9_]+)", UpDownHandler), + (r"/cronjob", CronjobHandler))) - @property - @rpki.sql.cache_reference - def ca_detail(self): - """ - Fetch ca_detail object to which this roa_obj links. - """ - return rpki.rpkid.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) + application.listen( + address = self.http_server_host, + port = self.http_server_port) - @ca_detail.deleter - def ca_detail(self): - try: - del self._ca_detail - except AttributeError: - pass + tornado.ioloop.IOLoop.current().start() - def sql_fetch_hook(self): - """ - Extra SQL fetch actions for roa_obj -- handle prefix lists. - """ - for version, datatype, attribute in ((4, rpki.resource_set.roa_prefix_set_ipv4, "ipv4"), - (6, rpki.resource_set.roa_prefix_set_ipv6, "ipv6")): - setattr(self, attribute, datatype.from_sql( - self.gctx.sql, + def task_add(self, *tasks): + """ + Add tasks to the task queue. """ - SELECT prefix, prefixlen, max_prefixlen FROM roa_prefix - WHERE roa_id = %s AND version = %s - """, - (self.roa_id, version))) - - def sql_insert_hook(self): - """ - Extra SQL insert actions for roa_obj -- handle prefix lists. - """ - for version, prefix_set in ((4, self.ipv4), (6, self.ipv6)): - if prefix_set: - self.gctx.sql.executemany( - """ - INSERT roa_prefix (roa_id, prefix, prefixlen, max_prefixlen, version) - VALUES (%s, %s, %s, %s, %s) - """, - ((self.roa_id, x.prefix, x.prefixlen, x.max_prefixlen, version) - for x in prefix_set)) - - def sql_delete_hook(self): - """ - Extra SQL delete actions for roa_obj -- handle prefix lists. - """ - self.gctx.sql.execute("DELETE FROM roa_prefix WHERE roa_id = %s", (self.roa_id,)) - - def __repr__(self): - args = [self, self.asn, self.ipv4, self.ipv6] - try: - args.append(self.uri) - except: # pylint: disable=W0702 - pass - return rpki.log.log_repr(*args) - - def __init__(self, gctx = None, self_id = None, asn = None, ipv4 = None, ipv6 = None): - rpki.sql.sql_persistent.__init__(self) - self.gctx = gctx - self.self_id = self_id - self.asn = asn - self.ipv4 = ipv4 - self.ipv6 = ipv6 - - # Defer marking new ROA as dirty until .generate() has a chance to - # finish setup, otherwise we get SQL consistency errors. - # - #if self_id or asn or ipv4 or ipv6: self.sql_mark_dirty() - - def update(self, publisher, fast = False): - """ - Bring this roa_obj's ROA up to date if necesssary. - """ - - v4 = self.ipv4.to_resource_set() if self.ipv4 is not None else rpki.resource_set.resource_set_ipv4() - v6 = self.ipv6.to_resource_set() if self.ipv6 is not None else rpki.resource_set.resource_set_ipv6() - - if self.roa is None: - logger.debug("%r doesn't exist, generating", self) - return self.generate(publisher = publisher, fast = fast) - - ca_detail = self.ca_detail - - if ca_detail is None: - logger.debug("%r has no associated ca_detail, generating", self) - return self.generate(publisher = publisher, fast = fast) - - if ca_detail.state != "active": - logger.debug("ca_detail associated with %r not active (state %s), regenerating", self, ca_detail.state) - return self.regenerate(publisher = publisher, fast = fast) - - now = rpki.sundial.now() - regen_time = self.cert.getNotAfter() - rpki.sundial.timedelta(seconds = self.self.regen_margin) - - if now > regen_time and self.cert.getNotAfter() < ca_detail.latest_ca_cert.getNotAfter(): - logger.debug("%r past threshold %s, regenerating", self, regen_time) - return self.regenerate(publisher = publisher, fast = fast) - - if now > regen_time: - logger.warning("%r is past threshold %s but so is issuer %r, can't regenerate", self, regen_time, ca_detail) - - ca_resources = ca_detail.latest_ca_cert.get_3779resources() - ee_resources = self.cert.get_3779resources() - - if ee_resources.oversized(ca_resources): - logger.debug("%r oversized with respect to CA, regenerating", self) - return self.regenerate(publisher = publisher, fast = fast) - - if ee_resources.v4 != v4 or ee_resources.v6 != v6: - logger.debug("%r resources do not match EE, regenerating", self) - return self.regenerate(publisher = publisher, fast = fast) - - if self.cert.get_AIA()[0] != ca_detail.ca_cert_uri: - logger.debug("%r AIA changed, regenerating", self) - return self.regenerate(publisher = publisher, fast = fast) - - def generate(self, publisher, fast = False): - """ - Generate a ROA. - - At present we have no way of performing a direct lookup from a - desired set of resources to a covering certificate, so we have to - search. This could be quite slow if we have a lot of active - ca_detail objects. Punt on the issue for now, revisit if - profiling shows this as a hotspot. - - Once we have the right covering certificate, we generate the ROA - payload, generate a new EE certificate, use the EE certificate to - sign the ROA payload, publish the result, then throw away the - private key for the EE cert, all per the ROA specification. This - implies that generating a lot of ROAs will tend to thrash - /dev/random, but there is not much we can do about that. - - If fast is set, we leave generating the new manifest for our - caller to handle, presumably at the end of a bulk operation. - """ - - if self.ipv4 is None and self.ipv6 is None: - raise rpki.exceptions.EmptyROAPrefixList - - # Ugly and expensive search for covering ca_detail, there has to - # be a better way, but it would require the ability to test for - # resource subsets in SQL. - - v4 = self.ipv4.to_resource_set() if self.ipv4 is not None else rpki.resource_set.resource_set_ipv4() - v6 = self.ipv6.to_resource_set() if self.ipv6 is not None else rpki.resource_set.resource_set_ipv6() - - ca_detail = self.ca_detail - if ca_detail is None or ca_detail.state != "active" or ca_detail.has_expired(): - logger.debug("Searching for new ca_detail for ROA %r", self) - ca_detail = None - for parent in self.self.parents: - for ca in parent.cas: - ca_detail = ca.active_ca_detail - assert ca_detail is None or ca_detail.state == "active" - if ca_detail is not None and not ca_detail.has_expired(): - resources = ca_detail.latest_ca_cert.get_3779resources() - if v4.issubset(resources.v4) and v6.issubset(resources.v6): - break - ca_detail = None - if ca_detail is not None: - break - else: - logger.debug("Keeping old ca_detail for ROA %r", self) - - if ca_detail is None: - raise rpki.exceptions.NoCoveringCertForROA("Could not find a certificate covering %r" % self) - - logger.debug("Using new ca_detail %r for ROA %r, ca_detail_state %s", - ca_detail, self, ca_detail.state) - - ca = ca_detail.ca - resources = rpki.resource_set.resource_bag(v4 = v4, v6 = v6) - keypair = rpki.x509.RSA.generate() - - del self.ca_detail - self.ca_detail_id = ca_detail.ca_detail_id - self.cert = ca_detail.issue_ee( - ca = ca, - resources = resources, - subject_key = keypair.get_public(), - sia = (None, None, self.uri_from_key(keypair))) - self.roa = rpki.x509.ROA.build(self.asn, self.ipv4, self.ipv6, keypair, (self.cert,)) - self.published = rpki.sundial.now() - self.sql_store() - - logger.debug("Generating %r URI %s", self, self.uri) - publisher.publish( - cls = rpki.publication.roa_elt, - uri = self.uri, - obj = self.roa, - repository = ca.parent.repository, - handler = self.published_callback) - if not fast: - ca_detail.generate_manifest(publisher = publisher) - - - def published_callback(self, pdu): - """ - Check publication result. - """ - pdu.raise_if_error() - self.published = None - self.sql_mark_dirty() - - def revoke(self, publisher, regenerate = False, allow_failure = False, fast = False): - """ - Withdraw ROA associated with this roa_obj. - - In order to preserve make-before-break properties without - duplicating code, this method also handles generating a - replacement ROA when requested. - - If allow_failure is set, failing to withdraw the ROA will not be - considered an error. - - If fast is set, SQL actions will be deferred, on the assumption - that our caller will handle regenerating CRL and manifest and - flushing the SQL cache. - """ - - ca_detail = self.ca_detail - cert = self.cert - roa = self.roa - uri = self.uri - - logger.debug("%s %r, ca_detail %r state is %s", - "Regenerating" if regenerate else "Not regenerating", - self, ca_detail, ca_detail.state) - - if regenerate: - self.generate(publisher = publisher, fast = fast) - - logger.debug("Withdrawing %r %s and revoking its EE cert", self, uri) - rpki.rpkid.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail) - publisher.withdraw(cls = rpki.publication.roa_elt, uri = uri, obj = roa, - repository = ca_detail.ca.parent.repository, - handler = False if allow_failure else None) - - if not regenerate: - self.sql_mark_deleted() - - if not fast: - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - self.gctx.sql.sweep() - - def regenerate(self, publisher, fast = False): - """ - Reissue ROA associated with this roa_obj. - """ - if self.ca_detail is None: - self.generate(publisher = publisher, fast = fast) - else: - self.revoke(publisher = publisher, regenerate = True, fast = fast) - - def uri_from_key(self, key): - """ - Return publication URI for a public key. - """ - return self.ca_detail.ca.sia_uri + key.gSKI() + ".roa" - - @property - def uri(self): - """ - Return the publication URI for this roa_obj's ROA. - """ - return self.ca_detail.ca.sia_uri + self.uri_tail - - @property - def uri_tail(self): - """ - Return the tail (filename portion) of the publication URI for this - roa_obj's ROA. - """ - return self.cert.gSKI() + ".roa" - - -class ghostbuster_obj(rpki.sql.sql_persistent): - """ - Ghostbusters record. - """ - - sql_template = rpki.sql.template( - "ghostbuster", - "ghostbuster_id", - "ca_detail_id", - "self_id", - "vcard", - ("ghostbuster", rpki.x509.Ghostbuster), - ("cert", rpki.x509.X509), - ("published", rpki.sundial.datetime)) - - ca_detail_id = None - cert = None - ghostbuster = None - published = None - vcard = None - - def __repr__(self): - args = [self] - try: - args.extend(self.vcard.splitlines()[2:-1]) - except: # pylint: disable=W0702 - pass - try: - args.append(self.uri) - except: # pylint: disable=W0702 - pass - return rpki.log.log_repr(*args) - - @property - @rpki.sql.cache_reference - def self(self): - """ - Fetch self object to which this ghostbuster_obj links. - """ - return rpki.left_right.self_elt.sql_fetch(self.gctx, self.self_id) - - @property - @rpki.sql.cache_reference - def ca_detail(self): - """ - Fetch ca_detail object to which this ghostbuster_obj links. - """ - return rpki.rpkid.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) - - def __init__(self, gctx = None, self_id = None, ca_detail_id = None, vcard = None): - rpki.sql.sql_persistent.__init__(self) - self.gctx = gctx - self.self_id = self_id - self.ca_detail_id = ca_detail_id - self.vcard = vcard - - # Defer marking new ghostbuster as dirty until .generate() has a chance to - # finish setup, otherwise we get SQL consistency errors. - - def update(self, publisher, fast = False): - """ - Bring this ghostbuster_obj up to date if necesssary. - """ - - if self.ghostbuster is None: - logger.debug("Ghostbuster record doesn't exist, generating") - return self.generate(publisher = publisher, fast = fast) - now = rpki.sundial.now() - regen_time = self.cert.getNotAfter() - rpki.sundial.timedelta(seconds = self.self.regen_margin) + for task in tasks: + if task in self.task_ready: + logger.debug("Task %r already queued", task) + else: + logger.debug("Adding %r to task queue", task) + self.task_queue.put(task) + self.task_ready.add(task) - if now > regen_time and self.cert.getNotAfter() < self.ca_detail.latest_ca_cert.getNotAfter(): - logger.debug("%r past threshold %s, regenerating", self, regen_time) - return self.regenerate(publisher = publisher, fast = fast) + @tornado.gen.coroutine + def task_loop(self): + """ + Asynchronous infinite loop to run background tasks. + """ - if now > regen_time: - logger.warning("%r is past threshold %s but so is issuer %r, can't regenerate", self, regen_time, self.ca_detail) + logger.debug("Starting task loop") - if self.cert.get_AIA()[0] != self.ca_detail.ca_cert_uri: - logger.debug("%r AIA changed, regenerating", self) - return self.regenerate(publisher = publisher, fast = fast) + while True: + task = None + try: + task = yield self.task_queue.get() + self.task_ready.discard(task) + yield task.start() + except: + logger.exception("Unhandled exception from %r", task) - def generate(self, publisher, fast = False): - """ - Generate a Ghostbuster record + @tornado.gen.coroutine + def cron_loop(self): + """ + Asynchronous infinite loop to drive internal cron cycle. + """ - Once we have the right covering certificate, we generate the - ghostbuster payload, generate a new EE certificate, use the EE - certificate to sign the ghostbuster payload, publish the result, - then throw away the private key for the EE cert. This is modeled - after the way we handle ROAs. + logger.debug("cron_loop(): Starting") + assert self.use_internal_cron + logger.debug("cron_loop(): Startup delay %d seconds", self.initial_delay) + yield tornado.gen.sleep(self.initial_delay) + while True: + logger.debug("cron_loop(): Running") + try: + self.cron_run() + except: + logger.exception("Error queuing cron tasks") + logger.debug("cron_loop(): Sleeping %d seconds", self.cron_period) + yield tornado.gen.sleep(self.cron_period) + + def cron_run(self): + """ + Schedule periodic tasks. + """ - If fast is set, we leave generating the new manifest for our - caller to handle, presumably at the end of a bulk operation. - """ + for tenant in rpki.rpkidb.models.Tenant.objects.all(): + self.task_add(*tenant.cron_tasks(self)) - ca_detail = self.ca_detail - ca = ca_detail.ca - - resources = rpki.resource_set.resource_bag.from_inheritance() - keypair = rpki.x509.RSA.generate() - - self.cert = ca_detail.issue_ee( - ca = ca, - resources = resources, - subject_key = keypair.get_public(), - sia = (None, None, self.uri_from_key(keypair))) - self.ghostbuster = rpki.x509.Ghostbuster.build(self.vcard, keypair, (self.cert,)) - self.published = rpki.sundial.now() - self.sql_store() - - logger.debug("Generating Ghostbuster record %r", self.uri) - publisher.publish( - cls = rpki.publication.ghostbuster_elt, - uri = self.uri, - obj = self.ghostbuster, - repository = ca.parent.repository, - handler = self.published_callback) - if not fast: - ca_detail.generate_manifest(publisher = publisher) - - def published_callback(self, pdu): - """ - Check publication result. - """ - pdu.raise_if_error() - self.published = None - self.sql_mark_dirty() - - def revoke(self, publisher, regenerate = False, allow_failure = False, fast = False): - """ - Withdraw Ghostbuster associated with this ghostbuster_obj. + @tornado.gen.coroutine + def cronjob_handler(self, handler): + """ + External trigger to schedule periodic tasks. Obsolete for + production use, but portions of the test framework still use this. + """ - In order to preserve make-before-break properties without - duplicating code, this method also handles generating a - replacement ghostbuster when requested. + if self.use_internal_cron: + handler.set_status(500, "Running cron internally") + else: + logger.debug("Starting externally triggered cron") + self.cron_run() + handler.set_status(200) + handler.finish() - If allow_failure is set, failing to withdraw the ghostbuster will not be - considered an error. + @tornado.gen.coroutine + def http_fetch(self, request, serialize_on_full_url = False): + """ + Wrapper around tornado.httpclient.AsyncHTTPClient() which + serializes requests to any particular HTTP server, to avoid + spurious CMS replay errors. + """ - If fast is set, SQL actions will be deferred, on the assumption - that our caller will handle regenerating CRL and manifest and - flushing the SQL cache. - """ + # The current definition of "particular HTTP server" is based only + # on the "netloc" portion of the URL, which could in theory could + # cause deadlocks in a loopback scenario; no such deadlocks have + # shown up in testing, but if such a thing were to occur, it would + # look like an otherwise inexplicable HTTP timeout. The solution, + # should this occur, would be to use the entire URL as the lookup + # key, perhaps only for certain protocols. + # + # The reason for the current scheme is that at least one protocol + # (publication) uses RESTful URLs but has a single service-wide + # CMS replay detection database, which translates to meaning that + # we need to serialize all requests for that service, not just + # requests to a particular URL. + + if serialize_on_full_url: + netlock = request.url + else: + netlock = urlparse.urlparse(request.url).netloc - ca_detail = self.ca_detail - cert = self.cert - ghostbuster = self.ghostbuster - uri = self.uri + try: + lock = self.http_client_serialize[netlock] + except KeyError: + lock = self.http_client_serialize[netlock] = tornado.locks.Lock() - logger.debug("%s %r, ca_detail %r state is %s", - "Regenerating" if regenerate else "Not regenerating", - self, ca_detail, ca_detail.state) + http_client = tornado.httpclient.AsyncHTTPClient() - if regenerate: - self.generate(publisher = publisher, fast = fast) + with (yield lock.acquire()): + try: + started = time.time() + response = yield http_client.fetch(request) + except tornado.httpclient.HTTPError as e: - logger.debug("Withdrawing %r %s and revoking its EE cert", self, uri) - rpki.rpkid.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail) - publisher.withdraw(cls = rpki.publication.ghostbuster_elt, uri = uri, obj = ghostbuster, - repository = ca_detail.ca.parent.repository, - handler = False if allow_failure else None) + # XXX This is not a solution, just an attempt to + # gather data on whether the timeout arguments are + # working as expected. - if not regenerate: - self.sql_mark_deleted() + logger.warning("%r: HTTP error contacting %r: %s", self, request, e) + if e.code == 599: + logger.warning("%r: HTTP timeout after time %s seconds", self, time.time() - started) + raise - if not fast: - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - self.gctx.sql.sweep() + raise tornado.gen.Return(response) - def regenerate(self, publisher, fast = False): - """ - Reissue Ghostbuster associated with this ghostbuster_obj. - """ - if self.ghostbuster is None: - self.generate(publisher = publisher, fast = fast) - else: - self.revoke(publisher = publisher, regenerate = True, fast = fast) + @staticmethod + def compose_left_right_query(): + """ + Compose top level element of a left-right query to irdbd. + """ - def uri_from_key(self, key): - """ - Return publication URI for a public key. - """ - return self.ca_detail.ca.sia_uri + key.gSKI() + ".gbr" + return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, + type = "query", version = rpki.left_right.version) - @property - def uri(self): - """ - Return the publication URI for this ghostbuster_obj's ghostbuster. - """ - return self.ca_detail.ca.sia_uri + self.uri_tail + @tornado.gen.coroutine + def irdb_query(self, q_msg): + """ + Perform an IRDB callback query. + """ - @property - def uri_tail(self): - """ - Return the tail (filename portion) of the publication URI for this - ghostbuster_obj's ghostbuster. - """ - return self.cert.gSKI() + ".gbr" - - -class ee_cert_obj(rpki.sql.sql_persistent): - """ - EE certificate (router certificate or generic). - """ - - sql_template = rpki.sql.template( - "ee_cert", - "ee_cert_id", - "self_id", - "ca_detail_id", - "ski", - ("cert", rpki.x509.X509), - ("published", rpki.sundial.datetime)) - - def __repr__(self): - return rpki.log.log_repr(self, self.cert.getSubject(), self.uri) - - def __init__(self, gctx = None, self_id = None, ca_detail_id = None, cert = None): - rpki.sql.sql_persistent.__init__(self) - self.gctx = gctx - self.self_id = self_id - self.ca_detail_id = ca_detail_id - self.cert = cert - self.ski = None if cert is None else cert.get_SKI() - self.published = None - if self_id or ca_detail_id or cert: - self.sql_mark_dirty() - - @property - @rpki.sql.cache_reference - def self(self): - """ - Fetch self object to which this ee_cert_obj links. - """ - return rpki.left_right.self_elt.sql_fetch(self.gctx, self.self_id) + q_tags = set(q_pdu.tag for q_pdu in q_msg) - @property - @rpki.sql.cache_reference - def ca_detail(self): - """ - Fetch ca_detail object to which this ee_cert_obj links. - """ - return rpki.rpkid.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) + q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert) - @ca_detail.deleter - def ca_detail(self): - try: - del self._ca_detail - except AttributeError: - pass + http_request = tornado.httpclient.HTTPRequest( + url = self.irdb_url, + method = "POST", + body = q_der, + headers = { "Content-Type" : rpki.left_right.content_type }, + connect_timeout = self.http_client_timeout, + request_timeout = self.http_client_timeout) - @property - def gski(self): - """ - Calculate g(SKI), for ease of comparison with XML. + http_response = yield self.http_fetch(http_request) - Although, really, one has to ask why we don't just store g(SKI) - in rpkid.sql instead of ski.... - """ - return base64.urlsafe_b64encode(self.ski).rstrip("=") + # Tornado already checked http_response.code for us - @gski.setter - def gski(self, val): - self.ski = base64.urlsafe_b64decode(val + ("=" * ((4 - len(val)) % 4))) + content_type = http_response.headers.get("Content-Type") - @property - def uri(self): - """ - Return the publication URI for this ee_cert_obj. - """ - return self.ca_detail.ca.sia_uri + self.uri_tail + if content_type not in rpki.left_right.allowed_content_types: + raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.left_right.content_type, content_type)) - @property - def uri_tail(self): - """ - Return the tail (filename portion) of the publication URI for this - ee_cert_obj. - """ - return self.cert.gSKI() + ".cer" + r_der = http_response.body - @classmethod - def create(cls, ca_detail, subject_name, subject_key, resources, publisher, eku = None): - """ - Generate a new certificate and stuff it in a new ee_cert_obj. - """ + r_cms = rpki.left_right.cms_msg(DER = r_der) + r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert)) - cn, sn = subject_name.extract_cn_and_sn() - ca = ca_detail.ca + self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url) - cert = ca_detail.issue_ee( - ca = ca, - subject_key = subject_key, - sia = None, - resources = resources, - notAfter = resources.valid_until, - cn = cn, - sn = sn, - eku = eku) + #rpki.left_right.check_response(r_msg) - self = cls( - gctx = ca_detail.gctx, - self_id = ca.parent.self.self_id, - ca_detail_id = ca_detail.ca_detail_id, - cert = cert) + if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg): + raise rpki.exceptions.BadIRDBReply("Unexpected response to IRDB query: %s" % r_cms.pretty_print_content()) - publisher.publish( - cls = rpki.publication.certificate_elt, - uri = self.uri, - obj = self.cert, - repository = ca.parent.repository, - handler = self.published_callback) + raise tornado.gen.Return(r_msg) - self.sql_store() + @tornado.gen.coroutine + def irdb_query_children_resources(self, tenant_handle, child_handles): + """ + Ask IRDB about resources for one or more children. + """ - ca_detail.generate_manifest(publisher = publisher) + q_msg = self.compose_left_right_query() + for child_handle in child_handles: + SubElement(q_msg, rpki.left_right.tag_list_resources, tenant_handle = tenant_handle, child_handle = child_handle) - logger.debug("New ee_cert %r", self) + r_msg = yield self.irdb_query(q_msg) - return self + if len(r_msg) != len(q_msg): + raise rpki.exceptions.BadIRDBReply("Expected IRDB response to be same length as query: %s" % r_msg.pretty_print_content()) - def revoke(self, publisher, generate_crl_and_manifest = True): - """ - Revoke and withdraw an EE certificate. - """ + bags = [rpki.resource_set.resource_bag(asn = r_pdu.get("asn"), + v4 = r_pdu.get("ipv4"), + v6 = r_pdu.get("ipv6"), + valid_until = r_pdu.get("valid_until")) + for r_pdu in r_msg] - ca_detail = self.ca_detail - ca = ca_detail.ca - logger.debug("Revoking %r %r", self, self.uri) - revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail) - publisher.withdraw(cls = rpki.publication.certificate_elt, - uri = self.uri, - obj = self.cert, - repository = ca.parent.repository) - self.gctx.sql.sweep() - self.sql_delete() - if generate_crl_and_manifest: - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - - def reissue(self, publisher, ca_detail = None, resources = None, force = False): - """ - Reissue an existing EE cert, reusing the public key. If the EE - cert we would generate is identical to the one we already have, we - just return; if we need to reissue, we reuse this ee_cert_obj and - just update its contents, as the publication URI will not have - changed. - """ + raise tornado.gen.Return(bags) - needed = False + @tornado.gen.coroutine + def irdb_query_child_resources(self, tenant_handle, child_handle): + """ + Ask IRDB about a single child's resources. + """ - old_cert = self.cert + bags = yield self.irdb_query_children_resources(tenant_handle, (child_handle,)) + raise tornado.gen.Return(bags[0]) - old_ca_detail = self.ca_detail - if ca_detail is None: - ca_detail = old_ca_detail + @tornado.gen.coroutine + def irdb_query_roa_requests(self, tenant_handle): + """ + Ask IRDB about self's ROA requests. + """ - assert ca_detail.ca is old_ca_detail.ca + q_msg = self.compose_left_right_query() + SubElement(q_msg, rpki.left_right.tag_list_roa_requests, tenant_handle = tenant_handle) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) - old_resources = old_cert.get_3779resources() - if resources is None: - resources = old_resources + @tornado.gen.coroutine + def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles): + """ + Ask IRDB about self's ghostbuster record requests. + """ - assert resources.valid_until is not None and old_resources.valid_until is not None + q_msg = self.compose_left_right_query() + for parent_handle in parent_handles: + SubElement(q_msg, rpki.left_right.tag_list_ghostbuster_requests, + tenant_handle = tenant_handle, parent_handle = parent_handle) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) - assert ca_detail.covers(resources) + @tornado.gen.coroutine + def irdb_query_ee_certificate_requests(self, tenant_handle): + """ + Ask IRDB about self's EE certificate requests. + """ - if ca_detail != self.ca_detail: - logger.debug("ca_detail changed for %r: old %r new %r", - self, self.ca_detail, ca_detail) - needed = True + q_msg = self.compose_left_right_query() + SubElement(q_msg, rpki.left_right.tag_list_ee_certificate_requests, tenant_handle = tenant_handle) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) - if ca_detail.ca_cert_uri != old_cert.get_AIA()[0]: - logger.debug("AIA changed for %r: old %s new %s", - self, old_cert.get_AIA()[0], ca_detail.ca_cert_uri) - needed = True + @property + def left_right_models(self): + """ + Map element tag to rpkidb model. + """ - if resources.valid_until != old_resources.valid_until: - logger.debug("Validity changed for %r: old %s new %s", - self, old_resources.valid_until, resources.valid_until) - needed = True + # pylint: disable=W0621,W0201 - if resources.asn != old_resources.asn or resources.v4 != old_resources.v4 or resources.v6 != old_resources.v6: - logger.debug("Resources changed for %r: old %s new %s", - self, old_resources, resources) - needed = True + try: + return self._left_right_models + except AttributeError: + import rpki.rpkidb.models + self._left_right_models = { + rpki.left_right.tag_tenant : rpki.rpkidb.models.Tenant, + rpki.left_right.tag_bsc : rpki.rpkidb.models.BSC, + rpki.left_right.tag_parent : rpki.rpkidb.models.Parent, + rpki.left_right.tag_child : rpki.rpkidb.models.Child, + rpki.left_right.tag_repository : rpki.rpkidb.models.Repository } + return self._left_right_models + + @property + def left_right_trivial_handlers(self): + """ + Map element tag to bound handler methods for trivial PDU types. + """ - must_revoke = (old_resources.oversized(resources) or - old_resources.valid_until > resources.valid_until) - if must_revoke: - logger.debug("Must revoke existing cert(s) for %r", self) - needed = True + # pylint: disable=W0201 - if not needed and force: - logger.debug("No change needed for %r, forcing reissuance anyway", self) - needed = True + try: + return self._left_right_trivial_handlers + except AttributeError: + self._left_right_trivial_handlers = { + rpki.left_right.tag_list_published_objects : self.handle_list_published_objects, + rpki.left_right.tag_list_received_resources : self.handle_list_received_resources } + return self._left_right_trivial_handlers + + def handle_list_published_objects(self, q_pdu, r_msg): + """ + <list_published_objects/> server. + """ - if not needed: - logger.debug("No change to %r", self) - return + tenant_handle = q_pdu.get("tenant_handle") + msg_tag = q_pdu.get("tag") + + kw = dict(tenant_handle = tenant_handle) + if msg_tag is not None: + kw.update(tag = msg_tag) + + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle, state = "active"): + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = ca_detail.crl_uri, **kw).text = ca_detail.latest_crl.get_Base64() + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = ca_detail.manifest_uri, **kw).text = ca_detail.latest_manifest.get_Base64() + for c in ca_detail.child_certs.all(): + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = c.uri, child_handle = c.child.child_handle, **kw).text = c.cert.get_Base64() + for r in ca_detail.roas.filter(roa__isnull = False): + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = r.uri, **kw).text = r.roa.get_Base64() + for g in ca_detail.ghostbusters.all(): + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = g.uri, **kw).text = g.ghostbuster.get_Base64() + for c in ca_detail.ee_certificates.all(): + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = c.uri, **kw).text = c.cert.get_Base64() + + def handle_list_received_resources(self, q_pdu, r_msg): + """ + <list_received_resources/> server. + """ - cn, sn = self.cert.getSubject().extract_cn_and_sn() + logger.debug(".handle_list_received_resources() %s", ElementToString(q_pdu)) + tenant_handle = q_pdu.get("tenant_handle") + msg_tag = q_pdu.get("tag") + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle, + state = "active", latest_ca_cert__isnull = False): + cert = ca_detail.latest_ca_cert + resources = cert.get_3779resources() + r_pdu = SubElement(r_msg, rpki.left_right.tag_list_received_resources, + tenant_handle = tenant_handle, + parent_handle = ca_detail.ca.parent.parent_handle, + uri = ca_detail.ca_cert_uri, + notBefore = str(cert.getNotBefore()), + notAfter = str(cert.getNotAfter()), + sia_uri = cert.get_sia_directory_uri(), + aia_uri = cert.get_aia_uri(), + asn = str(resources.asn), + ipv4 = str(resources.v4), + ipv6 = str(resources.v6)) + if msg_tag is not None: + r_pdu.set("tag", msg_tag) + + @tornado.gen.coroutine + def left_right_handler(self, handler): + """ + Process one left-right message. + """ - self.cert = ca_detail.issue_ee( - ca = ca_detail.ca, - subject_key = self.cert.getPublicKey(), - eku = self.cert.get_EKU(), - sia = None, - resources = resources, - notAfter = resources.valid_until, - cn = cn, - sn = sn) + content_type = handler.request.headers["Content-Type"] + if content_type not in rpki.left_right.allowed_content_types: + handler.set_status(415, "No handler for Content-Type %s" % content_type) + handler.finish() + return - self.sql_mark_dirty() + handler.set_header("Content-Type", rpki.left_right.content_type) - publisher.publish( - cls = rpki.publication.certificate_elt, - uri = self.uri, - obj = self.cert, - repository = ca_detail.ca.parent.repository, - handler = self.published_callback) + try: + q_cms = rpki.left_right.cms_msg(DER = handler.request.body) + q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert)) + r_msg = Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, + type = "reply", version = rpki.left_right.version) + self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, handler.request.path) + + assert q_msg.tag.startswith(rpki.left_right.xmlns) + assert all(q_pdu.tag.startswith(rpki.left_right.xmlns) for q_pdu in q_msg) + + if q_msg.get("version") != rpki.left_right.version: + raise rpki.exceptions.BadQuery("Unrecognized protocol version") + + if q_msg.get("type") != "query": + raise rpki.exceptions.BadQuery("Message type is not query") + + for q_pdu in q_msg: + + try: + action = q_pdu.get("action") + model = self.left_right_models.get(q_pdu.tag) + + if q_pdu.tag in self.left_right_trivial_handlers: + self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg) + + elif action in ("get", "list"): + for obj in model.objects.xml_list(q_pdu): + obj.xml_template.encode(obj, q_pdu, r_msg) + + elif action == "destroy": + obj = model.objects.xml_get_for_delete(q_pdu) + yield obj.xml_pre_delete_hook(self) + obj.delete() + obj.xml_template.acknowledge(obj, q_pdu, r_msg) + + elif action in ("create", "set"): + obj = model.objects.xml_get_or_create(q_pdu) + obj.xml_template.decode(obj, q_pdu) + obj.xml_pre_save_hook(q_pdu) + obj.save() + yield obj.xml_post_save_hook(self, q_pdu) + obj.xml_template.acknowledge(obj, q_pdu, r_msg) + + else: + raise rpki.exceptions.BadQuery("Unrecognized action %r" % action) + + except Exception, e: + if not isinstance(e, rpki.exceptions.NotFound): + logger.exception("Unhandled exception serving left-right PDU %r", q_pdu) + error_tenant_handle = q_pdu.get("tenant_handle") + error_tag = q_pdu.get("tag") + r_pdu = SubElement(r_msg, rpki.left_right.tag_report_error, error_code = e.__class__.__name__) + r_pdu.text = str(e) + if error_tag is not None: + r_pdu.set("tag", error_tag) + if error_tenant_handle is not None: + r_pdu.set("tenant_handle", error_tenant_handle) + break + + handler.set_status(200) + handler.finish(rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert)) - if must_revoke: - revoked_cert_obj.revoke(cert = old_cert.cert, ca_detail = old_ca_detail) + except Exception, e: + logger.exception("Unhandled exception serving left-right request") + handler.set_status(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e)) + handler.finish() - self.gctx.sql.sweep() + @tornado.gen.coroutine + def up_down_handler(self, handler, tenant_handle, child_handle): + """ + Process one up-down PDU. + """ - if must_revoke: - ca_detail.generate_crl(publisher = publisher) - self.gctx.sql.sweep() + content_type = handler.request.headers["Content-Type"] + if content_type not in rpki.up_down.allowed_content_types: + handler.set_status(415, "No handler for Content-Type %s" % content_type) + handler.finish() + return - ca_detail.generate_manifest(publisher = publisher) + try: + child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle) + q_der = handler.request.body + r_der = yield child.serve_up_down(self, q_der) + handler.set_header("Content-Type", rpki.up_down.content_type) + handler.set_status(200) + handler.finish(r_der) + + except rpki.rpkidb.models.Child.DoesNotExist: + logger.info("Child %r of tenant %r not found", child_handle, tenant_handle) + handler.set_status(400, "Child %r not found" % child_handle) + handler.finish() - def published_callback(self, pdu): - """ - Publication callback: check result and mark published. - """ - pdu.raise_if_error() - self.published = None - self.sql_mark_dirty() + except Exception, e: + logger.exception("Unhandled exception processing up-down request") + handler.set_status(400, "Could not process PDU: %s" % e) + handler.finish() class publication_queue(object): - """ - Utility to simplify publication from within rpkid. - - General idea here is to accumulate a collection of objects to be - published, in one or more repositories, each potentially with its - own completion callback. Eventually we want to publish everything - we've accumulated, at which point we need to iterate over the - collection and do repository.call_pubd() for each repository. - """ - - replace = True - - def __init__(self): - self.clear() - - def clear(self): - self.repositories = {} - self.msgs = {} - self.handlers = {} - if self.replace: - self.uris = {} - - def _add(self, uri, obj, repository, handler, make_pdu): - rid = id(repository) - if rid not in self.repositories: - self.repositories[rid] = repository - self.msgs[rid] = rpki.publication.msg.query() - if self.replace and uri in self.uris: - logger.debug("Removing publication duplicate <%s %r %r>", - self.uris[uri].action, self.uris[uri].uri, self.uris[uri].payload) - self.msgs[rid].remove(self.uris.pop(uri)) - pdu = make_pdu(uri = uri, obj = obj) - if handler is not None: - self.handlers[id(pdu)] = handler - pdu.tag = id(pdu) - self.msgs[rid].append(pdu) - if self.replace: - self.uris[uri] = pdu - - def publish(self, cls, uri, obj, repository, handler = None): - return self._add( uri, obj, repository, handler, cls.make_publish) - - def withdraw(self, cls, uri, obj, repository, handler = None): - return self._add( uri, obj, repository, handler, cls.make_withdraw) - - def call_pubd(self, cb, eb): - def loop(iterator, rid): - logger.debug("Calling pubd[%r]", self.repositories[rid]) - self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers) - def done(): - self.clear() - cb() - rpki.async.iterator(self.repositories, loop, done) - - @property - def size(self): - return sum(len(self.msgs[rid]) for rid in self.repositories) - - def empty(self): - assert (not self.msgs) == (self.size == 0) - return not self.msgs + """ + Utility to simplify publication from within rpkid. + + General idea here is to accumulate a collection of objects to be + published, in one or more repositories, each potentially with its + own completion callback. Eventually we want to publish everything + we've accumulated, at which point we need to iterate over the + collection and do repository.call_pubd() for each repository. + """ + + # At present, ._inplay and .inplay() are debugging tools only. If + # there turns out to be a real race condition here, this might + # evolve into the hook for some kind of Condition()-based + # mechanism. + + _inplay = weakref.WeakValueDictionary() + + def __init__(self, rpkid): + self.rpkid = rpkid + self.clear() + + def clear(self): + self.repositories = {} + self.msgs = {} + self.handlers = {} + self.uris = {} + + def inplay(self, uri): + who = self._inplay.get(uri, self) + return who is not self and uri in who.uris + + def queue(self, uri, repository, handler = None, + old_obj = None, new_obj = None, old_hash = None): + + assert old_obj is not None or new_obj is not None or old_hash is not None + assert old_obj is None or old_hash is None + assert old_obj is None or isinstance(old_obj, rpki.x509.uri_dispatch(uri)) + assert new_obj is None or isinstance(new_obj, rpki.x509.uri_dispatch(uri)) + + logger.debug("Queuing publication action: uri %s, old %r, new %r, hash %s", + uri, old_obj, new_obj, old_hash) + + if self.inplay(uri): + logger.warning("%s is already in play", uri) + + rid = repository.peer_contact_uri + if rid not in self.repositories: + self.repositories[rid] = repository + self.msgs[rid] = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap, + type = "query", version = rpki.publication.version) + + if uri in self.uris: + logger.debug("Removing publication duplicate %r %s hash %s", + self.uris[uri], uri, self.uris[uri].get("hash")) + old_pdu = self.uris.pop(uri) + self.msgs[rid].remove(old_pdu) + pdu_hash = old_pdu.get("hash") + if pdu_hash is None and new_obj is None: + logger.debug("Withdrawing object %r which was never published simplifies to no-op", + old_pdu) + return + elif old_hash is not None: + logger.debug("Old hash supplied") # XXX Debug log + pdu_hash = old_hash + elif old_obj is None: + logger.debug("No old object present") # XXX Debug log + pdu_hash = None + else: + logger.debug("Calculating hash of old object") # XXX Debug log + pdu_hash = rpki.x509.sha256(old_obj.get_DER()).encode("hex") + + logger.debug("uri %s old hash %s new hash %s", uri, pdu_hash, # XXX Debug log + None if new_obj is None else rpki.x509.sha256(new_obj.get_DER()).encode("hex")) + + if new_obj is None: + pdu = SubElement(self.msgs[rid], rpki.publication.tag_withdraw, uri = uri, hash = pdu_hash) + else: + pdu = SubElement(self.msgs[rid], rpki.publication.tag_publish, uri = uri) + pdu.text = new_obj.get_Base64() + if pdu_hash is not None: + pdu.set("hash", pdu_hash) + + if handler is not None: + self.handlers[uri] = handler + + self.uris[uri] = pdu + self._inplay[uri] = self + + @tornado.gen.coroutine + def call_pubd(self): + for rid in self.repositories: + logger.debug("Calling pubd[%r]", self.repositories[rid]) + try: + yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers) + except (rpki.exceptions.ExistingObjectAtURI, + rpki.exceptions.DifferentObjectAtURI, + rpki.exceptions.NoObjectAtURI) as e: + logger.warn("Lost synchronization with %r: %s", self.repositories[rid], e) + yield self.resync(self.repositories[rid]) + for k in self.uris.iterkeys(): + if self._inplay.get(k) is self: + del self._inplay[k] + self.clear() + + @tornado.gen.coroutine + def resync(self, repository): + logger.info("Attempting resynchronization with %r", repository) + + # A lot of this is copy and paste from .serve_publish_world_now(). + # Refactor when we have more of a clue about how this should work. + + q_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap, + type = "query", version = rpki.publication.version) + SubElement(q_msg, rpki.publication.tag_list, tag = "list") + r_msg = yield repository.call_pubd(self.rpkid, q_msg, length_check = False) + + if not all(r_pdu.tag == rpki.publication.tag_list for r_pdu in r_msg): + raise rpki.exceptions.BadPublicationReply("Unexpected XML tag in publication response") + + pubd_objs = dict((r_pdu.get("uri"), r_pdu.get("hash")) for r_pdu in r_msg) + + our_objs = [] + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter( + ca__parent__tenant = repository.tenant, state = "active"): + our_objs = [(ca_detail.crl_uri, ca_detail.latest_crl), + (ca_detail.manifest_uri, ca_detail.latest_manifest)] + our_objs.extend((c.uri, c.cert) for c in ca_detail.child_certs.all()) + our_objs.extend((r.uri, r.roa) for r in ca_detail.roas.filter(roa__isnull = False)) + our_objs.extend((g.uri, g.ghostbuster) for g in ca_detail.ghostbusters.all()) + our_objs.extend((c.uri, c.cert) for c in ca_detail.ee_certificates.all()) + + q_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap, + type = "query", version = rpki.publication.version) + + for uri, obj in our_objs: + if uri not in pubd_objs: + SubElement(q_msg, rpki.publication.tag_publish, uri = uri).text = obj.get_Base64() + else: + h = pubd_objs.pop(uri) + if h != rpki.x509.sha256(obj.get_DER()).encode("hex"): + SubElement(q_msg, rpki.publication.tag_publish, + uri = uri, hash = h).text = obj.get_Base64() + + for uri, h in pubd_objs.iteritems(): + SubElement(q_msg, rpki.publication.tag_withdraw, uri = uri, hash = h) + + yield repository.call_pubd(self.rpkid, q_msg) + + @property + def size(self): + return sum(len(self.msgs[rid]) for rid in self.repositories) + + def empty(self): + return not self.msgs |