From 5fee0478fe150cb26c74aad36be68534632973c1 Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Thu, 17 Jul 2014 20:24:57 +0000 Subject: RRDP working, mostly. Doens't clean up old files yet, doesn't use explicit command and rollback yet, a lot of other cleanup to do, but does generate vaguely plausible XML files. svn path=/branches/tk705/; revision=5899 --- rpki/pubd.py | 113 +++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 67 insertions(+), 46 deletions(-) (limited to 'rpki/pubd.py') diff --git a/rpki/pubd.py b/rpki/pubd.py index 7d677f0a..e1c7245d 100644 --- a/rpki/pubd.py +++ b/rpki/pubd.py @@ -25,6 +25,7 @@ import os import re import uuid import time +import socket import logging import argparse import rpki.resource_set @@ -40,24 +41,24 @@ import rpki.publication import rpki.publication_control import rpki.daemonize -from lxml.etree import Element, SubElement, ElementTree, Comment, tostring as ElementToString +from lxml.etree import Element, SubElement, tostring as ElementToString logger = logging.getLogger(__name__) # Temporary, these should come from the schema or something -rrdp_namespace = "{http://www.ripe.net/rpki/rrdp}" -rrdp_version = "1" +rrdp_xmlns = "{http://www.ripe.net/rpki/rrdp}" +rrdp_version = "1" +rrdp_nsmap = { None : rrdp_xmlns[1:-1] } - -def Base64SubElement(elt, name, obj, attrib = None, **kwargs): +def DERSubElement(elt, name, der, attrib = None, **kwargs): """ Convenience wrapper around SubElement for use with Base64 text. """ se = SubElement(elt, name, attrib, **kwargs) - se.text = "\n" + obj.get_Base64() + se.text = rpki.x509.base64_with_linebreaks(der) se.tail = "\n" return se @@ -127,8 +128,7 @@ class main(object): self.publication_multimodule = self.cfg.getboolean("publication-multimodule", False) - if False: - self.rrdp_uri_base = self.cfg.get("rrdp-uri-base") + self.rrdp_uri_base = self.cfg.get("rrdp-uri-base", "http://%s/" % socket.getfqdn()) self.rrdp_expiration_interval = rpki.sundial.timedelta.parse(self.cfg.get("rrdp-expiration-interval", "6h")) self.rrdp_publication_base = self.cfg.get("rrdp-publication-base", "rrdp-publication/") @@ -217,6 +217,10 @@ class main(object): # delta.activate() self.sql.sweep() + self.session.generate_snapshot() + self.session.write_snapshot() + self.session.write_deltas() + self.session.write_notification() cb(code = 200, body = rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl)) except (rpki.async.ExitNow, SystemExit): @@ -277,52 +281,75 @@ class session_obj(rpki.sql.sql_persistent): (self.session_id, rpki.sundial.now())): delta.sql_delete() - def write_rrdp_file(self, fn, xml, overwrite = False): + def write_rrdp_file(self, fn, text, overwrite = False): """ - Save an RRDP XML object to disk. + Save RRDP XML to disk. """ - if not overwrite and os.path.exists(os.path.join(self.gctx.rrdp_publication_base, fn)): - raise SomeExceptionOrAnother("%s already exists, not regenerating" % fn) - rpki.relaxng.rrdp.assertValid(xml) - tn = os.path.join(self.gctx.rrdp_publication_base, fn + ".%s.tmp" % os.getpid()) - if not os.path.isdir(os.path.dirname(tn)): - os.makedirs(os.path.dirname(tn)) - ElementTree(xml).write(tn) - os.rename(tn, os.path.join(self.gctx.rrdp_publication_base, fn)) + if overwrite or not os.path.exists(os.path.join(self.gctx.rrdp_publication_base, fn)): + tn = os.path.join(self.gctx.rrdp_publication_base, fn + ".%s.tmp" % os.getpid()) + if not os.path.isdir(os.path.dirname(tn)): + os.makedirs(os.path.dirname(tn)) + with open(tn, "w") as f: + f.write(text) + os.rename(tn, os.path.join(self.gctx.rrdp_publication_base, fn)) def generate_snapshot(self): """ Generate an XML snapshot of this session. """ - xml = Element(rrdp_namespace + "snapshot", version = rrdp_version, session_id = self.uuid, serial = self.serial) + xml = Element(rrdp_xmlns + "snapshot", nsmap = rrdp_nsmap, + version = rrdp_version, + session_id = self.uuid, + serial = str(self.serial)) + xml.text = "\n" for obj in self.objects: - Base64SubElement(xml, rrdp_namespace + "publish", obj, uri = obj.uri) + DERSubElement(xml, rrdp_xmlns + "publish", nsmap = rrdp_nsmap, + der = obj.payload, + uri = obj.uri) + rpki.relaxng.rrdp.assertValid(xml) self.snapshot = ElementToString(xml, pretty_print = True) self.hash = rpki.x509.sha256(self.snapshot).encode("hex") - self.sql_mark_dirty() + self.sql_store() + + def write_snapshot(self): + """ + Write current session snapshot to disk. + """ + + self.write_rrdp_file("%s/snapshot/%s.xml" % (self.uuid, self.serial), self.snapshot) + + def write_deltas(self): + """ + Write any missing deltas to disk. + """ + + for delta in self.deltas: + self.write_rrdp_file(delta.fn, delta.xml) def write_notification(self): """ Write current notification file to disk. """ - xml = Element(rrdp_namespace + "notification", + xml = Element(rrdp_xmlns + "notification", nsmap = rrdp_nsmap, version = rrdp_version, session_id = self.uuid, - serial = self.serial) - SubElement(xml, rrdp_namespace + "snapshot", + serial = str(self.serial)) + SubElement(xml, rrdp_xmlns + "snapshot", nsmap = rrdp_nsmap, uri = "%s/%s/snapshot/%d.xml" % (self.gctx.rrdp_uri_base, self.uuid, self.serial), hash = self.hash) for delta in self.deltas: - SubElement(xml, rrdp_namespace + "delta", - { "from" : str(delta.serial - 1), - "to" : str(delta.serial), - "uri" : delta.uri, - "hash" : delta.hash }) + se = SubElement(xml, rrdp_xmlns + "delta", nsmap = rrdp_nsmap, + to = str(delta.serial), + uri = "%s/%s" % (self.gctx.rrdp_uri_base, delta.fn), + hash = delta.hash) + se.set("from", str(delta.serial - 1)) rpki.relaxng.rrdp.assertValid(xml) - self.write_rrdp_file("%s/notification.xml" % self.uuid, xml, overwrite = True) + self.write_rrdp_file("%s/notification.xml" % self.uuid, + ElementToString(xml, pretty_print = True), + overwrite = True) class delta_obj(rpki.sql.sql_persistent): @@ -344,16 +371,10 @@ class delta_obj(rpki.sql.sql_persistent): def session(self): return session_obj.sql_fetch(self.gctx, self.session_id) - # a) Consider making this __init__() instead, just make session optional. - # - # b) We need some place to store the changes that go into this delta - # while running .publish() and .withdraw(); need not be a field - # that goes in SQL, a list that we only use while constructing - # would be fine. - # - # c) This probably means that we shouldn't even try to put this into - # SQL until we're done building it, which is probably OK. - # + @property + def fn(self): + return "%s/deltas/%s-%s.xml" % (self.session.uuid, self.serial - 1, self.serial) + @classmethod def create(cls, session): self = cls() @@ -365,12 +386,12 @@ class delta_obj(rpki.sql.sql_persistent): self.xml = None self.hash = None self.expires = rpki.sundial.now() + self.gctx.rrdp_expiration_interval - self.deltas = Element(rrdp_namespace + "deltas", + self.deltas = Element(rrdp_xmlns + "deltas", nsmap = rrdp_nsmap, to = str(self.serial), version = rrdp_version, session_id = session.uuid) self.deltas.set("from", str(self.serial - 1)) - SubElement(self.deltas, rrdp_namespace + "delta", serial = str(self.serial)) + SubElement(self.deltas, rrdp_xmlns + "delta", nsmap = rrdp_nsmap, serial = str(self.serial)).text = "\n" return self def activate(self): @@ -387,7 +408,7 @@ class delta_obj(rpki.sql.sql_persistent): raise rpki.exceptions.ExistingObjectAtURI("Object already published at %s" % uri) logger.debug("Publishing %s", uri) object_obj.create(client, self, obj, uri) - se = Base64SubElement(self.deltas[0], rrdp_namespace + "publish", obj, uri = uri) + se = DERSubElement(self.deltas[0], rrdp_xmlns + "publish", obj.get_DER(), nsmap = rrdp_nsmap, uri = uri) if hash is not None: se.set("hash", hash) rpki.relaxng.rrdp.assertValid(self.deltas) @@ -400,7 +421,7 @@ class delta_obj(rpki.sql.sql_persistent): raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (old %s, new %s)" % (uri, obj.hash, hash)) logger.debug("Withdrawing %s", uri) obj.delete(self) - SubElement(self.deltas[0], rrdp_namespace + "withdraw", uri = uri, hash = hash) + SubElement(self.deltas[0], rrdp_xmlns + "withdraw", nsmap = rrdp_nsmap, uri = uri, hash = hash).tail = "\n" rpki.relaxng.rrdp.assertValid(self.deltas) @@ -436,8 +457,8 @@ class object_obj(rpki.sql.sql_persistent): self = cls() self.gctx = delta.gctx self.uri = uri - self.payload = obj - self.hash = rpki.x509.sha256(obj.get_DER()).encode("hex") + self.payload = obj.get_DER() + self.hash = rpki.x509.sha256(self.payload).encode("hex") logger.debug("Computed hash %s for %r", self.hash, obj) self.session_id = delta.session_id self.client_id = client.client_id -- cgit v1.2.3