aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ca/tests/smoketest.py134
-rw-r--r--ca/tests/sql-cleaner.py11
-rw-r--r--ca/tests/yamltest.py16
-rw-r--r--rpki/fields.py4
-rw-r--r--rpki/irdbd.py4
-rw-r--r--rpki/pubd.py530
-rw-r--r--rpki/pubdb/__init__.py20
-rw-r--r--rpki/pubdb/migrations/0001_initial.py120
-rw-r--r--rpki/pubdb/migrations/__init__.py0
-rw-r--r--rpki/pubdb/models.py310
-rw-r--r--rpki/publication.py2
-rw-r--r--rpki/publication_control.py44
12 files changed, 697 insertions, 498 deletions
diff --git a/ca/tests/smoketest.py b/ca/tests/smoketest.py
index bf949a97..d24fc460 100644
--- a/ca/tests/smoketest.py
+++ b/ca/tests/smoketest.py
@@ -220,7 +220,7 @@ def main():
for a in db:
a.setup_bpki_certs()
- setup_publication(pubd_sql)
+ setup_publication(pubd_sql, db.root.irdb_db_name)
setup_rootd(db.root, y.get("rootd", {}), db)
setup_rsyncd()
setup_rcynic()
@@ -1108,18 +1108,6 @@ class allocation(object):
self.cross_certify(self.parent.name + "-SELF")
self.cross_certify(parent_host + "-TA")
- logger.info("Writing leaf YAML for %s", self.name)
- f = open(self.name + ".yaml", "w")
- f.write(yaml_fmt_1 % {
- "parent_name" : self.parent.name,
- "parent_host" : parent_host,
- "my_name" : self.name,
- "http_port" : self.parent.get_rpki_port(),
- "class_name" : 2 if self.parent.is_hosted else 1,
- "sia" : self.sia_base,
- "ski" : ski })
- f.close()
-
def run_cron(self, cb):
"""
Trigger cron run for this engine.
@@ -1236,7 +1224,7 @@ def setup_rsyncd():
f.write(rsyncd_fmt_1 % d)
f.close()
-def setup_publication(pubd_sql):
+def setup_publication(pubd_sql, irdb_db_name):
"""
Set up publication daemon.
"""
@@ -1265,7 +1253,9 @@ def setup_publication(pubd_sql):
pubd_db_name = pubd_db_name,
pubd_db_user = pubd_db_user,
pubd_db_pass = pubd_db_pass,
- pubd_dir = rsyncd_dir)
+ pubd_dir = rsyncd_dir,
+ irdb_db_name = irdb_db_name,
+ irdb_db_pass = irdb_db_pass)
f = open(pubd_name + ".conf", "w")
f.write(pubd_fmt_1 % d)
f.close()
@@ -1459,88 +1449,57 @@ bpki_cert_fmt_6 = ''' && \
-config %(name)s-%(kind)s.conf \
'''
-yaml_fmt_1 = '''---
-version: 1
-posturl: http://localhost:%(http_port)s/up-down/%(parent_name)s/%(my_name)s
-recipient-id: "%(parent_name)s"
-sender-id: "%(my_name)s"
-
-cms-cert-file: %(my_name)s-RPKI.cer
-cms-key-file: %(my_name)s-RPKI.key
-cms-ca-cert-file: %(my_name)s-TA.cer
-cms-crl-file: %(my_name)s-TA.crl
-cms-ca-certs-file:
- - %(my_name)s-TA-%(parent_name)s-SELF.cer
-
-ssl-cert-file: %(my_name)s-RPKI.cer
-ssl-key-file: %(my_name)s-RPKI.key
-ssl-ca-cert-file: %(my_name)s-TA.cer
-ssl-ca-certs-file:
- - %(my_name)s-TA-%(parent_host)s-TA.cer
-
-# We're cheating here by hardwiring the class name
-
-requests:
- list:
- type: list
- issue:
- type: issue
- class: %(class_name)s
- sia:
- - %(sia)s
- cert-request-key-file: %(my_name)s.key
- revoke:
- type: revoke
- class: %(class_name)s
- ski: %(ski)s
-'''
-
conf_fmt_1 = '''\
[irdbd]
-startup-message = This is %(my_name)s irdbd
+startup-message = This is %(my_name)s irdbd
-sql-database = %(irdb_db_name)s
-sql-username = irdb
-sql-password = %(irdb_db_pass)s
-bpki-ta = %(my_name)s-TA.cer
-rpkid-cert = %(my_name)s-RPKI.cer
-irdbd-cert = %(my_name)s-IRDB.cer
-irdbd-key = %(my_name)s-IRDB.key
-http-url = http://localhost:%(irdb_port)d/
-enable_tracebacks = yes
+sql-database = %(irdb_db_name)s
+sql-username = irdb
+sql-password = %(irdb_db_pass)s
+bpki-ta = %(my_name)s-TA.cer
+rpkid-cert = %(my_name)s-RPKI.cer
+irdbd-cert = %(my_name)s-IRDB.cer
+irdbd-key = %(my_name)s-IRDB.key
+http-url = http://localhost:%(irdb_port)d/
+enable_tracebacks = yes
[irbe_cli]
-rpkid-bpki-ta = %(my_name)s-TA.cer
-rpkid-cert = %(my_name)s-RPKI.cer
-rpkid-irbe-cert = %(my_name)s-IRBE.cer
-rpkid-irbe-key = %(my_name)s-IRBE.key
-rpkid-url = http://localhost:%(rpki_port)d/left-right
-enable_tracebacks = yes
+rpkid-bpki-ta = %(my_name)s-TA.cer
+rpkid-cert = %(my_name)s-RPKI.cer
+rpkid-irbe-cert = %(my_name)s-IRBE.cer
+rpkid-irbe-key = %(my_name)s-IRBE.key
+rpkid-url = http://localhost:%(rpki_port)d/left-right
+enable_tracebacks = yes
[rpkid]
-startup-message = This is %(my_name)s rpkid
+startup-message = This is %(my_name)s rpkid
-sql-database = %(rpki_db_name)s
-sql-username = rpki
-sql-password = %(rpki_db_pass)s
+sql-database = %(rpki_db_name)s
+sql-username = rpki
+sql-password = %(rpki_db_pass)s
-bpki-ta = %(my_name)s-TA.cer
-rpkid-key = %(my_name)s-RPKI.key
-rpkid-cert = %(my_name)s-RPKI.cer
-irdb-cert = %(my_name)s-IRDB.cer
-irbe-cert = %(my_name)s-IRBE.cer
+bpki-ta = %(my_name)s-TA.cer
+rpkid-key = %(my_name)s-RPKI.key
+rpkid-cert = %(my_name)s-RPKI.cer
+irdb-cert = %(my_name)s-IRDB.cer
+irbe-cert = %(my_name)s-IRBE.cer
-irdb-url = http://localhost:%(irdb_port)d/
+irdb-url = http://localhost:%(irdb_port)d/
-server-host = localhost
-server-port = %(rpki_port)d
+server-host = localhost
+server-port = %(rpki_port)d
-use-internal-cron = false
-enable_tracebacks = yes
+use-internal-cron = false
+enable_tracebacks = yes
+
+[myrpki]
+start_rpkid = yes
+start_irdbd = yes
+start_pubd = no
'''
rootd_fmt_1 = '''\
@@ -1610,7 +1569,7 @@ certificatePolicies = critical, @rpki_certificate_policy
[rpki_certificate_policy]
-policyIdentifier = 1.3.6.1.5.5.7.14.2
+policyIdentifier = 1.3.6.1.5.5.7.14.2
'''
rootd_fmt_2 = '''\
@@ -1674,6 +1633,17 @@ server-host = localhost
server-port = %(pubd_port)d
publication-base = %(pubd_dir)s
enable_tracebacks = yes
+
+[irdbd]
+
+sql-database = %(irdb_db_name)s
+sql-username = irdb
+sql-password = %(irdb_db_pass)s
+
+[myrpki]
+start_rpkid = no
+start_irdbd = no
+start_pubd = yes
'''
main()
diff --git a/ca/tests/sql-cleaner.py b/ca/tests/sql-cleaner.py
index 0f0b55b1..369a68ea 100644
--- a/ca/tests/sql-cleaner.py
+++ b/ca/tests/sql-cleaner.py
@@ -29,11 +29,14 @@ for name in ("rpkid", "irdbd", "pubd"):
username = cfg.get("%s_sql_username" % name, name[:4])
password = cfg.get("%s_sql_password" % name, "fnord")
+ # All of this schema creation stuff will go away once we're on Django ORM.
+ # For the moment, a quick kludge for testing.
schema = []
- for line in getattr(rpki.sql_schemas, name, "").splitlines():
- schema.extend(line.partition("--")[0].split())
- schema = " ".join(schema).strip(";").split(";")
- schema = [statement.strip() for statement in schema if statement and "DROP TABLE" not in statement]
+ if name == "rpkid":
+ for line in getattr(rpki.sql_schemas, name, "").splitlines():
+ schema.extend(line.partition("--")[0].split())
+ schema = " ".join(schema).strip(";").split(";")
+ schema = [statement.strip() for statement in schema if statement and "DROP TABLE" not in statement]
db = MySQLdb.connect(user = username, passwd = password)
cur = db.cursor()
diff --git a/ca/tests/yamltest.py b/ca/tests/yamltest.py
index 6d1b0cf4..6ef63382 100644
--- a/ca/tests/yamltest.py
+++ b/ca/tests/yamltest.py
@@ -564,23 +564,23 @@ class allocation(object):
RPKI_CONF = self.path("rpki.conf"))
subprocess.check_call(cmd, cwd = self.host.path(), env = env)
- def syncdb(self, run_gui):
+ def syncdb(self):
"""
Run whatever Django ORM commands are necessary to set up the
database this week.
-
- This may end up moving back into rpkic as an explicit command, but
- for the moment I'm assuming that production use handle this via
- rpki-sql-setup and that we therefore must do it ourselves for
- testing. We'll see.
"""
+ verbosity = 1
+
+ if verbosity > 0:
+ print "Running Django setup for", self.name
+
if not os.fork():
os.environ.update(RPKI_CONF = self.path("rpki.conf"),
RPKI_GUI_ENABLE = "yes")
logging.getLogger().setLevel(logging.WARNING)
import django.core.management
- django.core.management.call_command("syncdb", migrate = True, verbosity = 0,
+ django.core.management.call_command("syncdb", migrate = True, verbosity = verbosity,
load_initial_data = False, interactive = False)
from django.contrib.auth.models import User
User.objects.create_superuser("root", "root@example.org", "fnord")
@@ -790,7 +790,7 @@ try:
d.dump_rsyncd()
if d.is_root:
os.makedirs(d.path("publication.root"))
- d.syncdb(args.run_gui)
+ d.syncdb()
d.run_rpkic("initialize_server_bpki")
print
diff --git a/rpki/fields.py b/rpki/fields.py
index 3d859aaa..1ca6c893 100644
--- a/rpki/fields.py
+++ b/rpki/fields.py
@@ -24,12 +24,16 @@ the only sane text representation would just be the Base64 encoding of
the DER and thus would add no value.
"""
+import logging
+
from django.db import models
from south.modelsinspector import add_introspection_rules
import rpki.x509
import rpki.sundial
+logger = logging.getLogger(__name__)
+
class EnumField(models.PositiveSmallIntegerField):
"""
diff --git a/rpki/irdbd.py b/rpki/irdbd.py
index e51f2e75..6e7e1632 100644
--- a/rpki/irdbd.py
+++ b/rpki/irdbd.py
@@ -145,7 +145,8 @@ class main(object):
global rpki # pylint: disable=W0602
- os.environ["TZ"] = "UTC"
+ os.environ.update(TZ = "UTC",
+ DJANGO_SETTINGS_MODULE = "rpki.django_settings")
time.tzset()
parser = argparse.ArgumentParser(description = __doc__)
@@ -188,7 +189,6 @@ class main(object):
# Now that we know which configuration file to use, it's OK to
# load modules that require Django's settings module.
- os.environ.update(DJANGO_SETTINGS_MODULE = "rpki.django_settings")
global rpki # pylint: disable=W0602
import rpki.irdb # pylint: disable=W0621
diff --git a/rpki/pubd.py b/rpki/pubd.py
index 6e50e9a6..3e7d4506 100644
--- a/rpki/pubd.py
+++ b/rpki/pubd.py
@@ -30,46 +30,20 @@ import logging
import argparse
import rpki.resource_set
-import rpki.up_down
import rpki.x509
-import rpki.sql
import rpki.config
import rpki.exceptions
-import rpki.relaxng
import rpki.log
import rpki.publication
import rpki.publication_control
import rpki.daemonize
import rpki.http_simple
-from lxml.etree import Element, SubElement, tostring as ElementToString
+from lxml.etree import Element, SubElement
logger = logging.getLogger(__name__)
-rrdp_xmlns = rpki.relaxng.rrdp.xmlns
-rrdp_nsmap = rpki.relaxng.rrdp.nsmap
-rrdp_version = "1"
-
-rrdp_tag_delta = rrdp_xmlns + "delta"
-rrdp_tag_deltas = rrdp_xmlns + "deltas"
-rrdp_tag_notification = rrdp_xmlns + "notification"
-rrdp_tag_publish = rrdp_xmlns + "publish"
-rrdp_tag_snapshot = rrdp_xmlns + "snapshot"
-rrdp_tag_withdraw = rrdp_xmlns + "withdraw"
-
-
-def DERSubElement(elt, name, der, attrib = None, **kwargs):
- """
- Convenience wrapper around SubElement for use with Base64 text.
- """
-
- se = SubElement(elt, name, attrib, **kwargs)
- se.text = rpki.x509.base64_with_linebreaks(der)
- se.tail = "\n"
- return se
-
-
class main(object):
"""
Main program for pubd.
@@ -77,7 +51,8 @@ class main(object):
def __init__(self):
- os.environ["TZ"] = "UTC"
+ os.environ.update(TZ = "UTC",
+ DJANGO_SETTINGS_MODULE = "rpki.django_settings")
time.tzset()
self.irbe_cms_timestamp = None
@@ -120,7 +95,8 @@ class main(object):
if self.profile:
logger.info("Running in profile mode with output to %s", self.profile)
- self.sql = rpki.sql.session(self.cfg, autocommit = False)
+ global rpki
+ import rpki.pubdb
self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta"))
self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert"))
@@ -139,7 +115,10 @@ class main(object):
self.rrdp_publication_base = self.cfg.get("rrdp-publication-base",
"rrdp-publication/")
- self.session = session_obj.fetch(self)
+ try:
+ self.session = rpki.pubdb.Session.objects.get()
+ except rpki.pubdb.Session.DoesNotExist:
+ self.session = rpki.pubdb.Session.objects.create(uuid = str(uuid.uuid4()), serial = 0)
rpki.http_simple.server(
host = self.http_server_host,
@@ -148,32 +127,87 @@ class main(object):
("/client/", self.client_handler)))
- def rrdp_filename_to_uri(self, fn):
- return "%s/%s" % (self.rrdp_uri_base.rstrip("/"), fn)
-
-
def control_handler(self, request, q_der):
"""
Process one PDU from the IRBE.
"""
- # This is still structured with callbacks as if it were
- # asynchronous, because a lot of the grunt work is done by code in
- # rpki.xml_utils. If and when we get around to re-writing the
- # left-right and publication-control protocols to use lxml.etree
- # directly, most of the rpki.xml_utils code will go away, but for
- # the moment it's simplest to preserve the weird calling sequences.
-
- def done(r_msg):
- self.sql.commit()
- request.send_cms_response(rpki.publication_control.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert))
- self.sql.commit()
+ from django.db import transaction
try:
- q_cms = rpki.publication_control.cms_msg(DER = q_der)
+ q_cms = rpki.publication_control.cms_msg_no_sax(DER = q_der)
q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert))
self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, "control")
- q_msg.serve_top_level(self, done)
+ if q_msg.get("type") != "query":
+ raise rpki.exceptions.BadQuery("Message type is %s, expected query" % q_msg.get("type"))
+ r_msg = Element(rpki.publication_control.tag_msg, nsmap = rpki.publication_control.nsmap,
+ type = "reply", version = rpki.publication_control.version)
+
+ try:
+ with transaction.atomic(using = "pubdb"):
+
+ for q_pdu in q_msg:
+ if q_pdu.tag != rpki.publication_control.tag_client:
+ raise rpki.exceptions.BadQuery("PDU is %s, expected client" % q_pdu.tag)
+ client_handle = q_pdu.get("client_handle")
+ action = q_pdu.get("action")
+ if client_handle is None:
+ logger.info("Control %s request", action)
+ else:
+ logger.info("Control %s request for %s", action, client_handle)
+
+ if action in ("get", "list"):
+ if action == "get":
+ clients = rpki.pubdb.Client.objects.get(client_handle = client_handle),
+ else:
+ clients = rpki.pubdb.Client.objects.all()
+ for client in clients:
+ r_pdu = SubElement(r_msg, q_pdu.tag, action = action,
+ client_handle = client.client_handle, base_uri = client.base_uri)
+ if q_pdu.get("tag"):
+ r_pdu.set("tag", q_pdu.get("tag"))
+ SubElement(r_pdu, rpki.publication_control.tag_bpki_cert).text = client.bpki_cert.get_Base64()
+ if client.bpki_glue is not None:
+ SubElement(r_pdu, rpki.publication_control.tag_bpki_glue).text = client.bpki_glue.get_Base64()
+
+ if action in ("create", "set"):
+ if action == "create":
+ client = rpki.pubdb.Client(client_handle = client_handle)
+ else:
+ client = rpki.pubdb.Client.objects.get(client_handle = client_handle)
+ if q_pdu.get("base_uri"):
+ client.base_uri = q_pdu.get("base_uri")
+ bpki_cert = q_pdu.find(rpki.publication_control.tag_bpki_cert)
+ if bpki_cert is not None:
+ client.bpki_cert = bpki_cert.text.decode("base64")
+ bpki_glue = q_pdu.find(rpki.publication_control.tag_bpki_glue)
+ if bpki_glue is not None:
+ client.bpki_glue = bpki_glue.text.decode("base64")
+ if q_pdu.get("clear_replay_protection") == "yes":
+ client.last_cms_timestamp = None
+ client.save()
+ logger.debug("Stored client_handle %s, base_uri %s, bpki_cert %r, bpki_glue %r, last_cms_timestamp %s",
+ client.client_handle, client.base_uri, client.bpki_cert, client.bpki_glue,
+ client.last_cms_timestamp)
+ r_pdu = SubElement(r_msg, q_pdu.tag, action = action, client_handle = client_handle)
+ if q_pdu.get("tag"):
+ r_pdu.set("tag", q_pdu.get("tag"))
+
+ if action == "destroy":
+ rpki.pubdb.Client.objects.filter(client_handle = client_handle).delete()
+ r_pdu = SubElement(r_msg, q_pdu.tag, action = action, client_handle = client_handle)
+ if q_pdu.get("tag"):
+ r_pdu.set("tag", q_pdu.get("tag"))
+
+ except Exception, e:
+ logger.exception("Exception processing PDU %r", q_pdu)
+ r_pdu = SubElement(r_msg, rpki.publication_control.tag_report_error, error_code = e.__class__.__name__)
+ r_pdu.text = str(e)
+ if q_pdu.get("tag") is not None:
+ r_pdu.set("tag", q_pdu.get("tag"))
+
+ request.send_cms_response(rpki.publication_control.cms_msg_no_sax().wrap(r_msg, self.pubd_key, self.pubd_cert))
+
except Exception, e:
logger.exception("Unhandled exception processing control query, path %r", request.path)
request.send_error(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e))
@@ -186,375 +220,71 @@ class main(object):
Process one PDU from a client.
"""
+ from django.db import transaction
+
try:
match = self.client_url_regexp.search(request.path)
if match is None:
raise rpki.exceptions.BadContactURL("Bad path: %s" % request.path)
- client_handle = match.group(1)
- client = rpki.publication_control.client_elt.sql_fetch_where1(self, "client_handle = %s", (client_handle,))
- if client is None:
- raise rpki.exceptions.ClientNotFound("Could not find client %s" % client_handle)
+ client = rpki.pubdb.Client.objects.get(client_handle = match.group(1))
q_cms = rpki.publication.cms_msg_no_sax(DER = q_der)
q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue))
- q_cms.check_replay_sql(client, client.client_handle)
- self.sql.commit() # commit the replay timestamp
+ client.last_cms_timestamp = q_cms.check_replay(client.last_cms_timestamp, client.client_handle)
+ client.save()
if q_msg.get("type") != "query":
raise rpki.exceptions.BadQuery("Message type is %s, expected query" % q_msg.get("type"))
r_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap,
type = "reply", version = rpki.publication.version)
delta = None
- failed = False
- for q_pdu in q_msg:
- try:
- if q_pdu.tag == rpki.publication.tag_list:
- for obj in client.objects:
- r_pdu = SubElement(r_msg, q_pdu.tag, uri = obj.uri, hash = obj.hash)
- if q_pdu.get("tag") is not None:
- r_pdu.set("tag", q_pdu.get("tag"))
- else:
- assert q_pdu.tag in (rpki.publication.tag_publish, rpki.publication.tag_withdraw)
- if delta is None:
- delta = self.session.new_delta()
- client.check_allowed_uri(q_pdu.get("uri"))
- if q_pdu.tag == rpki.publication.tag_publish:
- der = q_pdu.text.decode("base64")
- logger.info("Publishing %s", rpki.x509.uri_dispatch(q_pdu.get("uri"))(DER = der).tracking_data(q_pdu.get("uri")))
- delta.publish(client, der, q_pdu.get("uri"), q_pdu.get("hash"))
+ try:
+ with transaction.atomic(using = "pubdb"):
+ for q_pdu in q_msg:
+ if q_pdu.get("uri"):
+ logger.info("Client %s request for %s", q_pdu.tag, q_pdu.get("uri"))
else:
- logger.info("Withdrawing %s", q_pdu.get("uri"))
- delta.withdraw(client, q_pdu.get("uri"), q_pdu.get("hash"))
- r_pdu = SubElement(r_msg, q_pdu.tag, uri = q_pdu.get("uri"))
- if q_pdu.get("tag") is not None:
- r_pdu.set("tag", q_pdu.get("tag"))
- except Exception, e:
- logger.exception("Exception processing PDU %r", q_pdu)
- r_pdu = SubElement(r_msg, rpki.publication.tag_report_error, error_code = e.__class__.__name__)
- r_pdu.text = str(e)
- if q_pdu.get("tag") is not None:
- r_pdu.set("tag", q_pdu.get("tag"))
- failed = True
-
- if failed:
- self.sql.rollback()
-
- elif delta is not None:
- delta.activate()
- self.sql.sweep()
- self.session.generate_snapshot()
- self.session.expire_deltas()
- self.sql.commit()
- self.session.synchronize_rrdp_files()
- delta.update_rsync_files()
-
- request.send_cms_response(rpki.publication.cms_msg_no_sax().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl))
-
-
- except Exception, e:
- logger.exception("Unhandled exception processing client query, path %r", request.path)
- self.sql.rollback()
- request.send_error(500, "Could not process PDU: %s" % e)
+ logger.info("Client %s request", q_pdu.tag)
+ if q_pdu.tag == rpki.publication.tag_list:
+ for obj in client.publishedobject_set.all():
+ r_pdu = SubElement(r_msg, q_pdu.tag, uri = obj.uri, hash = obj.hash)
+ if q_pdu.get("tag") is not None:
+ r_pdu.set("tag", q_pdu.get("tag"))
- def uri_to_filename(self, uri):
- """
- Convert a URI to a local filename.
- """
-
- if not uri.startswith("rsync://"):
- raise rpki.exceptions.BadURISyntax(uri)
- path = uri.split("/")[4:]
- path.insert(0, self.publication_base.rstrip("/"))
- filename = "/".join(path)
- if "/../" in filename or filename.endswith("/.."):
- raise rpki.exceptions.BadURISyntax(filename)
- return filename
-
-
-class session_obj(rpki.sql.sql_persistent):
- """
- An RRDP session.
- """
-
- sql_template = rpki.sql.template(
- "session",
- "session_id",
- "uuid",
- "serial",
- "snapshot",
- "hash")
-
- def __repr__(self):
- return rpki.log.log_repr(self, self.uuid, self.serial)
-
- @classmethod
- def fetch(cls, gctx):
- """
- Fetch the one and only session, creating it if necessary.
- """
-
- self = cls.sql_fetch(gctx, 1)
- if self is None:
- self = cls()
- self.uuid = str(uuid.uuid4())
- self.serial = 0
- self.snapshot = None
- self.hash = None
- self.gctx = gctx
- self.sql_store()
- return self
-
- @property
- def objects(self):
- return object_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,))
-
- @property
- def deltas(self):
- return delta_obj.sql_fetch_where(self.gctx, "session_id = %s", (self.session_id,))
-
- def new_delta(self):
- return delta_obj.create(self)
-
- def expire_deltas(self):
- for delta in delta_obj.sql_fetch_where(self.gctx,
- "session_id = %s AND expires IS NOT NULL AND expires < %s",
- (self.session_id, rpki.sundial.now())):
- delta.sql_mark_deleted()
-
- def write_rrdp_file(self, fn, text, overwrite = False):
- """
- Save RRDP XML to disk.
- """
-
- if overwrite or not os.path.exists(os.path.join(self.gctx.rrdp_publication_base, fn)):
- tn = os.path.join(self.gctx.rrdp_publication_base, fn + ".%s.tmp" % os.getpid())
- if not os.path.isdir(os.path.dirname(tn)):
- os.makedirs(os.path.dirname(tn))
- with open(tn, "w") as f:
- f.write(text)
- os.rename(tn, os.path.join(self.gctx.rrdp_publication_base, fn))
-
- def generate_snapshot(self):
- """
- Generate an XML snapshot of this session.
- """
+ else:
+ assert q_pdu.tag in (rpki.publication.tag_publish, rpki.publication.tag_withdraw)
+ if delta is None:
+ delta = self.session.new_delta(rpki.sundial.now() + self.rrdp_expiration_interval)
+ client.check_allowed_uri(q_pdu.get("uri"))
+ if q_pdu.tag == rpki.publication.tag_publish:
+ der = q_pdu.text.decode("base64")
+ logger.info("Publishing %s", rpki.x509.uri_dispatch(q_pdu.get("uri"))(DER = der).tracking_data(q_pdu.get("uri")))
+ delta.publish(client, der, q_pdu.get("uri"), q_pdu.get("hash"))
+ else:
+ logger.info("Withdrawing %s", q_pdu.get("uri"))
+ delta.withdraw(client, q_pdu.get("uri"), q_pdu.get("hash"))
+ r_pdu = SubElement(r_msg, q_pdu.tag, uri = q_pdu.get("uri"))
+ if q_pdu.get("tag") is not None:
+ r_pdu.set("tag", q_pdu.get("tag"))
- xml = Element(rrdp_tag_snapshot, nsmap = rrdp_nsmap,
- version = rrdp_version,
- session_id = self.uuid,
- serial = str(self.serial))
- xml.text = "\n"
- for obj in self.objects:
- DERSubElement(xml, rrdp_tag_publish,
- der = obj.der,
- uri = obj.uri)
- rpki.relaxng.rrdp.assertValid(xml)
- self.snapshot = ElementToString(xml, pretty_print = True)
- self.hash = rpki.x509.sha256(self.snapshot).encode("hex")
- self.sql_store()
-
- @property
- def snapshot_fn(self):
- return "%s/snapshot/%s.xml" % (self.uuid, self.serial)
-
- @property
- def notification_fn(self):
- return "updates.xml"
-
- def synchronize_rrdp_files(self):
- """
- Write current RRDP files to disk, clean up old files and directories.
- """
+ if delta is not None:
+ delta.activate()
+ self.session.generate_snapshot()
+ self.session.expire_deltas()
- current_filenames = set()
-
- for delta in self.deltas:
- self.write_rrdp_file(delta.fn, delta.xml)
- current_filenames.add(delta.fn)
-
- self.write_rrdp_file(self.snapshot_fn, self.snapshot)
- current_filenames.add(self.snapshot_fn)
-
- xml = Element(rrdp_tag_notification, nsmap = rrdp_nsmap,
- version = rrdp_version,
- session_id = self.uuid,
- serial = str(self.serial))
- SubElement(xml, rrdp_tag_snapshot,
- uri = self.gctx.rrdp_filename_to_uri(self.snapshot_fn),
- hash = self.hash)
- for delta in self.deltas:
- se = SubElement(xml, rrdp_tag_delta,
- to = str(delta.serial),
- uri = self.gctx.rrdp_filename_to_uri(delta.fn),
- hash = delta.hash)
- se.set("from", str(delta.serial - 1))
- rpki.relaxng.rrdp.assertValid(xml)
- self.write_rrdp_file(self.notification_fn,
- ElementToString(xml, pretty_print = True),
- overwrite = True)
- current_filenames.add(self.notification_fn)
-
- for root, dirs, files in os.walk(self.gctx.rrdp_publication_base, topdown = False):
- for fn in files:
- fn = os.path.join(root, fn)
- if fn[len(self.gctx.rrdp_publication_base):].lstrip("/") not in current_filenames:
- os.remove(fn)
- for dn in dirs:
- try:
- os.rmdir(os.path.join(root, dn))
- except OSError:
- pass
-
-
-class delta_obj(rpki.sql.sql_persistent):
- """
- An RRDP delta.
- """
+ except Exception, e:
+ logger.exception("Exception processing PDU %r", q_pdu)
+ r_pdu = SubElement(r_msg, rpki.publication.tag_report_error, error_code = e.__class__.__name__)
+ r_pdu.text = str(e)
+ if q_pdu.get("tag") is not None:
+ r_pdu.set("tag", q_pdu.get("tag"))
- sql_template = rpki.sql.template(
- "delta",
- "delta_id",
- "session_id",
- "serial",
- "xml",
- "hash",
- ("expires", rpki.sundial.datetime))
-
- @property
- @rpki.sql.cache_reference
- def session(self):
- return session_obj.sql_fetch(self.gctx, self.session_id)
-
- @property
- def fn(self):
- return "%s/deltas/%s-%s.xml" % (self.session.uuid, self.serial - 1, self.serial)
-
- @classmethod
- def create(cls, session):
- self = cls()
- self.gctx = session.gctx
- self.session_id = session.session_id
- self.serial = session.serial + 1
- self.xml = None
- self.hash = None
- self.expires = rpki.sundial.now() + self.gctx.rrdp_expiration_interval
- self.deltas = Element(rrdp_tag_deltas, nsmap = rrdp_nsmap,
- to = str(self.serial),
- version = rrdp_version,
- session_id = session.uuid)
- self.deltas.set("from", str(self.serial - 1))
- SubElement(self.deltas, rrdp_tag_delta, serial = str(self.serial)).text = "\n"
- return self
-
- def activate(self):
- rpki.relaxng.rrdp.assertValid(self.deltas)
- self.xml = ElementToString(self.deltas, pretty_print = True)
- self.hash = rpki.x509.sha256(self.xml).encode("hex")
- self.sql_mark_dirty()
- self.session.serial += 1
- self.session.sql_mark_dirty()
-
- def publish(self, client, der, uri, hash):
- obj = object_obj.current_object_at_uri(client, self, uri)
- if obj is not None:
- if obj.hash == hash:
- obj.delete(self)
- elif hash is None:
- raise rpki.exceptions.ExistingObjectAtURI("Object already published at %s" % uri)
else:
- raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (old %s, new %s)" % (uri, obj.hash, hash))
- logger.debug("Publishing %s", uri)
- object_obj.create(client, self, der, uri)
- se = DERSubElement(self.deltas[0], rrdp_tag_publish, der = der, uri = uri)
- if hash is not None:
- se.set("hash", hash)
- rpki.relaxng.rrdp.assertValid(self.deltas)
-
- def withdraw(self, client, uri, hash):
- obj = object_obj.current_object_at_uri(client, self, uri)
- if obj is None:
- raise rpki.exceptions.NoObjectAtURI("No object published at %s" % uri)
- if obj.hash != hash:
- raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (old %s, new %s)" % (uri, obj.hash, hash))
- logger.debug("Withdrawing %s", uri)
- obj.delete(self)
- SubElement(self.deltas[0], rrdp_tag_withdraw, uri = uri, hash = hash).tail = "\n"
- rpki.relaxng.rrdp.assertValid(self.deltas)
-
- def update_rsync_files(self):
- min_path_len = len(self.gctx.publication_base.rstrip("/"))
- for pdu in self.deltas[0]:
- assert pdu.tag in (rrdp_tag_publish, rrdp_tag_withdraw)
- fn = self.gctx.uri_to_filename(pdu.get("uri"))
- if pdu.tag == rrdp_tag_publish:
- tn = fn + ".tmp"
- dn = os.path.dirname(fn)
- if not os.path.isdir(dn):
- os.makedirs(dn)
- with open(tn, "wb") as f:
- f.write(pdu.text.decode("base64"))
- os.rename(tn, fn)
- else:
- try:
- os.remove(fn)
- except OSError, e:
- if e.errno != errno.ENOENT:
- raise
- dn = os.path.dirname(fn)
- while len(dn) > min_path_len:
- try:
- os.rmdir(dn)
- except OSError:
- break
- else:
- dn = os.path.dirname(dn)
- del self.deltas
-
-
-class object_obj(rpki.sql.sql_persistent):
- """
- A published object.
- """
+ if delta is not None:
+ self.session.synchronize_rrdp_files(self.rrdp_publication_base, self.rrdp_uri_base)
+ delta.update_rsync_files(self.publication_base)
+
+ request.send_cms_response(rpki.publication.cms_msg_no_sax().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl))
- sql_template = rpki.sql.template(
- "object",
- "object_id",
- "uri",
- "der",
- "hash",
- "client_id",
- "session_id")
-
- def __repr__(self):
- return rpki.log.log_repr(self, self.uri)
-
- @property
- @rpki.sql.cache_reference
- def session(self):
- return session_obj.sql_fetch(self.gctx, self.session_id)
-
- @property
- @rpki.sql.cache_reference
- def client(self):
- return rpki.publication_control.client_elt.sql_fetch(self.gctx, self.client_id)
-
- @classmethod
- def create(cls, client, delta, der, uri):
- self = cls()
- self.gctx = delta.gctx
- self.uri = uri
- self.der = der
- self.hash = rpki.x509.sha256(der).encode("hex")
- logger.debug("Computed hash %s for %s", self.hash, self.uri)
- self.session_id = delta.session_id
- self.client_id = client.client_id
- self.sql_mark_dirty()
- return self
-
- def delete(self, delta):
- self.sql_mark_deleted()
-
- @classmethod
- def current_object_at_uri(cls, client, delta, uri):
- return cls.sql_fetch_where1(client.gctx,
- "session_id = %s AND client_id = %s AND uri = %s",
- (delta.session_id, client.client_id, uri))
+ except Exception, e:
+ logger.exception("Unhandled exception processing client query, path %r", request.path)
+ request.send_error(500, "Could not process PDU: %s" % e)
diff --git a/rpki/pubdb/__init__.py b/rpki/pubdb/__init__.py
index 5e25c7e3..2c83051f 100644
--- a/rpki/pubdb/__init__.py
+++ b/rpki/pubdb/__init__.py
@@ -1,3 +1,21 @@
# $Id$
#
-# Placeholder for pubdb Django models not yet written.
+# Copyright (C) 2014 Dragon Research Labs ("DRL")
+#
+# Permission to use, copy, modify, and/or distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND DRL DISCLAIMS ALL WARRANTIES WITH
+# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS. IN NO EVENT SHALL DRL BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+# PERFORMANCE OF THIS SOFTWARE.
+
+"""
+Package for Django ORM models relating to pubd.
+"""
+
+from rpki.pubdb.models import *
diff --git a/rpki/pubdb/migrations/0001_initial.py b/rpki/pubdb/migrations/0001_initial.py
new file mode 100644
index 00000000..c796d020
--- /dev/null
+++ b/rpki/pubdb/migrations/0001_initial.py
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+from south.utils import datetime_utils as datetime
+from south.db import dbs
+from south.v2 import SchemaMigration
+from django.db import models
+
+db = dbs["pubdb"]
+
+class Migration(SchemaMigration):
+
+ def forwards(self, orm):
+ # Adding model 'Client'
+ db.create_table(u'pubdb_client', (
+ (u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
+ ('client_handle', self.gf('django.db.models.fields.CharField')(unique=True, max_length=255)),
+ ('base_uri', self.gf('django.db.models.fields.TextField')()),
+ ('bpki_cert', self.gf('rpki.fields.BlobField')(default=None, blank=True)),
+ ('bpki_glue', self.gf('rpki.fields.BlobField')(default=None, null=True, blank=True)),
+ ('last_cms_timestamp', self.gf('rpki.fields.SundialField')(null=True, blank=True)),
+ ))
+ db.send_create_signal(u'pubdb', ['Client'])
+
+ # Adding model 'Session'
+ db.create_table(u'pubdb_session', (
+ (u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
+ ('uuid', self.gf('django.db.models.fields.CharField')(unique=True, max_length=36)),
+ ('serial', self.gf('django.db.models.fields.BigIntegerField')()),
+ ('snapshot', self.gf('django.db.models.fields.TextField')(blank=True)),
+ ('hash', self.gf('django.db.models.fields.CharField')(max_length=64, blank=True)),
+ ))
+ db.send_create_signal(u'pubdb', ['Session'])
+
+ # Adding model 'Delta'
+ db.create_table(u'pubdb_delta', (
+ (u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
+ ('serial', self.gf('django.db.models.fields.BigIntegerField')()),
+ ('xml', self.gf('django.db.models.fields.TextField')()),
+ ('hash', self.gf('django.db.models.fields.CharField')(max_length=64)),
+ ('expires', self.gf('rpki.fields.SundialField')()),
+ ('session', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['pubdb.Session'])),
+ ))
+ db.send_create_signal(u'pubdb', ['Delta'])
+
+ # Adding model 'PublishedObject'
+ db.create_table(u'pubdb_publishedobject', (
+ (u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
+ ('uri', self.gf('django.db.models.fields.CharField')(max_length=255)),
+ ('der', self.gf('rpki.fields.BlobField')(default=None, blank=True)),
+ ('hash', self.gf('django.db.models.fields.CharField')(max_length=64)),
+ ('client', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['pubdb.Client'])),
+ ('session', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['pubdb.Session'])),
+ ))
+ db.send_create_signal(u'pubdb', ['PublishedObject'])
+
+ # Adding unique constraint on 'PublishedObject', fields ['session', 'hash']
+ db.create_unique(u'pubdb_publishedobject', ['session_id', 'hash'])
+
+ # Adding unique constraint on 'PublishedObject', fields ['session', 'uri']
+ db.create_unique(u'pubdb_publishedobject', ['session_id', 'uri'])
+
+
+ def backwards(self, orm):
+ # Removing unique constraint on 'PublishedObject', fields ['session', 'uri']
+ db.delete_unique(u'pubdb_publishedobject', ['session_id', 'uri'])
+
+ # Removing unique constraint on 'PublishedObject', fields ['session', 'hash']
+ db.delete_unique(u'pubdb_publishedobject', ['session_id', 'hash'])
+
+ # Deleting model 'Client'
+ db.delete_table(u'pubdb_client')
+
+ # Deleting model 'Session'
+ db.delete_table(u'pubdb_session')
+
+ # Deleting model 'Delta'
+ db.delete_table(u'pubdb_delta')
+
+ # Deleting model 'PublishedObject'
+ db.delete_table(u'pubdb_publishedobject')
+
+
+ models = {
+ u'pubdb.client': {
+ 'Meta': {'object_name': 'Client'},
+ 'base_uri': ('django.db.models.fields.TextField', [], {}),
+ 'bpki_cert': ('rpki.fields.BlobField', [], {'default': 'None', 'blank': 'True'}),
+ 'bpki_glue': ('rpki.fields.BlobField', [], {'default': 'None', 'null': 'True', 'blank': 'True'}),
+ 'client_handle': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '255'}),
+ u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'last_cms_timestamp': ('rpki.fields.SundialField', [], {'null': 'True', 'blank': 'True'})
+ },
+ u'pubdb.delta': {
+ 'Meta': {'object_name': 'Delta'},
+ 'expires': ('rpki.fields.SundialField', [], {}),
+ 'hash': ('django.db.models.fields.CharField', [], {'max_length': '64'}),
+ u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'serial': ('django.db.models.fields.BigIntegerField', [], {}),
+ 'session': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['pubdb.Session']"}),
+ 'xml': ('django.db.models.fields.TextField', [], {})
+ },
+ u'pubdb.publishedobject': {
+ 'Meta': {'unique_together': "((u'session', u'hash'), (u'session', u'uri'))", 'object_name': 'PublishedObject'},
+ 'client': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['pubdb.Client']"}),
+ 'der': ('rpki.fields.BlobField', [], {'default': 'None', 'blank': 'True'}),
+ 'hash': ('django.db.models.fields.CharField', [], {'max_length': '64'}),
+ u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'session': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['pubdb.Session']"}),
+ 'uri': ('django.db.models.fields.CharField', [], {'max_length': '255'})
+ },
+ u'pubdb.session': {
+ 'Meta': {'object_name': 'Session'},
+ 'hash': ('django.db.models.fields.CharField', [], {'max_length': '64', 'blank': 'True'}),
+ u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+ 'serial': ('django.db.models.fields.BigIntegerField', [], {}),
+ 'snapshot': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
+ 'uuid': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '36'})
+ }
+ }
+
+ complete_apps = ['pubdb']
diff --git a/rpki/pubdb/migrations/__init__.py b/rpki/pubdb/migrations/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/rpki/pubdb/migrations/__init__.py
diff --git a/rpki/pubdb/models.py b/rpki/pubdb/models.py
new file mode 100644
index 00000000..f7edfd4a
--- /dev/null
+++ b/rpki/pubdb/models.py
@@ -0,0 +1,310 @@
+# $Id$
+#
+# Copyright (C) 2014 Dragon Research Labs ("DRL")
+#
+# Permission to use, copy, modify, and/or distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND DRL DISCLAIMS ALL WARRANTIES WITH
+# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS. IN NO EVENT SHALL DRL BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+# PERFORMANCE OF THIS SOFTWARE.
+
+"""
+Django ORM models for pubd.
+"""
+
+from __future__ import unicode_literals
+from django.db import models
+from rpki.fields import BlobField, CertificateField, SundialField
+from lxml.etree import Element, SubElement, tostring as ElementToString
+
+import os
+import logging
+import rpki.exceptions
+import rpki.relaxng
+
+logger = logging.getLogger(__name__)
+
+
+# Some of this probably ought to move into a rpki.rrdp module.
+
+rrdp_xmlns = rpki.relaxng.rrdp.xmlns
+rrdp_nsmap = rpki.relaxng.rrdp.nsmap
+rrdp_version = "1"
+
+rrdp_tag_delta = rrdp_xmlns + "delta"
+rrdp_tag_deltas = rrdp_xmlns + "deltas"
+rrdp_tag_notification = rrdp_xmlns + "notification"
+rrdp_tag_publish = rrdp_xmlns + "publish"
+rrdp_tag_snapshot = rrdp_xmlns + "snapshot"
+rrdp_tag_withdraw = rrdp_xmlns + "withdraw"
+
+
+# This would probably be useful to more than just this module, not
+# sure quite where to put it at the moment.
+
+def DERSubElement(elt, name, der, attrib = None, **kwargs):
+ """
+ Convenience wrapper around SubElement for use with Base64 text.
+ """
+
+ se = SubElement(elt, name, attrib, **kwargs)
+ se.text = rpki.x509.base64_with_linebreaks(der)
+ se.tail = "\n"
+ return se
+
+
+
+class Client(models.Model):
+ client_handle = models.CharField(unique = True, max_length = 255)
+ base_uri = models.TextField()
+ bpki_cert = CertificateField()
+ bpki_glue = CertificateField(null = True)
+ last_cms_timestamp = SundialField(blank = True, null = True)
+
+
+ def check_allowed_uri(self, uri):
+ """
+ Make sure that a target URI is within this client's allowed URI space.
+ """
+
+ if not uri.startswith(self.base_uri):
+ raise rpki.exceptions.ForbiddenURI
+
+
+class Session(models.Model):
+ uuid = models.CharField(unique = True, max_length=36)
+ serial = models.BigIntegerField()
+ snapshot = models.TextField(blank = True)
+ hash = models.CharField(max_length = 64, blank = True)
+
+
+ def new_delta(self, expires):
+ """
+ Construct a new delta associated with this session.
+ """
+
+ delta = Delta(session = self,
+ serial = self.serial + 1,
+ expires = expires)
+ delta.deltas = Element(rrdp_tag_deltas,
+ nsmap = rrdp_nsmap,
+ version = rrdp_version,
+ session_id = self.uuid)
+ delta.deltas.set("to", str(delta.serial))
+ delta.deltas.set("from", str(delta.serial - 1))
+ SubElement(delta.deltas, rrdp_tag_delta, serial = str(delta.serial)).text = "\n"
+ return delta
+
+
+ def expire_deltas(self):
+ """
+ Delete deltas whose expiration date has passed.
+ """
+
+ self.delta_set.filter(expires__lt = rpki.sundial.now()).delete()
+
+
+ def generate_snapshot(self):
+ """
+ Generate an XML snapshot of this session.
+ """
+
+ xml = Element(rrdp_tag_snapshot, nsmap = rrdp_nsmap,
+ version = rrdp_version,
+ session_id = self.uuid,
+ serial = str(self.serial))
+ xml.text = "\n"
+ for obj in self.publishedobject_set.all():
+ DERSubElement(xml, rrdp_tag_publish,
+ der = obj.der,
+ uri = obj.uri)
+ rpki.relaxng.rrdp.assertValid(xml)
+ self.snapshot = ElementToString(xml, pretty_print = True)
+ self.hash = rpki.x509.sha256(self.snapshot).encode("hex")
+ self.save()
+
+
+ @property
+ def snapshot_fn(self):
+ return "%s/snapshot/%s.xml" % (self.uuid, self.serial)
+
+
+ @property
+ def notification_fn(self):
+ return "updates.xml"
+
+
+ @staticmethod
+ def _write_rrdp_file(fn, text, rrdp_publication_base, overwrite = False):
+ if overwrite or not os.path.exists(os.path.join(rrdp_publication_base, fn)):
+ tn = os.path.join(rrdp_publication_base, fn + ".%s.tmp" % os.getpid())
+ if not os.path.isdir(os.path.dirname(tn)):
+ os.makedirs(os.path.dirname(tn))
+ with open(tn, "w") as f:
+ f.write(text)
+ os.rename(tn, os.path.join(rrdp_publication_base, fn))
+
+
+ @staticmethod
+ def _rrdp_filename_to_uri(fn, rrdp_uri_base):
+ return "%s/%s" % (rrdp_uri_base.rstrip("/"), fn)
+
+
+ def _generate_update_xml(self, rrdp_uri_base):
+ xml = Element(rrdp_tag_notification, nsmap = rrdp_nsmap,
+ version = rrdp_version,
+ session_id = self.uuid,
+ serial = str(self.serial))
+ SubElement(xml, rrdp_tag_snapshot,
+ uri = self._rrdp_filename_to_uri(self.snapshot_fn, rrdp_uri_base),
+ hash = self.hash)
+ for delta in self.delta_set.all():
+ se = SubElement(xml, rrdp_tag_delta,
+ uri = self._rrdp_filename_to_uri(delta.fn, rrdp_uri_base),
+ hash = delta.hash)
+ se.set("to", str(delta.serial))
+ se.set("from", str(delta.serial - 1))
+ rpki.relaxng.rrdp.assertValid(xml)
+ return ElementToString(xml, pretty_print = True)
+
+
+ def synchronize_rrdp_files(self, rrdp_publication_base, rrdp_uri_base):
+ """
+ Write current RRDP files to disk, clean up old files and directories.
+ """
+
+ current_filenames = set()
+
+ for delta in self.delta_set.all():
+ self._write_rrdp_file(delta.fn, delta.xml, rrdp_publication_base)
+ current_filenames.add(delta.fn)
+
+ self._write_rrdp_file(self.snapshot_fn, self.snapshot, rrdp_publication_base)
+ current_filenames.add(self.snapshot_fn)
+
+ self._write_rrdp_file(self.notification_fn, self._generate_update_xml(rrdp_uri_base),
+ rrdp_publication_base, overwrite = True)
+ current_filenames.add(self.notification_fn)
+
+ for root, dirs, files in os.walk(rrdp_publication_base, topdown = False):
+ for fn in files:
+ fn = os.path.join(root, fn)
+ if fn[len(rrdp_publication_base):].lstrip("/") not in current_filenames:
+ os.remove(fn)
+ for dn in dirs:
+ try:
+ os.rmdir(os.path.join(root, dn))
+ except OSError:
+ pass
+
+
+class Delta(models.Model):
+ serial = models.BigIntegerField()
+ xml = models.TextField()
+ hash = models.CharField(max_length = 64)
+ expires = SundialField()
+ session = models.ForeignKey(Session)
+
+
+ @staticmethod
+ def _uri_to_filename(uri, publication_base):
+ if not uri.startswith("rsync://"):
+ raise rpki.exceptions.BadURISyntax(uri)
+ path = uri.split("/")[4:]
+ path.insert(0, publication_base.rstrip("/"))
+ filename = "/".join(path)
+ if "/../" in filename or filename.endswith("/.."):
+ raise rpki.exceptions.BadURISyntax(filename)
+ return filename
+
+
+ @property
+ def fn(self):
+ return "%s/deltas/%s-%s.xml" % (self.session.uuid, self.serial - 1, self.serial)
+
+
+ def activate(self):
+ rpki.relaxng.rrdp.assertValid(self.deltas)
+ self.xml = ElementToString(self.deltas, pretty_print = True)
+ self.hash = rpki.x509.sha256(self.xml).encode("hex")
+ self.save()
+ self.session.serial += 1
+ self.session.save()
+
+
+ def publish(self, client, der, uri, hash):
+ try:
+ obj = client.publishedobject_set.get(session = self.session, uri = uri)
+ if obj.hash == hash:
+ obj.delete()
+ elif hash is None:
+ raise rpki.exceptions.ExistingObjectAtURI("Object already published at %s" % uri)
+ else:
+ raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (old %s, new %s)" % (uri, obj.hash, hash))
+ except rpki.pubdb.PublishedObject.DoesNotExist:
+ pass
+ logger.debug("Publishing %s", uri)
+ PublishedObject.objects.create(session = self.session, client = client, der = der, uri = uri,
+ hash = rpki.x509.sha256(der).encode("hex"))
+ se = DERSubElement(self.deltas[0], rrdp_tag_publish, der = der, uri = uri)
+ if hash is not None:
+ se.set("hash", hash)
+ rpki.relaxng.rrdp.assertValid(self.deltas)
+
+
+ def withdraw(self, client, uri, hash):
+ obj = client.publishedobject_set.get(session = self.session, uri = uri)
+ if obj.hash != hash:
+ raise rpki.exceptions.DifferentObjectAtURI("Found different object at %s (old %s, new %s)" % (uri, obj.hash, hash))
+ logger.debug("Withdrawing %s", uri)
+ obj.delete()
+ SubElement(self.deltas[0], rrdp_tag_withdraw, uri = uri, hash = hash).tail = "\n"
+ rpki.relaxng.rrdp.assertValid(self.deltas)
+
+
+ def update_rsync_files(self, publication_base):
+ min_path_len = len(publication_base.rstrip("/"))
+ for pdu in self.deltas[0]:
+ assert pdu.tag in (rrdp_tag_publish, rrdp_tag_withdraw)
+ fn = self._uri_to_filename(pdu.get("uri"), publication_base)
+ if pdu.tag == rrdp_tag_publish:
+ tn = fn + ".tmp"
+ dn = os.path.dirname(fn)
+ if not os.path.isdir(dn):
+ os.makedirs(dn)
+ with open(tn, "wb") as f:
+ f.write(pdu.text.decode("base64"))
+ os.rename(tn, fn)
+ else:
+ try:
+ os.remove(fn)
+ except OSError, e:
+ if e.errno != errno.ENOENT:
+ raise
+ dn = os.path.dirname(fn)
+ while len(dn) > min_path_len:
+ try:
+ os.rmdir(dn)
+ except OSError:
+ break
+ else:
+ dn = os.path.dirname(dn)
+ del self.deltas
+
+
+class PublishedObject(models.Model):
+ uri = models.CharField(max_length = 255)
+ der = BlobField()
+ hash = models.CharField(max_length = 64)
+ client = models.ForeignKey(Client)
+ session = models.ForeignKey(Session)
+
+ class Meta:
+ unique_together = (("session", "hash"),
+ ("session", "uri"))
diff --git a/rpki/publication.py b/rpki/publication.py
index 9b5bfcbd..fc5b2627 100644
--- a/rpki/publication.py
+++ b/rpki/publication.py
@@ -35,6 +35,8 @@ import rpki.relaxng
import rpki.sundial
import rpki.log
+from lxml.etree import Element, SubElement
+
logger = logging.getLogger(__name__)
diff --git a/rpki/publication_control.py b/rpki/publication_control.py
index eae96ccc..478f183b 100644
--- a/rpki/publication_control.py
+++ b/rpki/publication_control.py
@@ -39,6 +39,38 @@ import rpki.log
logger = logging.getLogger(__name__)
+nsmap = rpki.relaxng.publication_control.nsmap
+version = rpki.relaxng.publication_control.version
+
+tag_msg = rpki.relaxng.publication_control.xmlns + "msg"
+tag_client = rpki.relaxng.publication_control.xmlns + "client"
+tag_bpki_cert = rpki.relaxng.publication_control.xmlns + "bpki_cert"
+tag_bpki_glue = rpki.relaxng.publication_control.xmlns + "bpki_glue"
+tag_report_error = rpki.relaxng.publication_control.xmlns + "report_error"
+
+
+def raise_if_error(pdu):
+ """
+ Raise an appropriate error if this is a <report_error/> PDU.
+
+ As a convience, this will also accept a <msg/> PDU and raise an
+ appropriate error if it contains any <report_error/> PDUs.
+ """
+
+ if pdu.tag == tag_report_error:
+ code = pdu.get("error_code")
+ logger.debug("<report_error/> code %r", code)
+ e = getattr(rpki.exceptions, code, None)
+ if e is not None and issubclass(e, rpki.exceptions.RPKI_Exception):
+ raise e(pdu.text)
+ else:
+ raise rpki.exceptions.BadPublicationReply("Unexpected response from pubd: %r, %r" % (code, pdu))
+
+ if pdu.tag == tag_msg:
+ for p in pdu:
+ raise_if_error(p)
+
+
class publication_control_namespace(object):
"""
XML namespace parameters for publication control protocol.
@@ -157,7 +189,7 @@ class report_error_elt(rpki.xml_utils.text_elt, publication_control_namespace):
Raise exception associated with this <report_error/> PDU.
"""
- t = rpki.exceptions.__dict__.get(self.error_code)
+ t = getattr(rpki.exceptions, self.error_code, None)
if isinstance(t, type) and issubclass(t, rpki.exceptions.RPKI_Exception):
raise t(getattr(self, "text", None))
else:
@@ -225,3 +257,13 @@ class cms_msg(rpki.x509.XML_CMS_object):
encoding = "us-ascii"
schema = rpki.relaxng.publication_control
saxify = sax_handler.saxify
+
+
+class cms_msg_no_sax(cms_msg):
+ """
+ Class to hold a CMS-signed publication control PDU without legacy
+ SAX transcoding. The name is a transition kludge, this class will
+ be renamed cms_msg once the SAX code goes away.
+ """
+
+ saxify = None