diff options
Diffstat (limited to 'rpki/rpkid.py')
-rw-r--r-- | rpki/rpkid.py | 811 |
1 files changed, 583 insertions, 228 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py index 628209af..bc13cd9a 100644 --- a/rpki/rpkid.py +++ b/rpki/rpkid.py @@ -28,6 +28,7 @@ import random import base64 import logging import argparse + import rpki.resource_set import rpki.up_down import rpki.left_right @@ -42,8 +43,11 @@ import rpki.async import rpki.daemonize import rpki.rpkid_tasks +from lxml.etree import Element, SubElement, tostring as ElementToString + logger = logging.getLogger(__name__) + class main(object): """ Main program for rpkid. @@ -75,7 +79,7 @@ class main(object): rpki.log.init("rpkid", args) - self.cfg = rpki.config.parser(args.config, "rpkid") + self.cfg = rpki.config.parser(set_filename = args.config, section = "rpkid") self.cfg.set_global_flags() if not args.foreground: @@ -101,6 +105,12 @@ class main(object): if self.profile: logger.info("Running in profile mode with output to %s", self.profile) + import django + django.setup() + + global rpki # pylint: disable=W0602 + import rpki.rpkidb # pylint: disable=W0621 + self.sql = rpki.sql.session(self.cfg) self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta")) @@ -116,17 +126,6 @@ class main(object): self.publication_kludge_base = self.cfg.get("publication-kludge-base", "publication/") - # Icky hack to let Iain do some testing quickly, should go away - # once we sort out whether we can make this change permanent. - # - # OK, the stuff to add router certificate support makes enough - # other changes that we're going to need a migration program in - # any case, so might as well throw the switch here too, or at - # least find out if it (still) works as expected. - - self.merge_publication_directories = self.cfg.getboolean("merge_publication_directories", - True) - self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True) self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10), @@ -161,19 +160,23 @@ class main(object): else: logger.debug("Not using internal clock, start_cron() call ignored") - def irdb_query(self, callback, errback, *q_pdus, **kwargs): + @staticmethod + def _compose_left_right_query(): + """ + Compose top level element of a left-right query to irdbd. + """ + + return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, + type = "query", version = rpki.left_right.version) + + def irdb_query(self, q_msg, callback, errback): """ Perform an IRDB callback query. """ try: - q_types = tuple(type(q_pdu) for q_pdu in q_pdus) + q_tags = set(q_pdu.tag for q_pdu in q_msg) - expected_pdu_count = kwargs.pop("expected_pdu_count", None) - assert len(kwargs) == 0 - - q_msg = rpki.left_right.msg.query() - q_msg.extend(q_pdus) q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert) def unwrap(r_der): @@ -181,15 +184,10 @@ class main(object): r_cms = rpki.left_right.cms_msg(DER = r_der) r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert)) self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url) - if not r_msg.is_reply() or not all(type(r_pdu) in q_types for r_pdu in r_msg): + #rpki.left_right.check_response(r_msg) + if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg): raise rpki.exceptions.BadIRDBReply( "Unexpected response to IRDB query: %s" % r_cms.pretty_print_content()) - if expected_pdu_count is not None and len(r_msg) != expected_pdu_count: - assert isinstance(expected_pdu_count, (int, long)) - raise rpki.exceptions.BadIRDBReply( - "Expected exactly %d PDU%s from IRDB: %s" % ( - expected_pdu_count, "" if expected_pdu_count == 1 else "s", - r_cms.pretty_print_content())) callback(r_msg) except Exception, e: errback(e) @@ -209,100 +207,335 @@ class main(object): Ask IRDB about a child's resources. """ - q_pdu = rpki.left_right.list_resources_elt() - q_pdu.self_handle = self_handle - q_pdu.child_handle = child_handle + q_msg = self._compose_left_right_query() + SubElement(q_msg, rpki.left_right.tag_list_resources, + self_handle = self_handle, child_handle = child_handle) def done(r_msg): + if len(r_msg) != 1: + raise rpki.exceptions.BadIRDBReply( + "Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content()) callback(rpki.resource_set.resource_bag( - asn = r_msg[0].asn, - v4 = r_msg[0].ipv4, - v6 = r_msg[0].ipv6, - valid_until = r_msg[0].valid_until)) + asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")), + v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")), + v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")), + valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until")))) - self.irdb_query(done, errback, q_pdu, expected_pdu_count = 1) + self.irdb_query(q_msg, done, errback) def irdb_query_roa_requests(self, self_handle, callback, errback): """ Ask IRDB about self's ROA requests. """ - q_pdu = rpki.left_right.list_roa_requests_elt() - q_pdu.self_handle = self_handle - - self.irdb_query(callback, errback, q_pdu) + q_msg = self._compose_left_right_query() + SubElement(q_msg, rpki.left_right.tag_list_roa_requests, self_handle = self_handle) + self.irdb_query(q_msg, callback, errback) def irdb_query_ghostbuster_requests(self, self_handle, parent_handles, callback, errback): """ Ask IRDB about self's ghostbuster record requests. """ - q_pdus = [] - + q_msg = self._compose_left_right_query() for parent_handle in parent_handles: - q_pdu = rpki.left_right.list_ghostbuster_requests_elt() - q_pdu.self_handle = self_handle - q_pdu.parent_handle = parent_handle - q_pdus.append(q_pdu) - - self.irdb_query(callback, errback, *q_pdus) + SubElement(q_msg, rpki.left_right.tag_list_ghostbuster_requests, + self_handle = self_handle, parent_handle = parent_handle) + self.irdb_query(q_msg, callback, errback) def irdb_query_ee_certificate_requests(self, self_handle, callback, errback): """ Ask IRDB about self's EE certificate requests. """ - q_pdu = rpki.left_right.list_ee_certificate_requests_elt() - q_pdu.self_handle = self_handle + q_msg = self._compose_left_right_query() + SubElement(q_msg, rpki.left_right.tag_list_ee_certificate_requests, self_handle = self_handle) + self.irdb_query(q_msg, callback, errback) + + @property + def left_right_models(self): + """ + Map element tag to rpkidb model. + """ + + try: + return self._left_right_models + except AttributeError: + import rpki.rpkidb.models # pylint: disable=W0621 + self._left_right_models = { + rpki.left_right.tag_self : rpki.rpkidb.models.Self, + rpki.left_right.tag_bsc : rpki.rpkidb.models.BSC, + rpki.left_right.tag_parent : rpki.rpkidb.models.Parent, + rpki.left_right.tag_child : rpki.rpkidb.models.Child, + rpki.left_right.tag_repository : rpki.rpkidb.models.Repository } + return self._left_right_models + + @property + def left_right_trivial_handlers(self): + """ + Map element tag to bound handler methods for trivial PDU types. + """ + + try: + return self._left_right_trivial_handlers + except AttributeError: + self._left_right_trivial_handlers = { + rpki.left_right.tag_list_published_objects : self.handle_list_published_objects, + rpki.left_right.tag_list_received_resources : self.handle_list_received_resources } + return self._left_right_trivial_handlers + + def handle_list_published_objects(self, q_pdu, r_msg): + """ + <list_published_objects/> server. + + This is written for the old SQL API, will need rewriting once we + switch rpkid to Django ORM. + """ + + logger.debug(".handle_list_published_objects() %s", ElementToString(q_pdu)) + + self_handle = q_pdu.get("self_handle") + msg_tag = q_pdu.get("tag") + + kw = dict(self_handle = self_handle) + if msg_tag is not None: + kw.update(tag = msg_tag) + + for parent in rpki.left_right.self_elt.serve_fetch_handle(self, None, self_handle).parents: + for ca in parent.cas: + ca_detail = ca.active_ca_detail + if ca_detail is not None: + + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = ca_detail.crl_uri, **kw).text = ca_detail.latest_crl.get_Base64() + + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = ca_detail.manifest_uri, **kw).text = ca_detail.latest_manifest.get_Base64() + + for c in ca_detail.child_certs: + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = c.uri, child_handle = c.child.child_handle, **kw).text = c.cert.get_Base64() + + for r in ca_detail.roas: + if r.roa is not None: + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = r.uri, **kw).text = r.roa.get_Base64() + + for g in ca_detail.ghostbusters: + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = g.uri, **kw).text = g.ghostbuster.get_Base64() + + for c in ca_detail.ee_certificates: + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = c.uri, **kw).text = c.cert.get_Base64() + + def handle_list_received_resources(self, q_pdu, r_msg): + """ + <list_received_resources/> server. + + This is written for the old SQL API, will need rewriting once we + switch rpkid to Django ORM. + """ + + logger.debug(".handle_list_received_resources() %s", ElementToString(q_pdu)) + + self_handle = q_pdu.get("self_handle") + msg_tag = q_pdu.get("tag") + + for parent in rpki.left_right.self_elt.serve_fetch_handle(self, None, self_handle).parents: + for ca in parent.cas: + ca_detail = ca.active_ca_detail + if ca_detail is not None and ca_detail.latest_ca_cert is not None: + + cert = ca_detail.latest_ca_cert + resources = cert.get_3779resources() + + r_pdu = SubElement(r_msg, rpki.left_right.tag_list_received_resources, + self_handle = self_handle, + parent_handle = parent.parent_handle, + uri = ca_detail.ca_cert_uri, + notBefore = str(cert.getNotBefore()), + notAfter = str(cert.getNotAfter()), + sia_uri = cert.get_sia_directory_uri(), + aia_uri = cert.get_aia_uri(), + asn = str(resources.asn), + ipv4 = str(resources.v4), + ipv6 = str(resources.v6)) + + if msg_tag is not None: + r_pdu.set("tag", msg_tag) - self.irdb_query(callback, errback, q_pdu) def left_right_handler(self, query, path, cb): """ Process one left-right PDU. """ - def done(r_msg): - reply = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert) - self.sql.sweep() - cb(200, body = reply) + # This handles five persistent classes (self, bsc, parent, child, + # repository) and two simple queries (list_published_objects and + # list_received_resources). The former probably need to dispatch + # via methods to the corresponding model classes; the latter + # probably just become calls to ordinary methods of this + # (rpki.rpkid.main) class. + # + # Need to clone logic from rpki.pubd.main.control_handler(). try: q_cms = rpki.left_right.cms_msg(DER = query) q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert)) + r_msg = Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, + type = "reply", version = rpki.left_right.version) self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, path) - if not q_msg.is_query(): + + assert q_msg.tag.startswith(rpki.left_right.xmlns) + assert all(q_pdu.tag.startswith(rpki.left_right.xmlns) for q_pdu in q_msg) + + if q_msg.get("version") != rpki.left_right.version: + raise rpki.exceptions.BadQuery("Unrecognized protocol version") + + if q_msg.get("type") != "query": raise rpki.exceptions.BadQuery("Message type is not query") - q_msg.serve_top_level(self, done) + + def done(): + self.sql.sweep() + cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert)) + + def loop(iterator, q_pdu): + + def fail(e): + if not isinstance(e, rpki.exceptions.NotFound): + logger.exception("Unhandled exception serving left-right PDU %r", q_pdu) + # Compatability kludge + if isinstance(q_pdu, rpki.left_right.base_elt): + error_self_handle = q_pdu.self_handle + error_tag = q_pdu.tag + else: + error_self_handle = q_pdu.get("self_handle") + error_tag = q_pdu.get("tag") + r_pdu = SubElement(r_msg, rpki.left_right.tag_report_error, error_code = e.__class__.__name__) + r_pdu.text = str(e) + if error_tag is not None: + r_pdu.set("tag", error_tag) + if error_self_handle is not None: + r_pdu.set("self_handle", error_self_handle) + self.sql.sweep() + cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert)) + + try: + if q_pdu.tag in self.left_right_trivial_handlers: + self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg) + iterator() + + elif True: # Old-style handlers + + q_map = { rpki.left_right.tag_self : rpki.left_right.self_elt, + rpki.left_right.tag_bsc : rpki.left_right.bsc_elt, + rpki.left_right.tag_parent : rpki.left_right.parent_elt, + rpki.left_right.tag_child : rpki.left_right.child_elt, + rpki.left_right.tag_repository : rpki.left_right.repository_elt } + q_pdu = q_map[q_pdu.tag].fromXML(q_pdu) + q_pdu.gctx = self + q_pdu.serve_dispatch(r_msg, iterator, fail) + + else: # New-style handlers + + # Notes on hooks in old code + # + # .serve_pre_save_hook(): used by all classes to do some + # kind of handle fixup which I think is now OBE. Also + # used by BSC for key generation, because schema (and + # corresponding new model) don't allow NULL for private + # key or PKCS10 request, so either we have to relax the + # schema constraint or generate key before saving. + # (bsc) + # + # .serve_destroy_hook(): used by several objects to + # trigger revocation of related objects. Will probably + # need to preserve this behavior. + # (self, parent, child) + # + # .serve_post_save_hook(): used to trigger various actions + # based on boolean attributes in XML. + # (self, repository, parent, child) + + action = q_pdu.get("action") + model = self.left_right_models[q_pdu.tag] + + if action in ("get", "list"): + for obj in model.objects.xml_list(q_pdu): + obj.xml_template.encode(obj, r_msg) + + elif action == "destroy": + obj = model.objects.xml_get_for_delete(q_pdu) + try: + hook = obj.xml_pre_delete_hook + except AttributeError: + pass + else: + hook() + obj.delete() + obj.xml_template.acknowledge(obj, q_pdu, r_msg) + + elif action in ("create", "set"): + obj = model.objects.xml_get_or_create(q_pdu) + obj.xml_template.decode(obj, q_pdu) + try: + hook = obj.xml_pre_save_hook + except AttributeError: + pass + else: + hook(q_pdu) + obj.save() + try: + hook = obj.xml_post_save_hook + except AttributeError: + pass + else: + hook(q_pdu) + obj.xml_template.acknowledge(obj, q_pdu, r_msg) + + else: + raise rpki.exceptions.BadQuery + + except (rpki.async.ExitNow, SystemExit): + raise + except Exception, e: + fail(e) + + rpki.async.iterator(q_msg, loop, done) + except (rpki.async.ExitNow, SystemExit): raise + except Exception, e: logger.exception("Unhandled exception serving left-right request") cb(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e)) up_down_url_regexp = re.compile("/up-down/([-A-Z0-9_]+)/([-A-Z0-9_]+)$", re.I) - def up_down_handler(self, query, path, cb): + def up_down_handler(self, q_der, path, cb): """ Process one up-down PDU. """ - def done(reply): + def done(r_der): self.sql.sweep() - cb(200, body = reply) + cb(200, body = r_der) try: match = self.up_down_url_regexp.search(path) if match is None: raise rpki.exceptions.BadContactURL("Bad URL path received in up_down_handler(): %s" % path) self_handle, child_handle = match.groups() - child = rpki.left_right.child_elt.sql_fetch_where1(self, - "self.self_handle = %s AND child.child_handle = %s AND child.self_id = self.self_id", - (self_handle, child_handle), - "self") + child = rpki.left_right.child_elt.sql_fetch_where1( + gctx = self, + where = "self.self_handle = %s AND child.child_handle = %s AND child.self_id = self.self_id", + args = (self_handle, child_handle), + also_from = "self") if child is None: - raise rpki.exceptions.ChildNotFound("Could not find child %s of self %s in up_down_handler()" % (child_handle, self_handle)) - child.serve_up_down(query, done) + raise rpki.exceptions.ChildNotFound("Could not find child %s of self %s in up_down_handler()" % ( + child_handle, self_handle)) + child.serve_up_down(q_der, done) except (rpki.async.ExitNow, SystemExit): raise except (rpki.exceptions.ChildNotFound, rpki.exceptions.BadContactURL), e: @@ -317,6 +550,7 @@ class main(object): Record that we were still alive when we got here, by resetting keepalive timer. """ + if force or self.cron_timeout is not None: self.cron_timeout = rpki.sundial.now() + self.cron_keepalive @@ -324,6 +558,7 @@ class main(object): """ Add a task to the scheduler task queue, unless it's already queued. """ + if task not in self.task_queue: logger.debug("Adding %r to task queue", task) self.task_queue.append(task) @@ -338,6 +573,7 @@ class main(object): queue (we don't want to run it directly, as that could eventually blow out our call stack). """ + try: self.task_current = self.task_queue.pop(0) except IndexError: @@ -349,6 +585,7 @@ class main(object): """ Run first task on the task queue, unless one is running already. """ + if self.task_current is None: self.task_next() @@ -445,6 +682,7 @@ class ca_obj(rpki.sql.sql_persistent): """ Fetch parent object to which this CA object links. """ + return rpki.left_right.parent_elt.sql_fetch(self.gctx, self.parent_id) @property @@ -452,6 +690,7 @@ class ca_obj(rpki.sql.sql_persistent): """ Fetch all ca_detail objects that link to this CA object. """ + return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s", (self.ca_id,)) @property @@ -459,6 +698,7 @@ class ca_obj(rpki.sql.sql_persistent): """ Fetch the pending ca_details for this CA, if any. """ + return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state = 'pending'", (self.ca_id,)) @property @@ -466,6 +706,7 @@ class ca_obj(rpki.sql.sql_persistent): """ Fetch the active ca_detail for this CA, if any. """ + return ca_detail_obj.sql_fetch_where1(self.gctx, "ca_id = %s AND state = 'active'", (self.ca_id,)) @property @@ -473,6 +714,7 @@ class ca_obj(rpki.sql.sql_persistent): """ Fetch deprecated ca_details for this CA, if any. """ + return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state = 'deprecated'", (self.ca_id,)) @property @@ -480,6 +722,7 @@ class ca_obj(rpki.sql.sql_persistent): """ Fetch active and deprecated ca_details for this CA, if any. """ + return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND (state = 'active' OR state = 'deprecated')", (self.ca_id,)) @property @@ -487,6 +730,7 @@ class ca_obj(rpki.sql.sql_persistent): """ Fetch revoked ca_details for this CA, if any. """ + return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state = 'revoked'", (self.ca_id,)) @property @@ -495,7 +739,7 @@ class ca_obj(rpki.sql.sql_persistent): Fetch ca_details which are candidates for consideration when processing an up-down issue_response PDU. """ - #return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND latest_ca_cert IS NOT NULL AND state != 'revoked'", (self.ca_id,)) + return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state != 'revoked'", (self.ca_id,)) def construct_sia_uri(self, parent, rc): @@ -504,16 +748,12 @@ class ca_obj(rpki.sql.sql_persistent): information and the parent's up-down protocol list_response PDU. """ - sia_uri = rc.suggested_sia_head and rc.suggested_sia_head.rsync() - if not sia_uri or not sia_uri.startswith(parent.sia_base): + sia_uri = rc.get("suggested_sia_head", "") + if not sia_uri.startswith("rsync://") or not sia_uri.startswith(parent.sia_base): sia_uri = parent.sia_base if not sia_uri.endswith("/"): raise rpki.exceptions.BadURISyntax("SIA URI must end with a slash: %s" % sia_uri) - # With luck this can go away sometime soon. - if self.gctx.merge_publication_directories: - return sia_uri - else: - return sia_uri + str(self.ca_id) + "/" + return sia_uri def check_for_updates(self, parent, rc, cb, eb): """ @@ -530,28 +770,40 @@ class ca_obj(rpki.sql.sql_persistent): self.sia_uri = sia_uri self.sql_mark_dirty() - rc_resources = rc.to_resource_bag() - cert_map = dict((c.cert.get_SKI(), c) for c in rc.certs) + class_name = rc.get("class_name") + + rc_resources = rpki.resource_set.resource_bag( + rc.get("resource_set_as"), + rc.get("resource_set_ipv4"), + rc.get("resource_set_ipv6"), + rc.get("resource_set_notafter")) + + cert_map = {} + for c in rc.getiterator(rpki.up_down.tag_certificate): + x = rpki.x509.X509(Base64 = c.text) + u = rpki.up_down.multi_uri(c.get("cert_url")).rsync() + cert_map[x.gSKI()] = (x, u) def loop(iterator, ca_detail): self.gctx.checkpoint() - rc_cert = cert_map.pop(ca_detail.public_key.get_SKI(), None) + rc_cert, rc_cert_uri = cert_map.pop(ca_detail.public_key.gSKI(), (None, None)) if rc_cert is None: - logger.warning("SKI %s in resource class %s is in database but missing from list_response to %s from %s, maybe parent certificate went away?", - ca_detail.public_key.gSKI(), rc.class_name, parent.self.self_handle, parent.parent_handle) + logger.warning("SKI %s in resource class %s is in database but missing from list_response to %s from %s, " + "maybe parent certificate went away?", + ca_detail.public_key.gSKI(), class_name, parent.self.self_handle, parent.parent_handle) publisher = publication_queue() - ca_detail.delete(ca = ca_detail.ca, publisher = publisher) + ca_detail.destroy(ca = ca_detail.ca, publisher = publisher) return publisher.call_pubd(iterator, eb) else: - if ca_detail.state == "active" and ca_detail.ca_cert_uri != rc_cert.cert_url.rsync(): - logger.debug("AIA changed: was %s now %s", ca_detail.ca_cert_uri, rc_cert.cert_url.rsync()) - ca_detail.ca_cert_uri = rc_cert.cert_url.rsync() + if ca_detail.state == "active" and ca_detail.ca_cert_uri != rc_cert_uri: + logger.debug("AIA changed: was %s now %s", ca_detail.ca_cert_uri, rc_cert_uri) + ca_detail.ca_cert_uri = rc_cert_uri ca_detail.sql_mark_dirty() if ca_detail.state in ("pending", "active"): @@ -563,7 +815,7 @@ class ca_obj(rpki.sql.sql_persistent): if (ca_detail.state == "pending" or sia_uri_changed or - ca_detail.latest_ca_cert != rc_cert.cert or + ca_detail.latest_ca_cert != rc_cert or ca_detail.latest_ca_cert.getNotAfter() != rc_resources.valid_until or current_resources.undersized(rc_resources) or current_resources.oversized(rc_resources)): @@ -581,9 +833,7 @@ class ca_obj(rpki.sql.sql_persistent): def done(): if cert_map: logger.warning("Unknown certificate SKI%s %s in resource class %s in list_response to %s from %s, maybe you want to \"revoke_forgotten\"?", - "" if len(cert_map) == 1 else "s", - ", ".join(c.cert.gSKI() for c in cert_map.values()), - rc.class_name, parent.self.self_handle, parent.parent_handle) + "" if len(cert_map) == 1 else "s", ", ".join(cert_map), class_name, parent.self.self_handle, parent.parent_handle) self.gctx.sql.sweep() self.gctx.checkpoint() cb() @@ -591,29 +841,30 @@ class ca_obj(rpki.sql.sql_persistent): ca_details = self.issue_response_candidate_ca_details if True: - skis_parent = set(x.cert.gSKI() - for x in cert_map.itervalues()) + skis_parent = set(cert_map) skis_me = set(x.latest_ca_cert.gSKI() for x in ca_details if x.latest_ca_cert is not None) for ski in skis_parent & skis_me: logger.debug("Parent %s agrees that %s has SKI %s in resource class %s", - parent.parent_handle, parent.self.self_handle, ski, rc.class_name) + parent.parent_handle, parent.self.self_handle, ski, class_name) for ski in skis_parent - skis_me: logger.debug("Parent %s thinks %s has SKI %s in resource class %s but I don't think so", - parent.parent_handle, parent.self.self_handle, ski, rc.class_name) + parent.parent_handle, parent.self.self_handle, ski, class_name) for ski in skis_me - skis_parent: logger.debug("I think %s has SKI %s in resource class %s but parent %s doesn't think so", - parent.self.self_handle, ski, rc.class_name, parent.parent_handle) + parent.self.self_handle, ski, class_name, parent.parent_handle) if ca_details: rpki.async.iterator(ca_details, loop, done) else: logger.warning("Existing resource class %s to %s from %s with no certificates, rekeying", - rc.class_name, parent.self.self_handle, parent.parent_handle) + class_name, parent.self.self_handle, parent.parent_handle) self.gctx.checkpoint() self.rekey(cb, eb) + # Called from exactly one place, in rpki.rpkid_tasks.PollParentTask.class_loop(). + # Probably want to refactor. @classmethod def create(cls, parent, rc, cb, eb): """ @@ -624,7 +875,7 @@ class ca_obj(rpki.sql.sql_persistent): self = cls() self.gctx = parent.gctx self.parent_id = parent.parent_id - self.parent_resource_class = rc.class_name + self.parent_resource_class = rc.get("class_name") self.sql_store() try: self.sia_uri = self.construct_sia_uri(parent, rc) @@ -633,20 +884,20 @@ class ca_obj(rpki.sql.sql_persistent): raise ca_detail = ca_detail_obj.create(self) - def done(issue_response): - c = issue_response.payload.classes[0].certs[0] - logger.debug("CA %r received certificate %s", self, c.cert_url) + def done(r_msg): + c = r_msg[0][0] + logger.debug("CA %r received certificate %s", self, c.get("cert_url")) ca_detail.activate( ca = self, - cert = c.cert, - uri = c.cert_url, + cert = rpki.x509.X509(Base64 = c.text), + uri = c.get("cert_url"), callback = cb, errback = eb) logger.debug("Sending issue request to %r from %r", parent, self.create) - rpki.up_down.issue_pdu.query(parent, self, ca_detail, done, eb) + parent.up_down_issue_query(self, ca_detail, done, eb) - def delete(self, parent, callback): + def destroy(self, parent, callback): """ The list of current resource classes received from parent does not include the class corresponding to this CA, so we need to delete @@ -669,13 +920,14 @@ class ca_obj(rpki.sql.sql_persistent): publisher = publication_queue() for ca_detail in self.ca_details: - ca_detail.delete(ca = self, publisher = publisher, allow_failure = True) + ca_detail.destroy(ca = self, publisher = publisher, allow_failure = True) publisher.call_pubd(done, lose) def next_serial_number(self): """ Allocate a certificate serial number. """ + self.last_issued_sn += 1 self.sql_mark_dirty() return self.last_issued_sn @@ -684,6 +936,7 @@ class ca_obj(rpki.sql.sql_persistent): """ Allocate a manifest serial number. """ + self.last_manifest_sn += 1 self.sql_mark_dirty() return self.last_manifest_sn @@ -692,6 +945,7 @@ class ca_obj(rpki.sql.sql_persistent): """ Allocate a CRL serial number. """ + self.last_crl_sn += 1 self.sql_mark_dirty() return self.last_crl_sn @@ -708,19 +962,19 @@ class ca_obj(rpki.sql.sql_persistent): old_detail = self.active_ca_detail new_detail = ca_detail_obj.create(self) - def done(issue_response): - c = issue_response.payload.classes[0].certs[0] - logger.debug("CA %r received certificate %s", self, c.cert_url) + def done(r_msg): + c = r_msg[0][0] + logger.debug("CA %r received certificate %s", self, c.get("cert_url")) new_detail.activate( ca = self, - cert = c.cert, - uri = c.cert_url, + cert = rpki.x509.X509(Base64 = c.text), + uri = c.get("cert_url"), predecessor = old_detail, callback = cb, errback = eb) logger.debug("Sending issue request to %r from %r", parent, self.rekey) - rpki.up_down.issue_pdu.query(parent, self, new_detail, done, eb) + parent.up_down_issue_query(self, new_detail, done, eb) def revoke(self, cb, eb, revoke_all = False): """ @@ -782,6 +1036,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): """ Extra assertions for SQL decode of a ca_detail_obj. """ + rpki.sql.sql_persistent.sql_decode(self, vals) assert self.public_key is None or self.private_key_id is None or self.public_key.get_DER() == self.private_key_id.get_public_DER() assert self.manifest_public_key is None or self.manifest_private_key_id is None or self.manifest_public_key.get_DER() == self.manifest_private_key_id.get_public_DER() @@ -792,12 +1047,14 @@ class ca_detail_obj(rpki.sql.sql_persistent): """ Fetch CA object to which this ca_detail links. """ + return ca_obj.sql_fetch(self.gctx, self.ca_id) def fetch_child_certs(self, child = None, ski = None, unique = False, unpublished = None): """ Fetch all child_cert objects that link to this ca_detail. """ + return rpki.rpkid.child_cert_obj.fetch(self.gctx, child, self, ski, unique, unpublished) @property @@ -805,6 +1062,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): """ Fetch all child_cert objects that link to this ca_detail. """ + return self.fetch_child_certs() def unpublished_child_certs(self, when): @@ -812,6 +1070,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): Fetch all unpublished child_cert objects linked to this ca_detail with attempted publication dates older than when. """ + return self.fetch_child_certs(unpublished = when) @property @@ -819,6 +1078,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): """ Fetch all revoked_cert objects that link to this ca_detail. """ + return revoked_cert_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,)) @property @@ -826,6 +1086,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): """ Fetch all ROA objects that link to this ca_detail. """ + return rpki.rpkid.roa_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,)) def unpublished_roas(self, when): @@ -833,34 +1094,52 @@ class ca_detail_obj(rpki.sql.sql_persistent): Fetch all unpublished ROA objects linked to this ca_detail with attempted publication dates older than when. """ - return rpki.rpkid.roa_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s AND published IS NOT NULL and published < %s", (self.ca_detail_id, when)) + + return rpki.rpkid.roa_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s AND published IS NOT NULL and published < %s", + (self.ca_detail_id, when)) @property def ghostbusters(self): """ Fetch all Ghostbuster objects that link to this ca_detail. """ + return rpki.rpkid.ghostbuster_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,)) + def unpublished_ghostbusters(self, when): + """ + Fetch all unpublished Ghostbusters objects linked to this + ca_detail with attempted publication dates older than when. + """ + + return rpki.rpkid.ghostbuster_obj.sql_fetch_where(self.gctx, + "ca_detail_id = %s AND published IS NOT NULL and published < %s", + (self.ca_detail_id, when)) + @property def ee_certificates(self): """ Fetch all EE certificate objects that link to this ca_detail. """ + return rpki.rpkid.ee_cert_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s", (self.ca_detail_id,)) - def unpublished_ghostbusters(self, when): + def unpublished_ee_certificates(self, when): """ - Fetch all unpublished Ghostbusters objects linked to this + Fetch all unpublished EE certificate objects linked to this ca_detail with attempted publication dates older than when. """ - return rpki.rpkid.ghostbuster_obj.sql_fetch_where(self.gctx, "ca_detail_id = %s AND published IS NOT NULL and published < %s", (self.ca_detail_id, when)) + + return rpki.rpkid.ee_cert_obj.sql_fetch_where(self.gctx, + "ca_detail_id = %s AND published IS NOT NULL and published < %s", + (self.ca_detail_id, when)) @property def crl_uri(self): """ Return publication URI for this ca_detail's CRL. """ + return self.ca.sia_uri + self.crl_uri_tail @property @@ -868,6 +1147,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): """ Return tail (filename portion) of publication URI for this ca_detail's CRL. """ + return self.public_key.gSKI() + ".crl" @property @@ -875,17 +1155,19 @@ class ca_detail_obj(rpki.sql.sql_persistent): """ Return publication URI for this ca_detail's manifest. """ + return self.ca.sia_uri + self.public_key.gSKI() + ".mft" def has_expired(self): """ Return whether this ca_detail's certificate has expired. """ + return self.latest_ca_cert.getNotAfter() <= rpki.sundial.now() def covers(self, target): """ - Test whether this ca-detail covers a given set of resources. + Test whether this ca_detail covers a given set of resources. """ assert not target.asn.inherit and not target.v4.inherit and not target.v6.inherit @@ -900,7 +1182,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): publisher = publication_queue() self.latest_ca_cert = cert - self.ca_cert_uri = uri.rsync() + self.ca_cert_uri = uri self.generate_manifest_cert() self.state = "active" self.generate_crl(publisher = publisher) @@ -921,7 +1203,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): publisher.call_pubd(callback, errback) - def delete(self, ca, publisher, allow_failure = False): + def destroy(self, ca, publisher, allow_failure = False): """ Delete this ca_detail and all of the certs it issued. @@ -932,11 +1214,10 @@ class ca_detail_obj(rpki.sql.sql_persistent): repository = ca.parent.repository handler = False if allow_failure else None for child_cert in self.child_certs: - publisher.withdraw(cls = rpki.publication.certificate_elt, - uri = child_cert.uri, - obj = child_cert.cert, - repository = repository, - handler = handler) + publisher.queue(uri = child_cert.uri, + old_obj = child_cert.cert, + repository = repository, + handler = handler) child_cert.sql_mark_deleted() for roa in self.roas: roa.revoke(publisher = publisher, allow_failure = allow_failure, fast = True) @@ -947,21 +1228,19 @@ class ca_detail_obj(rpki.sql.sql_persistent): except AttributeError: latest_manifest = None if latest_manifest is not None: - publisher.withdraw(cls = rpki.publication.manifest_elt, - uri = self.manifest_uri, - obj = self.latest_manifest, - repository = repository, - handler = handler) + publisher.queue(uri = self.manifest_uri, + old_obj = self.latest_manifest, + repository = repository, + handler = handler) try: latest_crl = self.latest_crl except AttributeError: latest_crl = None if latest_crl is not None: - publisher.withdraw(cls = rpki.publication.crl_elt, - uri = self.crl_uri, - obj = self.latest_crl, - repository = repository, - handler = handler) + publisher.queue(uri = self.crl_uri, + old_obj = self.latest_crl, + repository = repository, + handler = handler) self.gctx.sql.sweep() for cert in self.revoked_certs: # + self.child_certs logger.debug("Deleting %r", cert) @@ -994,13 +1273,18 @@ class ca_detail_obj(rpki.sql.sql_persistent): ca = self.ca parent = ca.parent + class_name = ca.parent_resource_class + gski = self.latest_ca_cert.gSKI() def parent_revoked(r_msg): - if r_msg.payload.ski != self.latest_ca_cert.gSKI(): + if r_msg[0].get("class_name") != class_name: + raise rpki.exceptions.ResourceClassMismatch + + if r_msg[0].get("ski") != gski: raise rpki.exceptions.SKIMismatch - logger.debug("Parent revoked %s, starting cleanup", self.latest_ca_cert.gSKI()) + logger.debug("Parent revoked %s, starting cleanup", gski) crl_interval = rpki.sundial.timedelta(seconds = parent.self.crl_interval) @@ -1038,8 +1322,9 @@ class ca_detail_obj(rpki.sql.sql_persistent): self.sql_mark_dirty() publisher.call_pubd(cb, eb) - logger.debug("Asking parent to revoke CA certificate %s", self.latest_ca_cert.gSKI()) - rpki.up_down.revoke_pdu.query(ca, self.latest_ca_cert.gSKI(), parent_revoked, eb) + logger.debug("Asking parent to revoke CA certificate %s", gski) + parent.up_down_revoke_query(class_name, gski, parent_revoked, eb) + def update(self, parent, ca, rc, sia_uri_changed, old_resources, callback, errback): """ @@ -1047,24 +1332,27 @@ class ca_detail_obj(rpki.sql.sql_persistent): children of this ca_detail. """ - def issued(issue_response): - c = issue_response.payload.classes[0].certs[0] - logger.debug("CA %r received certificate %s", self, c.cert_url) + def issued(r_msg): + c = r_msg[0][0] + cert = rpki.x509.X509(Base64 = c.text) + cert_url = c.get("cert_url") + + logger.debug("CA %r received certificate %s", self, cert_url) if self.state == "pending": return self.activate( ca = ca, - cert = c.cert, - uri = c.cert_url, + cert = cert, + uri = cert_url, callback = callback, errback = errback) - validity_changed = self.latest_ca_cert is None or self.latest_ca_cert.getNotAfter() != c.cert.getNotAfter() + validity_changed = self.latest_ca_cert is None or self.latest_ca_cert.getNotAfter() != cert.getNotAfter() publisher = publication_queue() - if self.latest_ca_cert != c.cert: - self.latest_ca_cert = c.cert + if self.latest_ca_cert != cert: + self.latest_ca_cert = cert self.sql_mark_dirty() self.generate_manifest_cert() self.generate_crl(publisher = publisher) @@ -1092,7 +1380,8 @@ class ca_detail_obj(rpki.sql.sql_persistent): publisher.call_pubd(callback, errback) logger.debug("Sending issue request to %r from %r", parent, self.update) - rpki.up_down.issue_pdu.query(parent, ca, self, issued, errback) + parent.up_down_issue_query(ca, self, issued, errback) + @classmethod def create(cls, ca): @@ -1146,7 +1435,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): ca = self.ca, resources = resources, subject_key = self.manifest_public_key, - sia = (None, None, self.manifest_uri)) + sia = (None, None, self.manifest_uri, self.ca.parent.repository.rrdp_notification_uri)) def issue(self, ca, child, subject_key, sia, resources, publisher, child_cert = None): """ @@ -1171,6 +1460,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): notAfter = resources.valid_until) if child_cert is None: + old_cert = None child_cert = rpki.rpkid.child_cert_obj( gctx = child.gctx, child_id = child.child_id, @@ -1178,6 +1468,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): cert = cert) logger.debug("Created new child_cert %r", child_cert) else: + old_cert = child_cert.cert child_cert.cert = cert del child_cert.ca_detail child_cert.ca_detail_id = self.ca_detail_id @@ -1186,10 +1477,10 @@ class ca_detail_obj(rpki.sql.sql_persistent): child_cert.ski = cert.get_SKI() child_cert.published = rpki.sundial.now() child_cert.sql_store() - publisher.publish( - cls = rpki.publication.certificate_elt, + publisher.queue( uri = child_cert.uri, - obj = child_cert.cert, + old_obj = old_cert, + new_obj = child_cert.cert, repository = ca.parent.repository, handler = child_cert.published_callback) self.generate_manifest(publisher = publisher) @@ -1220,6 +1511,8 @@ class ca_detail_obj(rpki.sql.sql_persistent): certlist.append((revoked_cert.serial, revoked_cert.revoked)) certlist.sort() + old_crl = self.latest_crl + self.latest_crl = rpki.x509.CRL.generate( keypair = self.private_key_id, issuer = self.latest_ca_cert, @@ -1230,10 +1523,10 @@ class ca_detail_obj(rpki.sql.sql_persistent): self.crl_published = rpki.sundial.now() self.sql_mark_dirty() - publisher.publish( - cls = rpki.publication.crl_elt, + publisher.queue( uri = self.crl_uri, - obj = self.latest_crl, + old_obj = old_crl, + new_obj = self.latest_crl, repository = parent.repository, handler = self.crl_published_callback) @@ -1241,7 +1534,8 @@ class ca_detail_obj(rpki.sql.sql_persistent): """ Check result of CRL publication. """ - pdu.raise_if_error() + + rpki.publication.raise_if_error(pdu) self.crl_published = None self.sql_mark_dirty() @@ -1277,6 +1571,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): objs.extend((e.uri_tail, e.cert) for e in self.ee_certificates) logger.debug("Building manifest object %s", uri) + old_manifest = self.latest_manifest self.latest_manifest = rpki.x509.SignedManifest.build( serial = ca.next_manifest_number(), thisUpdate = now, @@ -1289,17 +1584,18 @@ class ca_detail_obj(rpki.sql.sql_persistent): self.manifest_published = rpki.sundial.now() self.sql_mark_dirty() - publisher.publish(cls = rpki.publication.manifest_elt, - uri = uri, - obj = self.latest_manifest, - repository = parent.repository, - handler = self.manifest_published_callback) + publisher.queue(uri = uri, + old_obj = old_manifest, + new_obj = self.latest_manifest, + repository = parent.repository, + handler = self.manifest_published_callback) def manifest_published_callback(self, pdu): """ Check result of manifest publication. """ - pdu.raise_if_error() + + rpki.publication.raise_if_error(pdu) self.manifest_published = None self.sql_mark_dirty() @@ -1360,21 +1656,19 @@ class ca_detail_obj(rpki.sql.sql_persistent): self.crl_published is not None and \ self.crl_published < stale: logger.debug("Retrying publication for %s", self.crl_uri) - publisher.publish(cls = rpki.publication.crl_elt, - uri = self.crl_uri, - obj = self.latest_crl, - repository = repository, - handler = self.crl_published_callback) + publisher.queue(uri = self.crl_uri, + new_obj = self.latest_crl, + repository = repository, + handler = self.crl_published_callback) if self.latest_manifest is not None and \ self.manifest_published is not None and \ self.manifest_published < stale: logger.debug("Retrying publication for %s", self.manifest_uri) - publisher.publish(cls = rpki.publication.manifest_elt, - uri = self.manifest_uri, - obj = self.latest_manifest, - repository = repository, - handler = self.manifest_published_callback) + publisher.queue(uri = self.manifest_uri, + new_obj = self.latest_manifest, + repository = repository, + handler = self.manifest_published_callback) if not check_all: return @@ -1384,31 +1678,37 @@ class ca_detail_obj(rpki.sql.sql_persistent): for child_cert in self.unpublished_child_certs(stale): logger.debug("Retrying publication for %s", child_cert) - publisher.publish( - cls = rpki.publication.certificate_elt, + publisher.queue( uri = child_cert.uri, - obj = child_cert.cert, + new_obj = child_cert.cert, repository = repository, handler = child_cert.published_callback) for roa in self.unpublished_roas(stale): logger.debug("Retrying publication for %s", roa) - publisher.publish( - cls = rpki.publication.roa_elt, + publisher.queue( uri = roa.uri, - obj = roa.roa, + new_obj = roa.roa, repository = repository, handler = roa.published_callback) for ghostbuster in self.unpublished_ghostbusters(stale): logger.debug("Retrying publication for %s", ghostbuster) - publisher.publish( - cls = rpki.publication.ghostbuster_elt, + publisher.queue( uri = ghostbuster.uri, - obj = ghostbuster.ghostbuster, + new_obj = ghostbuster.ghostbuster, repository = repository, handler = ghostbuster.published_callback) + for ee_cert in self.unpublished_ee_certificates(stale): + logger.debug("Retrying publication for %s", ee_cert) + publisher.queue( + uri = ee_cert.uri, + new_obj = ee_cert.cert, + repository = repository, + handler = ee_cert.published_callback) + + class child_cert_obj(rpki.sql.sql_persistent): """ Certificate that has been issued to a child. @@ -1435,6 +1735,7 @@ class child_cert_obj(rpki.sql.sql_persistent): """ Initialize a child_cert_obj. """ + rpki.sql.sql_persistent.__init__(self) self.gctx = gctx self.child_id = child_id @@ -1450,6 +1751,7 @@ class child_cert_obj(rpki.sql.sql_persistent): """ Fetch child object to which this child_cert object links. """ + return rpki.left_right.child_elt.sql_fetch(self.gctx, self.child_id) @property @@ -1458,6 +1760,7 @@ class child_cert_obj(rpki.sql.sql_persistent): """ Fetch ca_detail object to which this child_cert object links. """ + return ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) @ca_detail.deleter @@ -1472,6 +1775,7 @@ class child_cert_obj(rpki.sql.sql_persistent): """ Return the tail (filename) portion of the URI for this child_cert. """ + return self.cert.gSKI() + ".cer" @property @@ -1479,6 +1783,7 @@ class child_cert_obj(rpki.sql.sql_persistent): """ Return the publication URI for this child_cert. """ + return self.ca_detail.ca.sia_uri + self.uri_tail def revoke(self, publisher, generate_crl_and_manifest = True): @@ -1490,10 +1795,9 @@ class child_cert_obj(rpki.sql.sql_persistent): ca = ca_detail.ca logger.debug("Revoking %r %r", self, self.uri) revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail) - publisher.withdraw( - cls = rpki.publication.certificate_elt, - uri = self.uri, - obj = self.cert, + publisher.queue( + uri = self.uri, + old_obj = self.cert, repository = ca.parent.repository) self.gctx.sql.sweep() self.sql_delete() @@ -1624,7 +1928,8 @@ class child_cert_obj(rpki.sql.sql_persistent): """ Publication callback: check result and mark published. """ - pdu.raise_if_error() + + rpki.publication.raise_if_error(pdu) self.published = None self.sql_mark_dirty() @@ -1648,6 +1953,7 @@ class revoked_cert_obj(rpki.sql.sql_persistent): """ Initialize a revoked_cert_obj. """ + rpki.sql.sql_persistent.__init__(self) self.gctx = gctx self.serial = serial @@ -1663,6 +1969,7 @@ class revoked_cert_obj(rpki.sql.sql_persistent): """ Fetch ca_detail object to which this revoked_cert_obj links. """ + return ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) @classmethod @@ -1670,6 +1977,7 @@ class revoked_cert_obj(rpki.sql.sql_persistent): """ Revoke a certificate. """ + return cls( serial = cert.getSerial(), expires = cert.getNotAfter(), @@ -1711,6 +2019,7 @@ class roa_obj(rpki.sql.sql_persistent): """ Fetch ca_detail object to which this roa_obj links. """ + return rpki.rpkid.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) @ca_detail.deleter @@ -1724,6 +2033,7 @@ class roa_obj(rpki.sql.sql_persistent): """ Extra SQL fetch actions for roa_obj -- handle prefix lists. """ + for version, datatype, attribute in ((4, rpki.resource_set.roa_prefix_set_ipv4, "ipv4"), (6, rpki.resource_set.roa_prefix_set_ipv6, "ipv6")): setattr(self, attribute, datatype.from_sql( @@ -1738,6 +2048,7 @@ class roa_obj(rpki.sql.sql_persistent): """ Extra SQL insert actions for roa_obj -- handle prefix lists. """ + for version, prefix_set in ((4, self.ipv4), (6, self.ipv6)): if prefix_set: self.gctx.sql.executemany( @@ -1752,6 +2063,7 @@ class roa_obj(rpki.sql.sql_persistent): """ Extra SQL delete actions for roa_obj -- handle prefix lists. """ + self.gctx.sql.execute("DELETE FROM roa_prefix WHERE roa_id = %s", (self.roa_id,)) def __repr__(self): @@ -1887,16 +2199,15 @@ class roa_obj(rpki.sql.sql_persistent): ca = ca, resources = resources, subject_key = keypair.get_public(), - sia = (None, None, self.uri_from_key(keypair))) + sia = (None, None, self.uri_from_key(keypair), ca.parent.repository.rrdp_notification_uri)) self.roa = rpki.x509.ROA.build(self.asn, self.ipv4, self.ipv6, keypair, (self.cert,)) self.published = rpki.sundial.now() self.sql_store() logger.debug("Generating %r URI %s", self, self.uri) - publisher.publish( - cls = rpki.publication.roa_elt, + publisher.queue( uri = self.uri, - obj = self.roa, + new_obj = self.roa, repository = ca.parent.repository, handler = self.published_callback) if not fast: @@ -1907,7 +2218,8 @@ class roa_obj(rpki.sql.sql_persistent): """ Check publication result. """ - pdu.raise_if_error() + + rpki.publication.raise_if_error(pdu) self.published = None self.sql_mark_dirty() @@ -1941,9 +2253,10 @@ class roa_obj(rpki.sql.sql_persistent): logger.debug("Withdrawing %r %s and revoking its EE cert", self, uri) rpki.rpkid.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail) - publisher.withdraw(cls = rpki.publication.roa_elt, uri = uri, obj = roa, - repository = ca_detail.ca.parent.repository, - handler = False if allow_failure else None) + publisher.queue(uri = uri, + old_obj = roa, + repository = ca_detail.ca.parent.repository, + handler = False if allow_failure else None) if not regenerate: self.sql_mark_deleted() @@ -1957,6 +2270,7 @@ class roa_obj(rpki.sql.sql_persistent): """ Reissue ROA associated with this roa_obj. """ + if self.ca_detail is None: self.generate(publisher = publisher, fast = fast) else: @@ -1966,6 +2280,7 @@ class roa_obj(rpki.sql.sql_persistent): """ Return publication URI for a public key. """ + return self.ca_detail.ca.sia_uri + key.gSKI() + ".roa" @property @@ -1973,6 +2288,7 @@ class roa_obj(rpki.sql.sql_persistent): """ Return the publication URI for this roa_obj's ROA. """ + return self.ca_detail.ca.sia_uri + self.uri_tail @property @@ -1981,6 +2297,7 @@ class roa_obj(rpki.sql.sql_persistent): Return the tail (filename portion) of the publication URI for this roa_obj's ROA. """ + return self.cert.gSKI() + ".roa" @@ -2023,6 +2340,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent): """ Fetch self object to which this ghostbuster_obj links. """ + return rpki.left_right.self_elt.sql_fetch(self.gctx, self.self_id) @property @@ -2031,6 +2349,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent): """ Fetch ca_detail object to which this ghostbuster_obj links. """ + return rpki.rpkid.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) def __init__(self, gctx = None, self_id = None, ca_detail_id = None, vcard = None): @@ -2090,16 +2409,15 @@ class ghostbuster_obj(rpki.sql.sql_persistent): ca = ca, resources = resources, subject_key = keypair.get_public(), - sia = (None, None, self.uri_from_key(keypair))) + sia = (None, None, self.uri_from_key(keypair), ca.parent.repository.rrdp_notification_uri)) self.ghostbuster = rpki.x509.Ghostbuster.build(self.vcard, keypair, (self.cert,)) self.published = rpki.sundial.now() self.sql_store() logger.debug("Generating Ghostbuster record %r", self.uri) - publisher.publish( - cls = rpki.publication.ghostbuster_elt, + publisher.queue( uri = self.uri, - obj = self.ghostbuster, + new_obj = self.ghostbuster, repository = ca.parent.repository, handler = self.published_callback) if not fast: @@ -2109,7 +2427,8 @@ class ghostbuster_obj(rpki.sql.sql_persistent): """ Check publication result. """ - pdu.raise_if_error() + + rpki.publication.raise_if_error(pdu) self.published = None self.sql_mark_dirty() @@ -2143,9 +2462,10 @@ class ghostbuster_obj(rpki.sql.sql_persistent): logger.debug("Withdrawing %r %s and revoking its EE cert", self, uri) rpki.rpkid.revoked_cert_obj.revoke(cert = cert, ca_detail = ca_detail) - publisher.withdraw(cls = rpki.publication.ghostbuster_elt, uri = uri, obj = ghostbuster, - repository = ca_detail.ca.parent.repository, - handler = False if allow_failure else None) + publisher.queue(uri = uri, + old_obj = ghostbuster, + repository = ca_detail.ca.parent.repository, + handler = False if allow_failure else None) if not regenerate: self.sql_mark_deleted() @@ -2159,6 +2479,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent): """ Reissue Ghostbuster associated with this ghostbuster_obj. """ + if self.ghostbuster is None: self.generate(publisher = publisher, fast = fast) else: @@ -2168,6 +2489,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent): """ Return publication URI for a public key. """ + return self.ca_detail.ca.sia_uri + key.gSKI() + ".gbr" @property @@ -2175,6 +2497,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent): """ Return the publication URI for this ghostbuster_obj's ghostbuster. """ + return self.ca_detail.ca.sia_uri + self.uri_tail @property @@ -2183,6 +2506,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent): Return the tail (filename portion) of the publication URI for this ghostbuster_obj's ghostbuster. """ + return self.cert.gSKI() + ".gbr" @@ -2220,6 +2544,7 @@ class ee_cert_obj(rpki.sql.sql_persistent): """ Fetch self object to which this ee_cert_obj links. """ + return rpki.left_right.self_elt.sql_fetch(self.gctx, self.self_id) @property @@ -2228,6 +2553,7 @@ class ee_cert_obj(rpki.sql.sql_persistent): """ Fetch ca_detail object to which this ee_cert_obj links. """ + return rpki.rpkid.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) @ca_detail.deleter @@ -2245,6 +2571,7 @@ class ee_cert_obj(rpki.sql.sql_persistent): Although, really, one has to ask why we don't just store g(SKI) in rpkid.sql instead of ski.... """ + return base64.urlsafe_b64encode(self.ski).rstrip("=") @gski.setter @@ -2256,6 +2583,7 @@ class ee_cert_obj(rpki.sql.sql_persistent): """ Return the publication URI for this ee_cert_obj. """ + return self.ca_detail.ca.sia_uri + self.uri_tail @property @@ -2264,6 +2592,7 @@ class ee_cert_obj(rpki.sql.sql_persistent): Return the tail (filename portion) of the publication URI for this ee_cert_obj. """ + return self.cert.gSKI() + ".cer" @classmethod @@ -2275,10 +2604,12 @@ class ee_cert_obj(rpki.sql.sql_persistent): cn, sn = subject_name.extract_cn_and_sn() ca = ca_detail.ca + sia = (None, None, ca_detail.ca.sia_uri + subject_key.gSKI() + ".cer", ca.parent.repository.rrdp_notification_uri) + cert = ca_detail.issue_ee( ca = ca, subject_key = subject_key, - sia = None, + sia = sia, resources = resources, notAfter = resources.valid_until, cn = cn, @@ -2291,12 +2622,11 @@ class ee_cert_obj(rpki.sql.sql_persistent): ca_detail_id = ca_detail.ca_detail_id, cert = cert) - publisher.publish( - cls = rpki.publication.certificate_elt, - uri = self.uri, - obj = self.cert, + publisher.queue( + uri = self.uri, + new_obj = self.cert, repository = ca.parent.repository, - handler = self.published_callback) + handler = self.published_callback) self.sql_store() @@ -2315,10 +2645,9 @@ class ee_cert_obj(rpki.sql.sql_persistent): ca = ca_detail.ca logger.debug("Revoking %r %r", self, self.uri) revoked_cert_obj.revoke(cert = self.cert, ca_detail = ca_detail) - publisher.withdraw(cls = rpki.publication.certificate_elt, - uri = self.uri, - obj = self.cert, - repository = ca.parent.repository) + publisher.queue(uri = self.uri, + old_obj = self.cert, + repository = ca.parent.repository) self.gctx.sql.sweep() self.sql_delete() if generate_crl_and_manifest: @@ -2392,7 +2721,7 @@ class ee_cert_obj(rpki.sql.sql_persistent): ca = ca_detail.ca, subject_key = self.cert.getPublicKey(), eku = self.cert.get_EKU(), - sia = None, + sia = (None, None, self.uri, ca_detail.ca.parent.repository.rrdp_notification_uri), resources = resources, notAfter = resources.valid_until, cn = cn, @@ -2400,12 +2729,12 @@ class ee_cert_obj(rpki.sql.sql_persistent): self.sql_mark_dirty() - publisher.publish( - cls = rpki.publication.certificate_elt, - uri = self.uri, - obj = self.cert, + publisher.queue( + uri = self.uri, + old_obj = old_cert, + new_obj = self.cert, repository = ca_detail.ca.parent.repository, - handler = self.published_callback) + handler = self.published_callback) if must_revoke: revoked_cert_obj.revoke(cert = old_cert.cert, ca_detail = old_ca_detail) @@ -2422,7 +2751,8 @@ class ee_cert_obj(rpki.sql.sql_persistent): """ Publication callback: check result and mark published. """ - pdu.raise_if_error() + + rpki.publication.raise_if_error(pdu) self.published = None self.sql_mark_dirty() @@ -2450,29 +2780,54 @@ class publication_queue(object): if self.replace: self.uris = {} - def _add(self, uri, obj, repository, handler, make_pdu): + def queue(self, uri, repository, handler = None, + old_obj = None, new_obj = None, old_hash = None): + + assert old_obj is not None or new_obj is not None or old_hash is not None + assert old_obj is None or old_hash is None + assert old_obj is None or isinstance(old_obj, rpki.x509.uri_dispatch(uri)) + assert new_obj is None or isinstance(new_obj, rpki.x509.uri_dispatch(uri)) + + logger.debug("Queuing publication action: uri %s, old %r, new %r, hash %s", + uri, old_obj, new_obj, old_hash) + + # id(repository) may need to change to repository.peer_contact_uri + # once we convert from our custom SQL cache to Django ORM. + rid = id(repository) if rid not in self.repositories: self.repositories[rid] = repository - self.msgs[rid] = rpki.publication.msg.query() + self.msgs[rid] = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap, + type = "query", version = rpki.publication.version) + if self.replace and uri in self.uris: - logger.debug("Removing publication duplicate <%s %r %r>", - self.uris[uri].action, self.uris[uri].uri, self.uris[uri].payload) - self.msgs[rid].remove(self.uris.pop(uri)) - pdu = make_pdu(uri = uri, obj = obj) + logger.debug("Removing publication duplicate %r", self.uris[uri]) + old_pdu = self.uris.pop(uri) + self.msgs[rid].remove(old_pdu) + pdu_hash = old_pdu.get("hash") + elif old_hash is not None: + pdu_hash = old_hash + elif old_obj is None: + pdu_hash = None + else: + pdu_hash = rpki.x509.sha256(old_obj.get_DER()).encode("hex") + + if new_obj is None: + pdu = SubElement(self.msgs[rid], rpki.publication.tag_withdraw, uri = uri, hash = pdu_hash) + else: + pdu = SubElement(self.msgs[rid], rpki.publication.tag_publish, uri = uri) + pdu.text = new_obj.get_Base64() + if pdu_hash is not None: + pdu.set("hash", pdu_hash) + if handler is not None: - self.handlers[id(pdu)] = handler - pdu.tag = id(pdu) - self.msgs[rid].append(pdu) + tag = str(id(pdu)) + self.handlers[tag] = handler + pdu.set("tag", tag) + if self.replace: self.uris[uri] = pdu - def publish(self, cls, uri, obj, repository, handler = None): - return self._add( uri, obj, repository, handler, cls.make_publish) - - def withdraw(self, cls, uri, obj, repository, handler = None): - return self._add( uri, obj, repository, handler, cls.make_withdraw) - def call_pubd(self, cb, eb): def loop(iterator, rid): logger.debug("Calling pubd[%r]", self.repositories[rid]) @@ -2487,5 +2842,5 @@ class publication_queue(object): return sum(len(self.msgs[rid]) for rid in self.repositories) def empty(self): - assert (not self.msgs) == (self.size == 0) + assert (not self.msgs) == (self.size == 0), "Assertion failure: not self.msgs: %r, self.size %r" % (not self.msgs, self.size) return not self.msgs |