path: root/rpki/pubd.py
Diffstat (limited to 'rpki/pubd.py')
-rw-r--r--  rpki/pubd.py  436
1 file changed, 394 insertions(+), 42 deletions(-)
diff --git a/rpki/pubd.py b/rpki/pubd.py
index 79315a78..c6a2e2d2 100644
--- a/rpki/pubd.py
+++ b/rpki/pubd.py
@@ -23,23 +23,48 @@ RPKI publication engine.
import os
import re
+import uuid
import time
+import socket
import logging
import argparse
+
import rpki.resource_set
import rpki.up_down
import rpki.x509
import rpki.sql
-import rpki.http
import rpki.config
import rpki.exceptions
import rpki.relaxng
import rpki.log
import rpki.publication
+import rpki.publication_control
import rpki.daemonize
+import rpki.http_simple
+
+from lxml.etree import Element, SubElement, tostring as ElementToString
logger = logging.getLogger(__name__)
+
+rrdp_xmlns = rpki.relaxng.rrdp.xmlns
+rrdp_nsmap = rpki.relaxng.rrdp.nsmap
+rrdp_version = "1"
+
+rpki_content_type = "application/x-rpki"
+
+
+def DERSubElement(elt, name, der, attrib = None, **kwargs):
+ """
+ Convenience wrapper around SubElement for use with Base64 text.
+ """
+
+ se = SubElement(elt, name, attrib, **kwargs)
+ se.text = rpki.x509.base64_with_linebreaks(der)
+ se.tail = "\n"
+ return se
+
+
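
For illustration only (not part of this patch), a standalone sketch of what DERSubElement produces; base64.b64encode stands in for rpki.x509.base64_with_linebreaks, which additionally folds the text across lines:

import base64
from lxml.etree import Element, SubElement, tostring

def der_subelement_sketch(elt, name, der, attrib=None, **kwargs):
    # Same shape as DERSubElement above, minus the line folding.
    se = SubElement(elt, name, attrib, **kwargs)
    se.text = base64.b64encode(der).decode("ascii")
    se.tail = "\n"
    return se

root = Element("snapshot")
der_subelement_sketch(root, "publish", b"\x30\x03\x02\x01\x01",
                      uri="rsync://rpki.example.org/repo/x.cer")
print(tostring(root))   # ...<publish uri="...">MAMCAQE=</publish>...
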
class main(object):
"""
Main program for pubd.
@@ -90,84 +115,411 @@ class main(object):
if self.profile:
logger.info("Running in profile mode with output to %s", self.profile)
- self.sql = rpki.sql.session(self.cfg)
+ self.sql = rpki.sql.session(self.cfg, autocommit = False)
self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta"))
self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert"))
self.pubd_cert = rpki.x509.X509(Auto_update = self.cfg.get("pubd-cert"))
self.pubd_key = rpki.x509.RSA( Auto_update = self.cfg.get("pubd-key"))
+ self.pubd_crl = rpki.x509.CRL( Auto_update = self.cfg.get("pubd-crl"))
self.http_server_host = self.cfg.get("server-host", "")
self.http_server_port = self.cfg.getint("server-port")
self.publication_base = self.cfg.get("publication-base", "publication/")
- self.publication_multimodule = self.cfg.getboolean("publication-multimodule", False)
+ self.rrdp_uri_base = self.cfg.get("rrdp-uri-base",
+ "http://%s/rrdp/" % socket.getfqdn())
+ self.rrdp_expiration_interval = rpki.sundial.timedelta.parse(self.cfg.get("rrdp-expiration-interval", "6h"))
+ self.rrdp_publication_base = self.cfg.get("rrdp-publication-base",
+ "rrdp-publication/")
+
+ self.session = session_obj.fetch(self)
- rpki.http.server(
+ rpki.http_simple.server(
host = self.http_server_host,
port = self.http_server_port,
handlers = (("/control", self.control_handler),
("/client/", self.client_handler)))
- def handler_common(self, query, client, cb, certs, crl = None):
- """
- Common PDU handler code.
- """
- def done(r_msg):
- reply = rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, crl)
- self.sql.sweep()
- cb(reply)
+ def rrdp_filename_to_uri(self, fn):
+ return "%s/%s" % (self.rrdp_uri_base.rstrip("/"), fn)
- q_cms = rpki.publication.cms_msg(DER = query)
- q_msg = q_cms.unwrap(certs)
- if client is None:
- self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, "control")
- else:
- q_cms.check_replay_sql(client, client.client_handle)
- q_msg.serve_top_level(self, client, done)
- def control_handler(self, query, path, cb):
+ def control_handler(self, request, q_der):
"""
Process one PDU from the IRBE.
"""
- def done(body):
- cb(200, body = body)
+ # This is still structured with callbacks as if it were
+ # asynchronous, because a lot of the grunt work is done by code in
+ # rpki.xml_utils. If and when we get around to re-writing the
+ # left-right and publication-control protocols to use lxml.etree
+ # directly, most of the rpki.xml_utils code will go away, but for
+ # the moment it's simplest to preserve the weird calling sequences.
+
+ def done(r_msg):
+ self.sql.commit()
+ request.send_cms_response(rpki.publication_control.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert))
+ self.sql.commit()
try:
- self.handler_common(query, None, done, (self.bpki_ta, self.irbe_cert))
- except (rpki.async.ExitNow, SystemExit):
- raise
+ q_cms = rpki.publication_control.cms_msg(DER = q_der)
+ q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert))
+ self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, "control")
+ q_msg.serve_top_level(self, done)
except Exception, e:
- logger.exception("Unhandled exception processing control query, path %r", path)
- cb(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e))
+ logger.exception("Unhandled exception processing control query, path %r", request.path)
+ request.send_error(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e))
+
client_url_regexp = re.compile("/client/([-A-Z0-9_/]+)$", re.I)
- def client_handler(self, query, path, cb):
+ def client_handler(self, request, q_der):
"""
Process one PDU from a client.
"""
- def done(body):
- cb(200, body = body)
-
try:
- match = self.client_url_regexp.search(path)
+ match = self.client_url_regexp.search(request.path)
if match is None:
- raise rpki.exceptions.BadContactURL("Bad path: %s" % path)
+ raise rpki.exceptions.BadContactURL("Bad path: %s" % request.path)
client_handle = match.group(1)
- client = rpki.publication.client_elt.sql_fetch_where1(self, "client_handle = %s", (client_handle,))
+ client = rpki.publication_control.client_elt.sql_fetch_where1(self, "client_handle = %s", (client_handle,))
if client is None:
raise rpki.exceptions.ClientNotFound("Could not find client %s" % client_handle)
- config = rpki.publication.config_elt.fetch(self)
- if config is None or config.bpki_crl is None:
- raise rpki.exceptions.CMSCRLNotSet
- self.handler_common(query, client, done, (self.bpki_ta, client.bpki_cert, client.bpki_glue), config.bpki_crl)
- except (rpki.async.ExitNow, SystemExit):
- raise
+ q_cms = rpki.publication.cms_msg(DER = q_der)
+ q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue))
+ q_cms.check_replay_sql(client, client.client_handle)
+ if not q_msg.is_query():
+ raise rpki.exceptions.BadQuery("Message type is not query")
+ r_msg = q_msg.__class__.reply()
+ delta = None
+ failed = False
+ for q_pdu in q_msg:
+ try:
+ if isinstance(q_pdu, rpki.publication.list_elt):
+ for obj in client.objects:
+ r_pdu = q_pdu.__class__()
+ r_pdu.tag = q_pdu.tag
+ r_pdu.uri = obj.uri
+ r_pdu.hash = obj.hash
+ r_msg.append(r_pdu)
+ else:
+ if delta is None and not failed:
+ delta = self.session.new_delta()
+ q_pdu.gctx = self
+ q_pdu.client = client
+ q_pdu.client.check_allowed_uri(q_pdu.uri)
+ q_pdu.serve_action(delta)
+ r_pdu = q_pdu.__class__()
+ r_pdu.tag = q_pdu.tag
+ r_pdu.uri = q_pdu.uri
+ r_msg.append(r_pdu)
+ except Exception, e:
+ if not isinstance(e, rpki.exceptions.NotFound):
+ logger.exception("Exception processing PDU %r", q_pdu)
+ r_msg.append(rpki.publication.report_error_elt.from_exception(e, q_pdu.tag))
+ failed = True
+ if delta is not None:
+ delta.sql_delete()
+ self.session.serial -= 1
+ self.session.sql_mark_dirty()
+
+ if delta is not None:
+ assert not failed
+ delta.activate()
+ self.sql.sweep()
+ self.session.generate_snapshot()
+ self.sql.commit()
+
+ # These could be merged, and perhaps should be, among other
+ # reasons because we need to do something about expiring old
+ # deltas and deleting old XML files, and it's probably easier
+ # to avoid race conditions if we do this all in one place.
+
+ self.session.write_deltas()
+ self.session.write_snapshot()
+ self.session.write_notification()
+
+ # Somewhere around here is also where we should finally write
+ # stuff out to rsync store, now that SQL is the publication
+ # database of record. This may require doing the filesystem
+ # updates from the delta, but that should be straightforward.
+
+ request.send_cms_response(rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl))
+ self.sql.commit()
+
except Exception, e:
- logger.exception("Unhandled exception processing client query, path %r", path)
- cb(500, reason = "Could not process PDU: %s" % e)
+ logger.exception("Unhandled exception processing client query, path %r", request.path)
+ self.sql.rollback()
+ request.send_error(500, "Could not process PDU: %s" % e)
+
+
+ def uri_to_filename(self, uri):
+ """
+ Convert a URI to a local filename.
+ """
+
+ if not uri.startswith("rsync://"):
+ raise rpki.exceptions.BadURISyntax(uri)
+ path = uri.split("/")[4:]
+ path.insert(0, self.publication_base.rstrip("/"))
+ filename = "/".join(path)
+ if "/../" in filename or filename.endswith("/.."):
+ raise rpki.exceptions.BadURISyntax(filename)
+ return filename
+
+
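
A worked example of the mapping uri_to_filename performs (input values are hypothetical): the rsync scheme, host name, and leading module component are dropped, and the remaining path is re-rooted under publication_base.

# Standalone restatement of uri_to_filename with hypothetical inputs.
def uri_to_filename_sketch(uri, publication_base="publication/"):
    if not uri.startswith("rsync://"):
        raise ValueError(uri)
    path = uri.split("/")[4:]             # drops "rsync:", "", host, module
    path.insert(0, publication_base.rstrip("/"))
    filename = "/".join(path)
    if "/../" in filename or filename.endswith("/.."):
        raise ValueError(filename)
    return filename

# "rsync://rpki.example.org/repo/ca1/cert.cer" -> "publication/ca1/cert.cer"
print(uri_to_filename_sketch("rsync://rpki.example.org/repo/ca1/cert.cer"))
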
+class session_obj(rpki.sql.sql_persistent):
+ """
+ An RRDP session.
+ """
+
+ sql_template = rpki.sql.template(
+ "session",
+ "session_id",
+ "uuid",
+ "serial",
+ "snapshot",
+ "hash")
+
+ def __repr__(self):
+ return rpki.log.log_repr(self, self.uuid, self.serial)
+
+ @classmethod
+ def fetch(cls, gctx):
+ """
+ Fetch the one and only session, creating it if necessary.
+ """
+
+ self = cls.sql_fetch(gctx, 1)
+ if self is None:
+ self = cls()
+ self.uuid = str(uuid.uuid4())
+ self.serial = 0
+ self.snapshot = None
+ self.hash = None
+ self.gctx = gctx
+ self.sql_store()
+ return self
+
+ @property
+ def objects(self):
+ return object_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,))
+
+ @property
+ def deltas(self):
+ return delta_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,))
+
+ def new_delta(self):
+ return delta_obj.create(self)
+
+ def expire_deltas(self):
+ for delta in delta_obj.sql_fetch_where(self.gctx,
+ "session_id = %s AND expires IS NOT NULL AND expires < %s",
+ (self.session_id, rpki.sundial.now())):
+ delta.sql_delete()
+
+ def write_rrdp_file(self, fn, text, overwrite = False):
+ """
+ Save RRDP XML to disk.
+ """
+
+ if overwrite or not os.path.exists(os.path.join(self.gctx.rrdp_publication_base, fn)):
+ tn = os.path.join(self.gctx.rrdp_publication_base, fn + ".%s.tmp" % os.getpid())
+ if not os.path.isdir(os.path.dirname(tn)):
+ os.makedirs(os.path.dirname(tn))
+ with open(tn, "w") as f:
+ f.write(text)
+ os.rename(tn, os.path.join(self.gctx.rrdp_publication_base, fn))
+
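
The write-then-rename idiom in write_rrdp_file is what keeps RRDP consumers from ever seeing a half-written file: os.rename() is atomic on POSIX as long as source and destination are on the same filesystem. A minimal standalone sketch of the same pattern, with hypothetical arguments:

import os

def atomic_write_sketch(base_dir, fn, text):
    # Write to a PID-suffixed temporary file, then rename into place.
    final = os.path.join(base_dir, fn)
    tmp = final + ".%s.tmp" % os.getpid()
    if not os.path.isdir(os.path.dirname(tmp)):
        os.makedirs(os.path.dirname(tmp))
    with open(tmp, "w") as f:
        f.write(text)
    os.rename(tmp, final)   # readers see either the old file or the new one

# atomic_write_sketch("rrdp-publication", "updates.xml", "<notification/>")
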
+ def generate_snapshot(self):
+ """
+ Generate an XML snapshot of this session.
+ """
+
+ xml = Element(rrdp_xmlns + "snapshot", nsmap = rrdp_nsmap,
+ version = rrdp_version,
+ session_id = self.uuid,
+ serial = str(self.serial))
+ xml.text = "\n"
+ for obj in self.objects:
+ DERSubElement(xml, rrdp_xmlns + "publish",
+ der = obj.der,
+ uri = obj.uri)
+ rpki.relaxng.rrdp.assertValid(xml)
+ self.snapshot = ElementToString(xml, pretty_print = True)
+ self.hash = rpki.x509.sha256(self.snapshot).encode("hex")
+ self.sql_store()
+
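
For reference, a sketch of the document shape generate_snapshot serializes; the namespace URI is an assumption (whatever rpki.relaxng.rrdp.xmlns resolves to in this tree), and the session id, serial, URI, and DER payload are invented:

import uuid
import base64
from lxml.etree import Element, SubElement, tostring

RRDP_NS = "http://www.ripe.net/rpki/rrdp"     # assumption; see rpki.relaxng.rrdp
XMLNS = "{%s}" % RRDP_NS

snapshot = Element(XMLNS + "snapshot", nsmap={None: RRDP_NS}, version="1",
                   session_id=str(uuid.uuid4()), serial="42")
publish = SubElement(snapshot, XMLNS + "publish",
                     uri="rsync://rpki.example.org/repo/ca1/cert.cer")
publish.text = base64.b64encode(b"\x30\x03\x02\x01\x01").decode("ascii")
print(tostring(snapshot, pretty_print=True))
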
+ @property
+ def snapshot_fn(self):
+ return "%s/snapshot/%s.xml" % (self.uuid, self.serial)
+
+ @property
+ def notification_fn(self):
+ return "updates.xml"
+
+ def write_snapshot(self):
+ """
+ Write current session snapshot to disk.
+ """
+
+ self.write_rrdp_file(self.snapshot_fn, self.snapshot)
+
+ def write_deltas(self):
+ """
+ Write any missing deltas to disk.
+ """
+
+ for delta in self.deltas:
+ self.write_rrdp_file(delta.fn, delta.xml)
+
+ def write_notification(self):
+ """
+ Write current notification file to disk.
+ """
+
+ xml = Element(rrdp_xmlns + "notification", nsmap = rrdp_nsmap,
+ version = rrdp_version,
+ session_id = self.uuid,
+ serial = str(self.serial))
+ SubElement(xml, rrdp_xmlns + "snapshot",
+ uri = self.gctx.rrdp_filename_to_uri(self.snapshot_fn),
+ hash = self.hash)
+ for delta in self.deltas:
+ se = SubElement(xml, rrdp_xmlns + "delta",
+ to = str(delta.serial),
+ uri = self.gctx.rrdp_filename_to_uri(delta.fn),
+ hash = delta.hash)
+ se.set("from", str(delta.serial - 1))
+ rpki.relaxng.rrdp.assertValid(xml)
+ self.write_rrdp_file(self.notification_fn,
+ ElementToString(xml, pretty_print = True),
+ overwrite = True)
+
+
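
One Python detail in write_notification (and in delta_obj.create below): the delta element's "from" attribute has to be added with se.set("from", ...) after the fact, because from is a reserved word and cannot be passed as a keyword argument to SubElement. A tiny sketch:

from lxml.etree import Element, SubElement, tostring

notification = Element("notification", version="1", serial="42")
delta = SubElement(notification, "delta", to="42",
                   uri="http://rrdp.example.org/deltas/41-42.xml")
# SubElement(..., from="41") would be a SyntaxError, hence .set():
delta.set("from", "41")
print(tostring(notification))
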
+class delta_obj(rpki.sql.sql_persistent):
+ """
+ An RRDP delta.
+ """
+
+ sql_template = rpki.sql.template(
+ "delta",
+ "delta_id",
+ "session_id",
+ "serial",
+ "xml",
+ "hash",
+ ("expires", rpki.sundial.datetime))
+
+ @property
+ @rpki.sql.cache_reference
+ def session(self):
+ return session_obj.sql_fetch(self.gctx, self.session_id)
+
+ @property
+ def fn(self):
+ return "%s/deltas/%s-%s.xml" % (self.session.uuid, self.serial - 1, self.serial)
+
+ @classmethod
+ def create(cls, session):
+ self = cls()
+ session.serial += 1
+ session.sql_mark_dirty()
+ self.gctx = session.gctx
+ self.session_id = session.session_id
+ self.serial = session.serial
+ self.xml = None
+ self.hash = None
+ self.expires = rpki.sundial.now() + self.gctx.rrdp_expiration_interval
+ self.deltas = Element(rrdp_xmlns + "deltas", nsmap = rrdp_nsmap,
+ to = str(self.serial),
+ version = rrdp_version,
+ session_id = session.uuid)
+ self.deltas.set("from", str(self.serial - 1))
+ SubElement(self.deltas, rrdp_xmlns + "delta", serial = str(self.serial)).text = "\n"
+ return self
+
+ def activate(self):
+ rpki.relaxng.rrdp.assertValid(self.deltas)
+ self.xml = ElementToString(self.deltas, pretty_print = True)
+ self.hash = rpki.x509.sha256(self.xml).encode("hex")
+ del self.deltas
+ self.sql_mark_dirty()
+
+ def publish(self, client, der, uri, hash):
+ obj = object_obj.current_object_at_uri(client, self, uri)
+ if obj is not None and obj.hash == hash:
+ obj.delete(self)
+ elif obj is not None:
+ raise rpki.exceptions.ExistingObjectAtURI("Object already published at %s" % uri)
+ logger.debug("Publishing %s", uri)
+ object_obj.create(client, self, der, uri)
+ se = DERSubElement(self.deltas[0], rrdp_xmlns + "publish", der = der, uri = uri)
+ if hash is not None:
+ se.set("hash", hash)
+ rpki.relaxng.rrdp.assertValid(self.deltas)
+
+ def withdraw(self, client, uri, hash):
+ obj = object_obj.current_object_at_uri(client, self, uri)
+ if obj is None:
+ raise rpki.exceptions.NoObjectAtURI("No object published at %s" % uri)
+ if obj.hash != hash:
+ raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (old %s, new %s)" % (uri, obj.hash, hash))
+ logger.debug("Withdrawing %s", uri)
+ obj.delete(self)
+ SubElement(self.deltas[0], rrdp_xmlns + "withdraw", uri = uri, hash = hash).tail = "\n"
+ rpki.relaxng.rrdp.assertValid(self.deltas)
+
+
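
Taken together, publish and withdraw implement the hash-based overwrite rules of the publication protocol as used here: a client may replace or withdraw an object only by presenting the hash of what is currently published at that URI. A condensed restatement, for illustration only:

# Pure-Python restatement of the decision logic in publish()/withdraw();
# the string results name the exceptions raised above.
def publish_decision(existing_hash, presented_hash):
    if existing_hash is not None and existing_hash == presented_hash:
        return "overwrite"                    # old object deleted, new one stored
    if existing_hash is not None:
        return "error: ExistingObjectAtURI"   # hash missing or wrong
    return "create"                           # nothing published at this URI yet

def withdraw_decision(existing_hash, presented_hash):
    if existing_hash is None:
        return "error: NoObjectAtURI"
    if existing_hash != presented_hash:
        return "error: DifferentObjectAtURI"
    return "withdraw"
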
+class object_obj(rpki.sql.sql_persistent):
+ """
+ A published object.
+ """
+
+ sql_template = rpki.sql.template(
+ "object",
+ "object_id",
+ "uri",
+ "der",
+ "hash",
+ "client_id",
+ "session_id")
+
+ def __repr__(self):
+ return rpki.log.log_repr(self, self.uri)
+
+ @property
+ @rpki.sql.cache_reference
+ def session(self):
+ return session_obj.sql_fetch(self.gctx, self.session_id)
+
+ @property
+ @rpki.sql.cache_reference
+ def client(self):
+ return rpki.publication_control.client_elt.sql_fetch(self.gctx, self.client_id)
+
+ @classmethod
+ def create(cls, client, delta, der, uri):
+ self = cls()
+ self.gctx = delta.gctx
+ self.uri = uri
+ self.der = der
+ self.hash = rpki.x509.sha256(der).encode("hex")
+ logger.debug("Computed hash %s for %s", self.hash, self.uri)
+ self.session_id = delta.session_id
+ self.client_id = client.client_id
+ self.sql_mark_dirty()
+ return self
+
+ def delete(self, delta):
+ self.sql_mark_deleted()
+
+ @classmethod
+ def current_object_at_uri(cls, client, delta, uri):
+ return cls.sql_fetch_where1(client.gctx,
+ "session_id = %s AND client_id = %s AND uri = %s",
+ (delta.session_id, client.client_id, uri))
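
Putting the filename properties together, the files written under rrdp-publication-base end up named like this (session UUID and serial invented for illustration):

import uuid

session_uuid = str(uuid.uuid4())
serial = 42
notification_fn = "updates.xml"
snapshot_fn = "%s/snapshot/%s.xml" % (session_uuid, serial)
delta_fn = "%s/deltas/%s-%s.xml" % (session_uuid, serial - 1, serial)
# e.g.  updates.xml
#       <session-uuid>/snapshot/42.xml
#       <session-uuid>/deltas/41-42.xml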