aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rpki/pubd.py117
-rw-r--r--rpki/publication.py8
2 files changed, 90 insertions, 35 deletions
diff --git a/rpki/pubd.py b/rpki/pubd.py
index c6a2e2d2..4814dc24 100644
--- a/rpki/pubd.py
+++ b/rpki/pubd.py
@@ -51,7 +51,22 @@ rrdp_xmlns = rpki.relaxng.rrdp.xmlns
rrdp_nsmap = rpki.relaxng.rrdp.nsmap
rrdp_version = "1"
-rpki_content_type = "application/x-rpki"
+pub_xmlns = rpki.relaxng.publication.xmlns
+pub_nsmap = rpki.relaxng.publication.nsmap
+pub_version = rpki.relaxng.publication.version
+
+rrdp_tag_delta = rrdp_xmlns + "delta"
+rrdp_tag_deltas = rrdp_xmlns + "deltas"
+rrdp_tag_notification = rrdp_xmlns + "notification"
+rrdp_tag_publish = rrdp_xmlns + "publish"
+rrdp_tag_snapshot = rrdp_xmlns + "snapshot"
+rrdp_tag_withdraw = rrdp_xmlns + "withdraw"
+
+pub_tag_msg = pub_xmlns + "msg"
+pub_tag_list = pub_xmlns + "list"
+pub_tag_publish = pub_xmlns + "publish"
+pub_tag_withdraw = pub_xmlns + "withdraw"
+pub_tag_report_error = pub_xmlns + "report_error"
def DERSubElement(elt, name, der, attrib = None, **kwargs):
@@ -189,38 +204,43 @@ class main(object):
client = rpki.publication_control.client_elt.sql_fetch_where1(self, "client_handle = %s", (client_handle,))
if client is None:
raise rpki.exceptions.ClientNotFound("Could not find client %s" % client_handle)
- q_cms = rpki.publication.cms_msg(DER = q_der)
+ q_cms = rpki.publication.cms_msg_no_sax(DER = q_der)
q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue))
q_cms.check_replay_sql(client, client.client_handle)
- if not q_msg.is_query():
- raise rpki.exceptions.BadQuery("Message type is not query")
- r_msg = q_msg.__class__.reply()
+ if q_msg.get("type") != "query":
+ raise rpki.exceptions.BadQuery("Message type is %s, expected query" % q_msg.get("type"))
+ r_msg = Element(pub_tag_msg, nsmap = pub_nsmap, type = "reply", version = pub_version)
delta = None
failed = False
for q_pdu in q_msg:
try:
- if isinstance(q_pdu, rpki.publication.list_elt):
+ if q_pdu.tag == pub_tag_list:
for obj in client.objects:
- r_pdu = q_pdu.__class__()
- r_pdu.tag = q_pdu.tag
- r_pdu.uri = obj.uri
- r_pdu.hash = obj.hash
- r_msg.append(r_pdu)
+ r_pdu = SubElement(r_msg, q_pdu.tag, uri = obj.uri, hash = obj.hash)
+ if q_pdu.get("tag") is not None:
+ r_pdu.set("tag", q_pdu.get("tag"))
else:
+ assert q_pdu.tag in (pub_tag_publish, pub_tag_withdraw)
if delta is None and not failed:
delta = self.session.new_delta()
- q_pdu.gctx = self
- q_pdu.client = client
- q_pdu.client.check_allowed_uri(q_pdu.uri)
- q_pdu.serve_action(delta)
- r_pdu = q_pdu.__class__()
- r_pdu.tag = q_pdu.tag
- r_pdu.uri = q_pdu.uri
- r_msg.append(r_pdu)
+ client.check_allowed_uri(q_pdu.get("uri"))
+ if q_pdu.tag == pub_tag_publish:
+ der = q_pdu.text.decode("base64")
+ logger.info("Publishing %s", rpki.x509.uri_dispatch(q_pdu.get("uri"))(DER = der).tracking_data(q_pdu.get("uri")))
+ delta.publish(client, der, q_pdu.get("uri"), q_pdu.get("hash"))
+ else:
+ logger.info("Withdrawing %s", q_pdu.get("uri"))
+ delta.withdraw(client, q_pdu.get("uri"), q_pdu.get("hash"))
+ r_pdu = SubElement(r_msg, q_pdu.tag, uri = q_pdu.get("uri"))
+ if q_pdu.get("tag") is not None:
+ r_pdu.set("tag", q_pdu.get("tag"))
except Exception, e:
if not isinstance(e, rpki.exceptions.NotFound):
logger.exception("Exception processing PDU %r", q_pdu)
- r_msg.append(rpki.publication.report_error_elt.from_exception(e, q_pdu.tag))
+ r_pdu = SubElement(r_msg, pub_tag_report_error, error_code = e.__class__.__name__)
+ r_pdu.text = str(e)
+ if q_pdu.get("tag") is not None:
+ r_pdu.set("tag", q_pdu.get("tag"))
failed = True
if delta is not None:
delta.sql_delete()
@@ -243,12 +263,11 @@ class main(object):
self.session.write_snapshot()
self.session.write_notification()
- # Somewhere around here is also where we should finally write
- # stuff out to rsync store, now that SQL is the publication
- # database of record. This may require doing the filesystem
- # updates from the delta, but that should be straightforward.
+ # Write changes to rsyncd from the delta we just committed
- request.send_cms_response(rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl))
+ delta.write_rsync()
+
+ request.send_cms_response(rpki.publication.cms_msg_no_sax().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl))
self.sql.commit()
except Exception, e:
@@ -340,13 +359,13 @@ class session_obj(rpki.sql.sql_persistent):
Generate an XML snapshot of this session.
"""
- xml = Element(rrdp_xmlns + "snapshot", nsmap = rrdp_nsmap,
+ xml = Element(rrdp_tag_snapshot, nsmap = rrdp_nsmap,
version = rrdp_version,
session_id = self.uuid,
serial = str(self.serial))
xml.text = "\n"
for obj in self.objects:
- DERSubElement(xml, rrdp_xmlns + "publish",
+ DERSubElement(xml, rrdp_tag_publish,
der = obj.der,
uri = obj.uri)
rpki.relaxng.rrdp.assertValid(xml)
@@ -382,15 +401,15 @@ class session_obj(rpki.sql.sql_persistent):
Write current notification file to disk.
"""
- xml = Element(rrdp_xmlns + "notification", nsmap = rrdp_nsmap,
+ xml = Element(rrdp_tag_notification, nsmap = rrdp_nsmap,
version = rrdp_version,
session_id = self.uuid,
serial = str(self.serial))
- SubElement(xml, rrdp_xmlns + "snapshot",
+ SubElement(xml, rrdp_tag_snapshot,
uri = self.gctx.rrdp_filename_to_uri(self.snapshot_fn),
hash = self.hash)
for delta in self.deltas:
- se = SubElement(xml, rrdp_xmlns + "delta",
+ se = SubElement(xml, rrdp_tag_delta,
to = str(delta.serial),
uri = self.gctx.rrdp_filename_to_uri(delta.fn),
hash = delta.hash)
@@ -435,19 +454,18 @@ class delta_obj(rpki.sql.sql_persistent):
self.xml = None
self.hash = None
self.expires = rpki.sundial.now() + self.gctx.rrdp_expiration_interval
- self.deltas = Element(rrdp_xmlns + "deltas", nsmap = rrdp_nsmap,
+ self.deltas = Element(rrdp_tag_deltas, nsmap = rrdp_nsmap,
to = str(self.serial),
version = rrdp_version,
session_id = session.uuid)
self.deltas.set("from", str(self.serial - 1))
- SubElement(self.deltas, rrdp_xmlns + "delta", serial = str(self.serial)).text = "\n"
+ SubElement(self.deltas, rrdp_tag_delta, serial = str(self.serial)).text = "\n"
return self
def activate(self):
rpki.relaxng.rrdp.assertValid(self.deltas)
self.xml = ElementToString(self.deltas, pretty_print = True)
self.hash = rpki.x509.sha256(self.xml).encode("hex")
- del self.deltas
self.sql_mark_dirty()
def publish(self, client, der, uri, hash):
@@ -458,7 +476,7 @@ class delta_obj(rpki.sql.sql_persistent):
raise rpki.exceptions.ExistingObjectAtURI("Object already published at %s" % uri)
logger.debug("Publishing %s", uri)
object_obj.create(client, self, der, uri)
- se = DERSubElement(self.deltas[0], rrdp_xmlns + "publish", der = der, uri = uri)
+ se = DERSubElement(self.deltas[0], rrdp_tag_publish, der = der, uri = uri)
if hash is not None:
se.set("hash", hash)
rpki.relaxng.rrdp.assertValid(self.deltas)
@@ -471,9 +489,38 @@ class delta_obj(rpki.sql.sql_persistent):
raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (old %s, new %s)" % (uri, obj.hash, hash))
logger.debug("Withdrawing %s", uri)
obj.delete(self)
- SubElement(self.deltas[0], rrdp_xmlns + "withdraw", uri = uri, hash = hash).tail = "\n"
+ SubElement(self.deltas[0], rrdp_tag_withdraw, uri = uri, hash = hash).tail = "\n"
rpki.relaxng.rrdp.assertValid(self.deltas)
+ def write_rsync(self):
+ min_path_len = len(self.gctx.publication_base.rstrip("/"))
+ for pdu in self.deltas[0]:
+ assert pdu.tag in (rrdp_tag_publish, rrdp_tag_withdraw)
+ fn = self.gctx.uri_to_filename(pdu.get("uri"))
+ if pdu.tag == rrdp_tag_publish:
+ tn = fn + ".tmp"
+ dn = os.path.dirname(fn)
+ if not os.path.isdir(dn):
+ os.makedirs(dn)
+ with open(tn, "wb") as f:
+ f.write(pdu.text.decode("base64"))
+ os.rename(tn, fn)
+ else:
+ try:
+ os.remove(fn)
+ except OSError, e:
+ if e.errno != errno.ENOENT:
+ raise
+ dn = os.path.dirname(fn)
+ while len(dn) > min_path_len:
+ try:
+ os.rmdir(dn)
+ except OSError:
+ break
+ else:
+ dn = os.path.dirname(dn)
+ del self.deltas
+
class object_obj(rpki.sql.sql_persistent):
"""
diff --git a/rpki/publication.py b/rpki/publication.py
index 763fda9d..4c3109b9 100644
--- a/rpki/publication.py
+++ b/rpki/publication.py
@@ -245,3 +245,11 @@ class cms_msg(rpki.x509.XML_CMS_object):
encoding = "us-ascii"
schema = rpki.relaxng.publication
saxify = sax_handler.saxify
+
+class cms_msg_no_sax(cms_msg):
+ """
+ Transition kludge: varient of cms_msg (q.v.) with SAX parsing disabled.
+ If and when we ditch SAX entirely, this will become cms_msg.
+ """
+
+ saxify = None