diff options
author | Rob Austein <sra@hactrn.net> | 2014-08-21 15:33:59 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2014-08-21 15:33:59 +0000 |
commit | 9edff5e5bb5a64b7a08e82b804d9d147d2258052 (patch) | |
tree | 53dd20a8cc16eb96d64823248495913d563b5a73 /rpki/pubd.py | |
parent | 82d157ad504a92ad7c8c703e14cadee65a449776 (diff) |
Generate rsync updates from RRDP delta we just committed to SQL.
Start moving publication protocol away from using SAX.
svn path=/branches/tk705/; revision=5932
Diffstat (limited to 'rpki/pubd.py')
-rw-r--r-- | rpki/pubd.py | 117 |
1 files changed, 82 insertions, 35 deletions
diff --git a/rpki/pubd.py b/rpki/pubd.py index c6a2e2d2..4814dc24 100644 --- a/rpki/pubd.py +++ b/rpki/pubd.py @@ -51,7 +51,22 @@ rrdp_xmlns = rpki.relaxng.rrdp.xmlns rrdp_nsmap = rpki.relaxng.rrdp.nsmap rrdp_version = "1" -rpki_content_type = "application/x-rpki" +pub_xmlns = rpki.relaxng.publication.xmlns +pub_nsmap = rpki.relaxng.publication.nsmap +pub_version = rpki.relaxng.publication.version + +rrdp_tag_delta = rrdp_xmlns + "delta" +rrdp_tag_deltas = rrdp_xmlns + "deltas" +rrdp_tag_notification = rrdp_xmlns + "notification" +rrdp_tag_publish = rrdp_xmlns + "publish" +rrdp_tag_snapshot = rrdp_xmlns + "snapshot" +rrdp_tag_withdraw = rrdp_xmlns + "withdraw" + +pub_tag_msg = pub_xmlns + "msg" +pub_tag_list = pub_xmlns + "list" +pub_tag_publish = pub_xmlns + "publish" +pub_tag_withdraw = pub_xmlns + "withdraw" +pub_tag_report_error = pub_xmlns + "report_error" def DERSubElement(elt, name, der, attrib = None, **kwargs): @@ -189,38 +204,43 @@ class main(object): client = rpki.publication_control.client_elt.sql_fetch_where1(self, "client_handle = %s", (client_handle,)) if client is None: raise rpki.exceptions.ClientNotFound("Could not find client %s" % client_handle) - q_cms = rpki.publication.cms_msg(DER = q_der) + q_cms = rpki.publication.cms_msg_no_sax(DER = q_der) q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue)) q_cms.check_replay_sql(client, client.client_handle) - if not q_msg.is_query(): - raise rpki.exceptions.BadQuery("Message type is not query") - r_msg = q_msg.__class__.reply() + if q_msg.get("type") != "query": + raise rpki.exceptions.BadQuery("Message type is %s, expected query" % q_msg.get("type")) + r_msg = Element(pub_tag_msg, nsmap = pub_nsmap, type = "reply", version = pub_version) delta = None failed = False for q_pdu in q_msg: try: - if isinstance(q_pdu, rpki.publication.list_elt): + if q_pdu.tag == pub_tag_list: for obj in client.objects: - r_pdu = q_pdu.__class__() - r_pdu.tag = q_pdu.tag - r_pdu.uri = obj.uri - r_pdu.hash = obj.hash - r_msg.append(r_pdu) + r_pdu = SubElement(r_msg, q_pdu.tag, uri = obj.uri, hash = obj.hash) + if q_pdu.get("tag") is not None: + r_pdu.set("tag", q_pdu.get("tag")) else: + assert q_pdu.tag in (pub_tag_publish, pub_tag_withdraw) if delta is None and not failed: delta = self.session.new_delta() - q_pdu.gctx = self - q_pdu.client = client - q_pdu.client.check_allowed_uri(q_pdu.uri) - q_pdu.serve_action(delta) - r_pdu = q_pdu.__class__() - r_pdu.tag = q_pdu.tag - r_pdu.uri = q_pdu.uri - r_msg.append(r_pdu) + client.check_allowed_uri(q_pdu.get("uri")) + if q_pdu.tag == pub_tag_publish: + der = q_pdu.text.decode("base64") + logger.info("Publishing %s", rpki.x509.uri_dispatch(q_pdu.get("uri"))(DER = der).tracking_data(q_pdu.get("uri"))) + delta.publish(client, der, q_pdu.get("uri"), q_pdu.get("hash")) + else: + logger.info("Withdrawing %s", q_pdu.get("uri")) + delta.withdraw(client, q_pdu.get("uri"), q_pdu.get("hash")) + r_pdu = SubElement(r_msg, q_pdu.tag, uri = q_pdu.get("uri")) + if q_pdu.get("tag") is not None: + r_pdu.set("tag", q_pdu.get("tag")) except Exception, e: if not isinstance(e, rpki.exceptions.NotFound): logger.exception("Exception processing PDU %r", q_pdu) - r_msg.append(rpki.publication.report_error_elt.from_exception(e, q_pdu.tag)) + r_pdu = SubElement(r_msg, pub_tag_report_error, error_code = e.__class__.__name__) + r_pdu.text = str(e) + if q_pdu.get("tag") is not None: + r_pdu.set("tag", q_pdu.get("tag")) failed = True if delta is not None: delta.sql_delete() @@ -243,12 +263,11 @@ class main(object): self.session.write_snapshot() self.session.write_notification() - # Somewhere around here is also where we should finally write - # stuff out to rsync store, now that SQL is the publication - # database of record. This may require doing the filesystem - # updates from the delta, but that should be straightforward. + # Write changes to rsyncd from the delta we just committed - request.send_cms_response(rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl)) + delta.write_rsync() + + request.send_cms_response(rpki.publication.cms_msg_no_sax().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl)) self.sql.commit() except Exception, e: @@ -340,13 +359,13 @@ class session_obj(rpki.sql.sql_persistent): Generate an XML snapshot of this session. """ - xml = Element(rrdp_xmlns + "snapshot", nsmap = rrdp_nsmap, + xml = Element(rrdp_tag_snapshot, nsmap = rrdp_nsmap, version = rrdp_version, session_id = self.uuid, serial = str(self.serial)) xml.text = "\n" for obj in self.objects: - DERSubElement(xml, rrdp_xmlns + "publish", + DERSubElement(xml, rrdp_tag_publish, der = obj.der, uri = obj.uri) rpki.relaxng.rrdp.assertValid(xml) @@ -382,15 +401,15 @@ class session_obj(rpki.sql.sql_persistent): Write current notification file to disk. """ - xml = Element(rrdp_xmlns + "notification", nsmap = rrdp_nsmap, + xml = Element(rrdp_tag_notification, nsmap = rrdp_nsmap, version = rrdp_version, session_id = self.uuid, serial = str(self.serial)) - SubElement(xml, rrdp_xmlns + "snapshot", + SubElement(xml, rrdp_tag_snapshot, uri = self.gctx.rrdp_filename_to_uri(self.snapshot_fn), hash = self.hash) for delta in self.deltas: - se = SubElement(xml, rrdp_xmlns + "delta", + se = SubElement(xml, rrdp_tag_delta, to = str(delta.serial), uri = self.gctx.rrdp_filename_to_uri(delta.fn), hash = delta.hash) @@ -435,19 +454,18 @@ class delta_obj(rpki.sql.sql_persistent): self.xml = None self.hash = None self.expires = rpki.sundial.now() + self.gctx.rrdp_expiration_interval - self.deltas = Element(rrdp_xmlns + "deltas", nsmap = rrdp_nsmap, + self.deltas = Element(rrdp_tag_deltas, nsmap = rrdp_nsmap, to = str(self.serial), version = rrdp_version, session_id = session.uuid) self.deltas.set("from", str(self.serial - 1)) - SubElement(self.deltas, rrdp_xmlns + "delta", serial = str(self.serial)).text = "\n" + SubElement(self.deltas, rrdp_tag_delta, serial = str(self.serial)).text = "\n" return self def activate(self): rpki.relaxng.rrdp.assertValid(self.deltas) self.xml = ElementToString(self.deltas, pretty_print = True) self.hash = rpki.x509.sha256(self.xml).encode("hex") - del self.deltas self.sql_mark_dirty() def publish(self, client, der, uri, hash): @@ -458,7 +476,7 @@ class delta_obj(rpki.sql.sql_persistent): raise rpki.exceptions.ExistingObjectAtURI("Object already published at %s" % uri) logger.debug("Publishing %s", uri) object_obj.create(client, self, der, uri) - se = DERSubElement(self.deltas[0], rrdp_xmlns + "publish", der = der, uri = uri) + se = DERSubElement(self.deltas[0], rrdp_tag_publish, der = der, uri = uri) if hash is not None: se.set("hash", hash) rpki.relaxng.rrdp.assertValid(self.deltas) @@ -471,9 +489,38 @@ class delta_obj(rpki.sql.sql_persistent): raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (old %s, new %s)" % (uri, obj.hash, hash)) logger.debug("Withdrawing %s", uri) obj.delete(self) - SubElement(self.deltas[0], rrdp_xmlns + "withdraw", uri = uri, hash = hash).tail = "\n" + SubElement(self.deltas[0], rrdp_tag_withdraw, uri = uri, hash = hash).tail = "\n" rpki.relaxng.rrdp.assertValid(self.deltas) + def write_rsync(self): + min_path_len = len(self.gctx.publication_base.rstrip("/")) + for pdu in self.deltas[0]: + assert pdu.tag in (rrdp_tag_publish, rrdp_tag_withdraw) + fn = self.gctx.uri_to_filename(pdu.get("uri")) + if pdu.tag == rrdp_tag_publish: + tn = fn + ".tmp" + dn = os.path.dirname(fn) + if not os.path.isdir(dn): + os.makedirs(dn) + with open(tn, "wb") as f: + f.write(pdu.text.decode("base64")) + os.rename(tn, fn) + else: + try: + os.remove(fn) + except OSError, e: + if e.errno != errno.ENOENT: + raise + dn = os.path.dirname(fn) + while len(dn) > min_path_len: + try: + os.rmdir(dn) + except OSError: + break + else: + dn = os.path.dirname(dn) + del self.deltas + class object_obj(rpki.sql.sql_persistent): """ |