# $Id$ # # Copyright (C) 2013--2014 Dragon Research Labs ("DRL") # Portions copyright (C) 2009--2012 Internet Systems Consortium ("ISC") # Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN") # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notices and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND DRL, ISC, AND ARIN DISCLAIM ALL # WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL, # ISC, OR ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ RPKI publication engine. """ import os import re import uuid import time import logging import argparse import rpki.resource_set import rpki.up_down import rpki.x509 import rpki.sql import rpki.http import rpki.config import rpki.exceptions import rpki.relaxng import rpki.log import rpki.publication import rpki.publication_control import rpki.daemonize from lxml.etree import Element, SubElement, ElementTree, Comment logger = logging.getLogger(__name__) class main(object): """ Main program for pubd. """ def __init__(self): os.environ["TZ"] = "UTC" time.tzset() self.irbe_cms_timestamp = None parser = argparse.ArgumentParser(description = __doc__) parser.add_argument("-c", "--config", help = "override default location of configuration file") parser.add_argument("-f", "--foreground", action = "store_true", help = "do not daemonize") parser.add_argument("--pidfile", help = "override default location of pid file") parser.add_argument("--profile", help = "enable profiling, saving data to PROFILE") rpki.log.argparse_setup(parser) args = parser.parse_args() self.profile = args.profile rpki.log.init("pubd", args) self.cfg = rpki.config.parser(args.config, "pubd") self.cfg.set_global_flags() if not args.foreground: rpki.daemonize.daemon(pidfile = args.pidfile) if self.profile: import cProfile prof = cProfile.Profile() try: prof.runcall(self.main) finally: prof.dump_stats(self.profile) logger.info("Dumped profile data to %s", self.profile) else: self.main() def main(self): if self.profile: logger.info("Running in profile mode with output to %s", self.profile) self.sql = rpki.sql.session(self.cfg) self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta")) self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert")) self.pubd_cert = rpki.x509.X509(Auto_update = self.cfg.get("pubd-cert")) self.pubd_key = rpki.x509.RSA( Auto_update = self.cfg.get("pubd-key")) self.pubd_crl = rpki.x509.CRL( Auto_update = self.cfg.get("pubd-crl")) self.http_server_host = self.cfg.get("server-host", "") self.http_server_port = self.cfg.getint("server-port") self.publication_base = self.cfg.get("publication-base", "publication/") self.publication_multimodule = self.cfg.getboolean("publication-multimodule", False) self.rrdp_expiration_interval = rpki.sundial.timedelta.parse(self.cfg.get("rrdp-expiration-interval", "6h")) self.rrdp_publication_base = self.cfg.get("rrdp-publication-base", "rrdp-publication/") self.session = session_obj.fetch(self) rpki.http.server( host = self.http_server_host, port = self.http_server_port, handlers = (("/control", self.control_handler), ("/client/", self.client_handler))) def control_handler(self, query, path, cb): """ Process one PDU from the IRBE. """ def done(r_msg): self.sql.sweep() cb(code = 200, body = rpki.publication_control.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert)) try: q_cms = rpki.publication_control.cms_msg(DER = query) q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert)) self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, "control") q_msg.serve_top_level(self, done) except (rpki.async.ExitNow, SystemExit): raise except Exception, e: logger.exception("Unhandled exception processing control query, path %r", path) cb(code = 500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e)) client_url_regexp = re.compile("/client/([-A-Z0-9_/]+)$", re.I) def client_handler(self, query, path, cb): """ Process one PDU from a client. """ def done(r_msg): self.sql.sweep() cb(code = 200, body = rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl)) try: match = self.client_url_regexp.search(path) if match is None: raise rpki.exceptions.BadContactURL("Bad path: %s" % path) client_handle = match.group(1) client = rpki.publication_control.client_elt.sql_fetch_where1(self, "client_handle = %s", (client_handle,)) if client is None: raise rpki.exceptions.ClientNotFound("Could not find client %s" % client_handle) q_cms = rpki.publication.cms_msg(DER = query) q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue)) q_cms.check_replay_sql(client, client.client_handle) q_msg.serve_top_level(self, client, done) except (rpki.async.ExitNow, SystemExit): raise except Exception, e: logger.exception("Unhandled exception processing client query, path %r", path) cb(code = 500, reason = "Could not process PDU: %s" % e) class session_obj(rpki.sql.sql_persistent): """ An RRDP session. """ # We probably need additional columns or an additional table to # handle cleanup of old serial numbers. Not sure quite what these # would look like, other than that the SQL datatypes are probably # BIGINT and DATETIME. Maybe a table to track time at which we # retired a particular serial number, or, to save us the arithmetic, # the corresponding cleanup time? sql_template = rpki.sql.template( "session", "session_id", "uuid") def __repr__(self): return rpki.log.log_repr(self, self.uuid, self.serial) @classmethod def fetch(cls, gctx): """ Fetch the one and only session, creating it if necessary. """ self = cls.sql_fetch(gctx, 1) if self is None: self = cls() self.gctx = gctx self.session_id = 1 self.uuid = uuid.uuid4() self.sql_store() return self @property def objects(self): return object_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,)) @property def snapshots(self): return snapshot_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,)) @property def current_snapshot(self): return snapshot_obj.sql_fetch_where1(self.gctx, "session_id = %s AND activated IS NOT NULL AND expires IS NULL", (self.session_id,)) def new_snapshot(self): return snapshot_obj.create(self) def add_snapshot(self, new_snapshot): now = rpki.sundial.now() old_snapshot = self.current_snapshot if old_snapshot is not None: old_snapshot.expires = now + self.gctx.rrdp_expiration_interval old_snapshot.sql_store() new_snapshot.activated = now new_snapshot.sql_store() def expire_snapshots(self): for snapshot in snapshot_obj.sql_fetch_where(self.gctx, "session_id = %s AND expires IS NOT NULL AND expires < %s", (self.session_id, rpki.sundial.now())): snapshot.sql_delete() class snapshot_obj(rpki.sql.sql_persistent): """ An RRDP session snapshot. """ sql_template = rpki.sql.template( "snapshot", "snapshot_id", ("activated", rpki.sundial.datetime), ("expires", rpki.sundial.datetime), "session_id") @property @rpki.sql.cache_reference def session(self): return session_obj.sql_fetch(self.gctx, self.session_id) @classmethod def create(cls, session): self = cls() self.gctx = session.gctx self.session_id = session.session_id self.activated = None self.expires = None self.sql_store() return self @property def serial(self): """ I know that using an SQL ID for any other purpose is usually a bad idea, but in this case it has exactly the right properties, and we really do want both the autoincrement behavior and the foreign key behavior to tie to the snapshot serial numbers. So risk it. Well, OK, only almost the right properties. auto-increment probably does not back up if we ROLLBACK, which could leave gaps in the sequence. So may need to rework this, eg, to use a serial field in the session object. Ignore the issue until we have the rest of this working. """ return self.snapshot_id def publish(self, client, obj, uri, hash): if hash is not None: self.withdraw(client, uri, hash) if object_obj.current_object_at_uri(client, self, uri) is not None: raise rpki.exceptions.ExistingObjectAtURI("Object already published at %s" % uri) logger.debug("Publishing %s", uri) return object_obj.create(client, self, obj, uri) def withdraw(self, client, uri, hash): obj = object_obj.current_object_at_uri(client, self, uri) if obj is None: raise rpki.exceptions.NoObjectAtURI("No object published at %s" % uri) if obj.hash != hash: raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (%s, %s)" % (uri, obj.hash, hash)) logger.debug("Withdrawing %s", uri) obj.delete(self) class object_obj(rpki.sql.sql_persistent): """ A published object. """ sql_template = rpki.sql.template( "object", "object_id", "uri", "hash", "payload", "published_snapshot_id", "withdrawn_snapshot_id", "client_id", "session_id") def __repr__(self): return rpki.log.log_repr(self, self.uri, self.published_snapshot_id, self.withdrawn_snapshot_id) @property @rpki.sql.cache_reference def session(self): return session_obj.sql_fetch(self.gctx, self.session_id) @property @rpki.sql.cache_reference def client(self): return rpki.publication_control.client_elt.sql_fetch(self.gctx, self.client_id) @classmethod def create(cls, client, snapshot, obj, uri): self = cls() self.gctx = snapshot.gctx self.uri = uri self.payload = obj self.hash = rpki.x509.sha256(obj.get_Base64()).encode("hex") logger.debug("Computed hash %s of %r", self.hash, obj) self.published_snapshot_id = snapshot.snapshot_id self.withdrawn_snapshot_id = None self.session_id = snapshot.session_id self.client_id = client.client_id self.sql_mark_dirty() return self def delete(self, snapshot): self.withdrawn_snapshot_id = snapshot.snapshot_id #self.sql_mark_dirty() self.sql_store() @classmethod def current_object_at_uri(cls, client, snapshot, uri): return cls.sql_fetch_where1(client.gctx, "session_id = %s AND client_id = %s AND withdrawn_snapshot_id IS NULL AND uri = %s", (snapshot.session_id, client.client_id, uri))