aboutsummaryrefslogtreecommitdiff
path: root/scripts/rpki
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/rpki')
-rw-r--r--scripts/rpki/cms.py4
-rw-r--r--scripts/rpki/https.py12
-rw-r--r--scripts/rpki/x509.py159
3 files changed, 99 insertions, 76 deletions
diff --git a/scripts/rpki/cms.py b/scripts/rpki/cms.py
index 32d76c71..374f592d 100644
--- a/scripts/rpki/cms.py
+++ b/scripts/rpki/cms.py
@@ -11,7 +11,9 @@ import os, rpki.x509
def encode(xml, key, cert_files):
- certs = rpki.x509.sort_chain([rpki.x509.X509(PEM_file=PEM_file) for PEM_file in cert_files])
+ certs = rpki.x509.X509_chain()
+ certs.load_from_PEM(cert_files)
+ certs.chainsort()
signer_filename = "cms.tmp.signer.pem"
certfile_filename = "cms.tmp.certfile.pem"
diff --git a/scripts/rpki/https.py b/scripts/rpki/https.py
index 1eb6412c..4be20537 100644
--- a/scripts/rpki/https.py
+++ b/scripts/rpki/https.py
@@ -24,10 +24,14 @@ class CertInfo(object):
self.privateKey = tlslite.api.parsePEMKey(f.read(), private=True)
f.close()
- chain = [rpki.x509.X509(PEM_file=PEM_file) for PEM_file in glob.glob(self.cert_dir + myname + "-*.cer")]
- self.certChain = tlslite.api.X509CertChain([x.get_tlslite() for x in rpki.x509.sort_chain(chain)])
-
- self.x509TrustList = [rpki.x509.X509(PEM_file=PEM_file).get_tlslite() for PEM_file in glob.glob(self.cert_dir + "*-Root.cer")]
+ chain = rpki.x509.X509_chain()
+ chain.load_from_PEM(glob.glob(self.cert_dir + myname + "-*.cer"))
+ chain.chainsort()
+ self.certChain = chain.tlslite_certChain()
+
+ trustlist = rpki.x509.X509_chain()
+ trustlist.load_from_PEM(glob.glob(self.cert_dir + "*-Root.cer"))
+ self.x509TrustList = trustlist.tlslite_trustList()
def client(msg, certInfo, host="localhost", port=4433, url="/"):
httpc = tlslite.api.HTTPTLSConnection(host=host,
diff --git a/scripts/rpki/x509.py b/scripts/rpki/x509.py
index d285ab9d..7c7c75ec 100644
--- a/scripts/rpki/x509.py
+++ b/scripts/rpki/x509.py
@@ -13,26 +13,46 @@ some of the nasty details. This involves a lot of format conversion.
import POW, tlslite.api, POW.pkix, base64
-class X509(object):
+class PEM_converter(object):
"""
- Class to hold all the different representations of X.509 certs we're
- using and convert between them.
+ Convert between DER and PEM encodings for various kinds of ASN.1 data.
+ """
+
+ def __init__(self, kind): # "CERTIFICATE", "RSA PRIVATE KEY", ...
+ self.b = "-----BEGIN %s-----" % kind
+ self.e = "-----END %s-----" % kind
+
+ def toDER(self, pem):
+ lines = pem.splitlines(0)
+ while lines and lines.pop(0) != self.b:
+ pass
+ while lines and lines.pop(-1) != self.e:
+ pass
+ assert lines
+ return base64.b64decode("".join(lines))
+
+ def toPEM(self, der):
+ b64 = base64.b64encode(der)
+ pem = self.b + "\n"
+ while len(b64) > 64:
+ pem += b64[0:64] + "\n"
+ b64 = b64[64:]
+ return pem + b64 + "\n" + self.e + "\n"
+
+class DER_object(object):
+ """
+ Virtual class to hold a generic DER object.
"""
+ formats = ("DER",)
+ pem_converter = None
+
def empty(self):
- return (self.DER is None and
- self.PEM is None and
- self.POW is None and
- self.POWpkix is None and
- self.tlslite is None)
+ return reduce(lambda x,y: x and getattr(self, y, None) is None, self.formats, True)
def clear(self):
- self.DER = None
- self.PEM = None
- self.POW = None
- self.POWpkix = None
- self.tlslite = None
- self.POW_extensions = None
+ for fmt in self.formats:
+ setattr(self, fmt, None)
def __init__(self, **kw):
self.clear()
@@ -42,7 +62,7 @@ class X509(object):
def set(self, **kw):
name = kw.keys()[0]
if len(kw) == 1:
- if name in ("DER", "PEM", "POW", "POWpkix", "tlslite"):
+ if name in self.formats:
self.clear()
setattr(self, name, kw[name])
return
@@ -50,11 +70,10 @@ class X509(object):
f = open(kw[name], "r")
text = f.read()
f.close()
- self.clear()
if name == "PEM_file":
- self.PEM = text
- else:
- self.DER = text
+ text = self.pem_converter.toDER(text)
+ self.clear()
+ self.DER = text
return
raise TypeError
@@ -62,15 +81,30 @@ class X509(object):
assert not self.empty()
if self.DER:
return self.DER
+ raise RuntimeError, "No conversion path to DER available"
+
+ def get_PEM(self):
+ return self.pem_converter.toPEM(self.get_DER())
+
+class X509(DER_object):
+ """
+ Class to hold all the different representations of X.509 certs we're
+ using and convert between them.
+ """
+
+ formats = ("DER", "POW", "POWpkix", "tlslite")
+ pem_converter = PEM_converter("CERTIFICATE")
+
+ def get_DER(self):
+ assert not self.empty()
+ if self.DER:
+ return self.DER
if self.POW:
self.DER = self.POW.derWrite()
return self.get_DER()
if self.POWpkix:
self.DER = self.POWpkix.toString()
return self.get_DER()
- if self.PEM:
- self.POW = POW.pemRead(POW.X509_CERTIFICATE, self.PEM)
- return self.get_DER()
raise RuntimeError
def get_POW(self):
@@ -79,12 +113,6 @@ class X509(object):
self.POW = POW.derRead(POW.X509_CERTIFICATE, self.get_DER())
return self.POW
- def get_PEM(self):
- assert not self.empty()
- if not self.PEM:
- self.PEM = self.get_POW().pemWrite()
- return self.PEM
-
def get_POWpkix(self):
assert not self.empty()
if not self.POWpkix:
@@ -123,56 +151,45 @@ class X509(object):
def getSKI(self):
return self.get_POW_extensions().get("subjectKeyIdentifier")
-def sort_chain(bag):
+class X509_chain(list):
"""
- Sort a bag of certs into a chain, leaf first. Various other
- routines want their certs presented in this order.
+ Collection of certs with sorting and conversion functions
+ for various packages.
"""
- issuer_names = [x.getIssuer() for x in bag]
- subject_map = dict([(x.getSubject(), x) for x in bag])
- chain = []
-
- for subject in subject_map:
- if subject not in issuer_names:
- cert = subject_map[subject]
+ def chainsort(self):
+ """
+ Sort a bag of certs into a chain, leaf first. Various other
+ routines want their certs presented in this order.
+ """
+
+ bag = self[:]
+ issuer_names = [x.getIssuer() for x in bag]
+ subject_map = dict([(x.getSubject(), x) for x in bag])
+ chain = []
+ for subject in subject_map:
+ if subject not in issuer_names:
+ cert = subject_map[subject]
+ chain.append(cert)
+ bag.remove(cert)
+ assert len(chain) == 1
+ while bag:
+ cert = subject_map[chain[-1].getIssuer()]
chain.append(cert)
bag.remove(cert)
+ self[:] = chain
- assert len(chain) == 1
+ def tlslite_certChain(self):
+ return tlslite.api.X509CertChain([x.get_tlslite() for x in self])
- while bag:
- cert = subject_map[chain[-1].getIssuer()]
- chain.append(cert)
- bag.remove(cert)
+ def tlslite_trustList(self):
+ return [x.get_tlslite() for x in self]
- return chain
-
-def _pem_delimiters(type):
- return ("-----BEGIN %s-----" % type,
- "-----END %s-----" % type)
-
-def pem2der(pem, type):
- """
- Generic PEM -> DER converter. Second argument is type of PEM text
- to be converted ("CERTIFICATE", "RSA PRIVATE KEY", etc).
- """
-
- bdelim, edelim = _pem_delimiters(type)
- lines = pem.splitlines(0)
- assert lines[0] == bdelim and lines[-1] == edelim
- return base64.b64decode("".join(lines[1:-2]))
+ def clear(self):
+ self[:] = []
-def der2pem(der, type):
- """
- Generic DER -> PEM converter. Second argument is type of PEM text
- to be converted ("CERTIFICATE", "RSA PRIVATE KEY", etc).
- """
+ def load_from_PEM(self, files):
+ self.extend([X509(PEM_file=f) for f in files])
- bdelim, edelim = _pem_delimiters(type)
- b64 = base64.b64enode(der)
- pem = bdelim
- while len(b64) > 64:
- pem += b64[0:63] + "\n"
- b64 = b64[64:]
- return pem + b64 + "\n" + edelim + "\n"
+ def load_from_DER(self, files):
+ self.extend([X509(DER_file=f) for f in files])