aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ca/tests/yamlconf.py58
1 files changed, 57 insertions, 1 deletions
diff --git a/ca/tests/yamlconf.py b/ca/tests/yamlconf.py
index 5a66fe8f..56bec483 100644
--- a/ca/tests/yamlconf.py
+++ b/ca/tests/yamlconf.py
@@ -29,13 +29,14 @@ able to do the initial configuration stage quickly can help a lot.
# pylint: disable=W0702,W0621,W0602
-import subprocess
import re
import os
import sys
import yaml
import time
import argparse
+import subprocess
+import lxml.etree
import rpki.resource_set
import rpki.sundial
import rpki.config
@@ -107,6 +108,41 @@ class roa_request(object):
def parse(cls, y):
return cls(y.get("asn"), y.get("ipv4"), y.get("ipv6"))
+class router_cert(object):
+ """
+ Representation for a router_cert object.
+ """
+
+ _ecparams = None
+
+ @classmethod
+ def ecparams(cls):
+ if cls._ecparams is None:
+ cls._ecparams = rpki.x509.KeyParams.generateEC()
+ return cls._ecparams
+
+ def __init__(self, asn, router_id):
+ self.asn = rpki.resource_set.resource_set_as("".join(str(asn).split()))
+ self.router_id = router_id
+ self.keypair = rpki.x509.ECDSA.generate(self.ecparams())
+ self.pkcs10 = rpki.x509.PKCS10.create(keypair = self.keypair)
+ self.gski = self.pkcs10.gSKI()
+
+ def __eq__(self, other):
+ return self.asn == other.asn and self.router_id == other.router_id and self.gski == other.gski
+
+ def __hash__(self):
+ v6 = tuple(self.v6) if self.v6 is not None else None
+ return tuple(self.asn).__hash__() + self.router_id.__hash__() + self.gski.__hash__()
+
+ def __str__(self):
+ return "%s: %s: %s" % (self.asn, self.router_id, self.gski)
+
+ @classmethod
+ def parse(cls, yaml):
+ return cls(yaml.get("asn"), yaml.get("router_id"))
+
+
class allocation_db(list):
"""
Allocation database.
@@ -197,11 +233,14 @@ class allocation(object):
else:
self.ghostbusters = []
self.roa_requests = [roa_request.parse(r) for r in y.get("roa_request", ())]
+ self.router_certs = [router_cert.parse(r) for r in y.get("router_cert", ())]
for r in self.roa_requests:
if r.v4:
self.base.v4 |= r.v4.to_resource_set()
if r.v6:
self.base.v6 |= r.v6.to_resource_set()
+ for r in self.router_certs:
+ self.base.asn |= r.asn
self.hosted_by = y.get("hosted_by")
self.hosts = []
if not self.is_hosted:
@@ -313,6 +352,22 @@ class allocation(object):
f.write("\n")
f.write(g)
+ def dump_router_certificates(self, fn):
+ if self.router_certs:
+ path = self.path(fn)
+ if not quiet:
+ print "Writing", path
+ xmlns = "{http://www.hactrn.net/uris/rpki/router-certificate/}"
+ xml = lxml.etree.Element(xmlns + "router_certificate_requests", version = "1")
+ for r in self.router_certs:
+ x = lxml.etree.SubElement(xml, xmlns + "router_certificate_request",
+ router_id = str(r.router_id),
+ asn = str(r.asn),
+ valid_until = str(self.resources.valid_until))
+ x.text = r.pkcs10.get_Base64()
+ rpki.relaxng.router_certificate.assertValid(xml)
+ lxml.etree.ElementTree(xml).write(path, pretty_print = True)
+
@property
def pubd(self):
s = self
@@ -749,6 +804,7 @@ def body():
d.dump_prefixes("%s.prefixes.csv" % d.name)
d.dump_roas("%s.roas.csv" % d.name)
d.dump_ghostbusters("%s.ghostbusters.vcard" % d.name)
+ d.dump_router_certificates("%s.routercerts.xml" % d.name)
if not d.is_hosted:
if not quiet: