diff options
Diffstat (limited to 'scripts/rpki')
-rw-r--r-- | scripts/rpki/cms.py | 4 | ||||
-rw-r--r-- | scripts/rpki/https.py | 12 | ||||
-rw-r--r-- | scripts/rpki/x509.py | 159 |
3 files changed, 99 insertions, 76 deletions
diff --git a/scripts/rpki/cms.py b/scripts/rpki/cms.py index 32d76c71..374f592d 100644 --- a/scripts/rpki/cms.py +++ b/scripts/rpki/cms.py @@ -11,7 +11,9 @@ import os, rpki.x509 def encode(xml, key, cert_files): - certs = rpki.x509.sort_chain([rpki.x509.X509(PEM_file=PEM_file) for PEM_file in cert_files]) + certs = rpki.x509.X509_chain() + certs.load_from_PEM(cert_files) + certs.chainsort() signer_filename = "cms.tmp.signer.pem" certfile_filename = "cms.tmp.certfile.pem" diff --git a/scripts/rpki/https.py b/scripts/rpki/https.py index 1eb6412c..4be20537 100644 --- a/scripts/rpki/https.py +++ b/scripts/rpki/https.py @@ -24,10 +24,14 @@ class CertInfo(object): self.privateKey = tlslite.api.parsePEMKey(f.read(), private=True) f.close() - chain = [rpki.x509.X509(PEM_file=PEM_file) for PEM_file in glob.glob(self.cert_dir + myname + "-*.cer")] - self.certChain = tlslite.api.X509CertChain([x.get_tlslite() for x in rpki.x509.sort_chain(chain)]) - - self.x509TrustList = [rpki.x509.X509(PEM_file=PEM_file).get_tlslite() for PEM_file in glob.glob(self.cert_dir + "*-Root.cer")] + chain = rpki.x509.X509_chain() + chain.load_from_PEM(glob.glob(self.cert_dir + myname + "-*.cer")) + chain.chainsort() + self.certChain = chain.tlslite_certChain() + + trustlist = rpki.x509.X509_chain() + trustlist.load_from_PEM(glob.glob(self.cert_dir + "*-Root.cer")) + self.x509TrustList = trustlist.tlslite_trustList() def client(msg, certInfo, host="localhost", port=4433, url="/"): httpc = tlslite.api.HTTPTLSConnection(host=host, diff --git a/scripts/rpki/x509.py b/scripts/rpki/x509.py index d285ab9d..7c7c75ec 100644 --- a/scripts/rpki/x509.py +++ b/scripts/rpki/x509.py @@ -13,26 +13,46 @@ some of the nasty details. This involves a lot of format conversion. import POW, tlslite.api, POW.pkix, base64 -class X509(object): +class PEM_converter(object): """ - Class to hold all the different representations of X.509 certs we're - using and convert between them. + Convert between DER and PEM encodings for various kinds of ASN.1 data. + """ + + def __init__(self, kind): # "CERTIFICATE", "RSA PRIVATE KEY", ... + self.b = "-----BEGIN %s-----" % kind + self.e = "-----END %s-----" % kind + + def toDER(self, pem): + lines = pem.splitlines(0) + while lines and lines.pop(0) != self.b: + pass + while lines and lines.pop(-1) != self.e: + pass + assert lines + return base64.b64decode("".join(lines)) + + def toPEM(self, der): + b64 = base64.b64encode(der) + pem = self.b + "\n" + while len(b64) > 64: + pem += b64[0:64] + "\n" + b64 = b64[64:] + return pem + b64 + "\n" + self.e + "\n" + +class DER_object(object): + """ + Virtual class to hold a generic DER object. """ + formats = ("DER",) + pem_converter = None + def empty(self): - return (self.DER is None and - self.PEM is None and - self.POW is None and - self.POWpkix is None and - self.tlslite is None) + return reduce(lambda x,y: x and getattr(self, y, None) is None, self.formats, True) def clear(self): - self.DER = None - self.PEM = None - self.POW = None - self.POWpkix = None - self.tlslite = None - self.POW_extensions = None + for fmt in self.formats: + setattr(self, fmt, None) def __init__(self, **kw): self.clear() @@ -42,7 +62,7 @@ class X509(object): def set(self, **kw): name = kw.keys()[0] if len(kw) == 1: - if name in ("DER", "PEM", "POW", "POWpkix", "tlslite"): + if name in self.formats: self.clear() setattr(self, name, kw[name]) return @@ -50,11 +70,10 @@ class X509(object): f = open(kw[name], "r") text = f.read() f.close() - self.clear() if name == "PEM_file": - self.PEM = text - else: - self.DER = text + text = self.pem_converter.toDER(text) + self.clear() + self.DER = text return raise TypeError @@ -62,15 +81,30 @@ class X509(object): assert not self.empty() if self.DER: return self.DER + raise RuntimeError, "No conversion path to DER available" + + def get_PEM(self): + return self.pem_converter.toPEM(self.get_DER()) + +class X509(DER_object): + """ + Class to hold all the different representations of X.509 certs we're + using and convert between them. + """ + + formats = ("DER", "POW", "POWpkix", "tlslite") + pem_converter = PEM_converter("CERTIFICATE") + + def get_DER(self): + assert not self.empty() + if self.DER: + return self.DER if self.POW: self.DER = self.POW.derWrite() return self.get_DER() if self.POWpkix: self.DER = self.POWpkix.toString() return self.get_DER() - if self.PEM: - self.POW = POW.pemRead(POW.X509_CERTIFICATE, self.PEM) - return self.get_DER() raise RuntimeError def get_POW(self): @@ -79,12 +113,6 @@ class X509(object): self.POW = POW.derRead(POW.X509_CERTIFICATE, self.get_DER()) return self.POW - def get_PEM(self): - assert not self.empty() - if not self.PEM: - self.PEM = self.get_POW().pemWrite() - return self.PEM - def get_POWpkix(self): assert not self.empty() if not self.POWpkix: @@ -123,56 +151,45 @@ class X509(object): def getSKI(self): return self.get_POW_extensions().get("subjectKeyIdentifier") -def sort_chain(bag): +class X509_chain(list): """ - Sort a bag of certs into a chain, leaf first. Various other - routines want their certs presented in this order. + Collection of certs with sorting and conversion functions + for various packages. """ - issuer_names = [x.getIssuer() for x in bag] - subject_map = dict([(x.getSubject(), x) for x in bag]) - chain = [] - - for subject in subject_map: - if subject not in issuer_names: - cert = subject_map[subject] + def chainsort(self): + """ + Sort a bag of certs into a chain, leaf first. Various other + routines want their certs presented in this order. + """ + + bag = self[:] + issuer_names = [x.getIssuer() for x in bag] + subject_map = dict([(x.getSubject(), x) for x in bag]) + chain = [] + for subject in subject_map: + if subject not in issuer_names: + cert = subject_map[subject] + chain.append(cert) + bag.remove(cert) + assert len(chain) == 1 + while bag: + cert = subject_map[chain[-1].getIssuer()] chain.append(cert) bag.remove(cert) + self[:] = chain - assert len(chain) == 1 + def tlslite_certChain(self): + return tlslite.api.X509CertChain([x.get_tlslite() for x in self]) - while bag: - cert = subject_map[chain[-1].getIssuer()] - chain.append(cert) - bag.remove(cert) + def tlslite_trustList(self): + return [x.get_tlslite() for x in self] - return chain - -def _pem_delimiters(type): - return ("-----BEGIN %s-----" % type, - "-----END %s-----" % type) - -def pem2der(pem, type): - """ - Generic PEM -> DER converter. Second argument is type of PEM text - to be converted ("CERTIFICATE", "RSA PRIVATE KEY", etc). - """ - - bdelim, edelim = _pem_delimiters(type) - lines = pem.splitlines(0) - assert lines[0] == bdelim and lines[-1] == edelim - return base64.b64decode("".join(lines[1:-2])) + def clear(self): + self[:] = [] -def der2pem(der, type): - """ - Generic DER -> PEM converter. Second argument is type of PEM text - to be converted ("CERTIFICATE", "RSA PRIVATE KEY", etc). - """ + def load_from_PEM(self, files): + self.extend([X509(PEM_file=f) for f in files]) - bdelim, edelim = _pem_delimiters(type) - b64 = base64.b64enode(der) - pem = bdelim - while len(b64) > 64: - pem += b64[0:63] + "\n" - b64 = b64[64:] - return pem + b64 + "\n" + edelim + "\n" + def load_from_DER(self, files): + self.extend([X509(DER_file=f) for f in files]) |