# $Id$
#
# Copyright (C) 2013--2014 Dragon Research Labs ("DRL")
# Portions copyright (C) 2009--2012 Internet Systems Consortium ("ISC")
# Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN")
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notices and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND DRL, ISC, AND ARIN DISCLAIM ALL
# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL,
# ISC, OR ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""
RPKI publication engine.
"""
import os
import re
import uuid
import time
import logging
import argparse
import rpki.resource_set
import rpki.up_down
import rpki.x509
import rpki.sql
import rpki.http
import rpki.config
import rpki.exceptions
import rpki.relaxng
import rpki.log
import rpki.publication
import rpki.publication_control
import rpki.daemonize
logger = logging.getLogger(__name__)
class main(object):
"""
Main program for pubd.
"""
def __init__(self):
os.environ["TZ"] = "UTC"
time.tzset()
self.irbe_cms_timestamp = None
parser = argparse.ArgumentParser(description = __doc__)
parser.add_argument("-c", "--config",
help = "override default location of configuration file")
parser.add_argument("-f", "--foreground", action = "store_true",
help = "do not daemonize")
parser.add_argument("--pidfile",
help = "override default location of pid file")
parser.add_argument("--profile",
help = "enable profiling, saving data to PROFILE")
rpki.log.argparse_setup(parser)
args = parser.parse_args()
self.profile = args.profile
rpki.log.init("pubd", args)
self.cfg = rpki.config.parser(args.config, "pubd")
self.cfg.set_global_flags()
if not args.foreground:
rpki.daemonize.daemon(pidfile = args.pidfile)
if self.profile:
import cProfile
prof = cProfile.Profile()
try:
prof.runcall(self.main)
finally:
prof.dump_stats(self.profile)
logger.info("Dumped profile data to %s", self.profile)
else:
self.main()
def main(self):
if self.profile:
logger.info("Running in profile mode with output to %s", self.profile)
self.sql = rpki.sql.session(self.cfg)
self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta"))
self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert"))
self.pubd_cert = rpki.x509.X509(Auto_update = self.cfg.get("pubd-cert"))
self.pubd_key = rpki.x509.RSA( Auto_update = self.cfg.get("pubd-key"))
self.pubd_crl = rpki.x509.CRL( Auto_update = self.cfg.get("pubd-crl"))
self.http_server_host = self.cfg.get("server-host", "")
self.http_server_port = self.cfg.getint("server-port")
self.publication_base = self.cfg.get("publication-base", "publication/")
self.publication_multimodule = self.cfg.getboolean("publication-multimodule", False)
self.session = session_obj.fetch(self)
rpki.http.server(
host = self.http_server_host,
port = self.http_server_port,
handlers = (("/control", self.control_handler),
("/client/", self.client_handler)))
def control_handler(self, query, path, cb):
"""
Process one PDU from the IRBE.
"""
def done(r_msg):
self.sql.sweep()
cb(code = 200,
body = rpki.publication_control.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert))
try:
q_cms = rpki.publication_control.cms_msg(DER = query)
q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert))
self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, "control")
q_msg.serve_top_level(self, done)
except (rpki.async.ExitNow, SystemExit):
raise
except Exception, e:
logger.exception("Unhandled exception processing control query, path %r", path)
cb(code = 500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e))
client_url_regexp = re.compile("/client/([-A-Z0-9_/]+)$", re.I)
def client_handler(self, query, path, cb):
"""
Process one PDU from a client.
"""
def done(r_msg):
self.sql.sweep()
cb(code = 200,
body = rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl))
try:
match = self.client_url_regexp.search(path)
if match is None:
raise rpki.exceptions.BadContactURL("Bad path: %s" % path)
client_handle = match.group(1)
client = rpki.publication_control.client_elt.sql_fetch_where1(self, "client_handle = %s", (client_handle,))
if client is None:
raise rpki.exceptions.ClientNotFound("Could not find client %s" % client_handle)
q_cms = rpki.publication.cms_msg(DER = query)
q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue))
q_cms.check_replay_sql(client, client.client_handle)
q_msg.serve_top_level(self, client, done)
except (rpki.async.ExitNow, SystemExit):
raise
except Exception, e:
logger.exception("Unhandled exception processing client query, path %r", path)
cb(code = 500,
reason = "Could not process PDU: %s" % e)
class session_obj(rpki.sql.sql_persistent):
    """
    An RRDP session.

    Only one session row is ever used (session_id 1); see fetch().
    """

    # We probably need additional columns or an additional table to
    # handle cleanup of old serial numbers.  Not sure quite what these
    # would look like, other than that the SQL datatypes are probably
    # BIGINT and DATETIME.  Maybe a table to track time at which we
    # retired a particular serial number, or, to save us the arithmetic,
    # the corresponding cleanup time?

    sql_template = rpki.sql.template(
        "session",
        "session_id",
        "uuid")

    ## @var expiration_interval
    # How long to wait after retiring a snapshot before purging it from the database.

    expiration_interval = rpki.sundial.timedelta(hours = 6)

    def __repr__(self):
        """
        Return a log-friendly representation of this session.
        """
        # A session has no "serial" attribute: its SQL template only
        # declares session_id and uuid, and serial numbers belong to
        # snapshots.  Referencing self.serial here would raise
        # AttributeError, so only the UUID is reported.
        return rpki.log.log_repr(self, self.uuid)

    @classmethod
    def fetch(cls, gctx):
        """
        Fetch the one and only session, creating it if necessary.

        gctx is the global context object passed to the SQL layer.
        Returns the session with session_id 1, which is created with a
        fresh random UUID and stored if it does not exist yet.
        """
        session = cls.sql_fetch(gctx, 1)
        if session is None:
            session = cls()
            session.gctx = gctx
            session.session_id = 1
            session.uuid = uuid.uuid4()
            session.sql_store()
        return session

    @property
    def objects(self):
        """
        All object_obj rows belonging to this session.
        """
        return object_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,))

    @property
    def snapshots(self):
        """
        All snapshot_obj rows belonging to this session.
        """
        return snapshot_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,))

    @property
    def current_snapshot(self):
        """
        The snapshot of this session which has been activated and has
        no expiration time set, or None if there is no such snapshot.
        """
        return snapshot_obj.sql_fetch_where1(
            self.gctx,
            "session_id = %s AND activated IS NOT NULL AND expires IS NULL",
            (self.session_id,))

    def new_snapshot(self):
        """
        Create and return a new (not yet activated) snapshot for this session.
        """
        return snapshot_obj.create(self)

    def add_snapshot(self, new_snapshot):
        """
        Make new_snapshot the current snapshot.

        The previously current snapshot, if any, is scheduled to expire
        expiration_interval from now; new_snapshot is then stamped as
        activated now.  Both changes are stored immediately.
        """
        now = rpki.sundial.now()
        old_snapshot = self.current_snapshot
        if old_snapshot is not None:
            old_snapshot.expires = now + self.expiration_interval
            old_snapshot.sql_store()
        new_snapshot.activated = now
        new_snapshot.sql_store()

    def expire_snapshots(self):
        """
        Delete every snapshot of this session whose expiration time has passed.
        """
        expired = snapshot_obj.sql_fetch_where(
            self.gctx,
            "session_id = %s AND expires IS NOT NULL AND expires < %s",
            (self.session_id, rpki.sundial.now()))
        for snapshot in expired:
            snapshot.sql_delete()
class snapshot_obj(rpki.sql.sql_persistent):
    """
    An RRDP session snapshot.
    """

    sql_template = rpki.sql.template(
        "snapshot",
        "snapshot_id",
        ("activated", rpki.sundial.datetime),
        ("expires",   rpki.sundial.datetime),
        "session_id")

    @property
    @rpki.sql.cache_reference
    def session(self):
        """
        The session_obj this snapshot belongs to.
        """
        return session_obj.sql_fetch(self.gctx, self.session_id)

    @classmethod
    def create(cls, session):
        """
        Create, store and return a new snapshot for the given session.

        The new snapshot starts out with neither an activation nor an
        expiration timestamp.
        """
        self = cls()
        self.gctx = session.gctx
        self.session_id = session.session_id
        self.activated = None
        self.expires = None
        self.sql_store()
        return self

    @property
    def serial(self):
        """
        I know that using an SQL ID for any other purpose is usually a bad
        idea, but in this case it has exactly the right properties, and we
        really do want both the autoincrement behavior and the foreign key
        behavior to tie to the snapshot serial numbers.  So risk it.

        Well, OK, only almost the right properties.  auto-increment
        probably does not back up if we ROLLBACK, which could leave gaps
        in the sequence.  So may need to rework this.  Ignore for now.
        """
        return self.snapshot_id

    def publish(self, client, obj, uri):
        """
        Publish obj at uri on behalf of client as part of this snapshot.

        Any object the same client currently has published at uri is
        withdrawn first.  Returns the newly created object_obj.
        """

        # Still a bit confused as to what we should do here.  Overwriting
        # one published object with another doesn't really match the
        # IXFR model.  Current proposal is an attribute on the publish
        # request to say that this is an overwrite, haven't implemented
        # that yet.  Would need to push knowledge of when we're
        # overwriting all the way from rpkid code that decides to write
        # each kind of object.  In most cases it looks like we already
        # know, a priori, might be a few corner cases.

        # Temporary kludge: always attempt the withdrawal.
        if True:
            try:
                self.withdraw(client, uri)
            except rpki.exceptions.NoObjectAtURI:
                # withdraw() found nothing at this URI, so this is a
                # plain publication rather than an overwrite.
                logger.debug("No prior %s", uri)
            else:
                logger.debug("Withdrew %s", uri)

        logger.debug("Publishing %s", uri)
        return object_obj.create(client, self, obj, uri)

    def withdraw(self, client, uri):
        """
        Withdraw the object client currently has published at uri.

        Raises rpki.exceptions.NoObjectAtURI if this client has no
        not-yet-withdrawn object at uri in this snapshot's session.
        """
        obj = object_obj.sql_fetch_where1(
            self.gctx,
            "session_id = %s AND client_id = %s AND withdrawn_snapshot_id IS NULL AND uri = %s",
            (self.session_id, client.client_id, uri))
        if obj is None:
            raise rpki.exceptions.NoObjectAtURI("No object published at %s" % uri)
        logger.debug("Withdrawing %s", uri)
        obj.delete(self)
class object_obj(rpki.sql.sql_persistent):
    """
    A published object.

    Each row records the URI, hash and payload of one object together
    with the snapshots in which it was published and (if applicable)
    withdrawn, and the client and session it belongs to.
    """

    sql_template = rpki.sql.template(
        "object",
        "object_id",
        "uri",
        "hash",
        "payload",
        "published_snapshot_id",
        "withdrawn_snapshot_id",
        "client_id",
        "session_id")

    def __repr__(self):
        """
        Return a log-friendly representation showing the URI and the
        publishing and withdrawing snapshot IDs.
        """
        return rpki.log.log_repr(self,
                                 self.uri,
                                 self.published_snapshot_id,
                                 self.withdrawn_snapshot_id)

    @property
    @rpki.sql.cache_reference
    def session(self):
        """
        The session_obj this object belongs to.
        """
        return session_obj.sql_fetch(self.gctx, self.session_id)

    @property
    @rpki.sql.cache_reference
    def client(self):
        """
        The client record this object belongs to.
        """
        return rpki.publication_control.client_elt.sql_fetch(self.gctx, self.client_id)

    @classmethod
    def create(cls, client, snapshot, obj, uri):
        """
        Build a new object row for obj at uri, published by client in
        the given snapshot, and mark it dirty so it gets written to SQL.

        Returns the new object_obj.
        """
        entry = cls()
        entry.gctx = snapshot.gctx

        # What is being published, and where.
        entry.uri = uri
        entry.payload = obj
        entry.hash = rpki.x509.sha256(obj.get_Base64())
        logger.debug("Computed hash %s of %r", entry.hash.encode("hex"), obj)

        # Published in this snapshot, not withdrawn yet.
        entry.published_snapshot_id = snapshot.snapshot_id
        entry.withdrawn_snapshot_id = None

        # Ownership.
        entry.session_id = snapshot.session_id
        entry.client_id = client.client_id

        entry.sql_mark_dirty()
        return entry

    def delete(self, snapshot):
        """
        Record that this object was withdrawn in the given snapshot.

        The row itself is kept; only withdrawn_snapshot_id changes.
        """
        self.withdrawn_snapshot_id = snapshot.snapshot_id
        # Stored immediately; an earlier sql_mark_dirty() call is
        # disabled here in favor of the explicit store.
        self.sql_store()