diff options
Diffstat (limited to 'rpkid/rpki/x509.py')
-rw-r--r-- | rpkid/rpki/x509.py | 104 |
1 files changed, 29 insertions, 75 deletions
diff --git a/rpkid/rpki/x509.py b/rpkid/rpki/x509.py index a74fc429..71ff4d53 100644 --- a/rpkid/rpki/x509.py +++ b/rpkid/rpki/x509.py @@ -356,74 +356,6 @@ class X509(DER_object): return X509(POWpkix = cert) -class X509_chain(list): - """Collections of certs. - - This class provides sorting and conversion functions for various - packages. - """ - - def __init__(self, *args, **kw): - """Initialize an X509_chain.""" - if args: - self[:] = args - elif "PEM_files" in kw: - self.load_from_PEM(kw["PEM_files"]) - elif "DER_files" in kw: - self.load_from_DER(kw["DER_files"]) - elif "Auto_files" in kw: - self.load_from_Auto(kw["Auto_files"]) - elif kw: - raise TypeError - - def chainsort(self): - """Sort a bag of certs into a chain, leaf first. - - Various other routines want their certs presented in this order. - """ - if len(self) > 1: - bag = self[:] - issuer_names = [x.getIssuer() for x in bag] - subject_map = dict([(x.getSubject(), x) for x in bag]) - chain = [] - for subject in subject_map: - if subject not in issuer_names: - cert = subject_map[subject] - chain.append(cert) - bag.remove(cert) - if len(chain) != 1: - raise rpki.exceptions.NotACertificateChain, "Certificates in bag don't form a proper chain" - while bag: - cert = subject_map[chain[-1].getIssuer()] - chain.append(cert) - bag.remove(cert) - self[:] = chain - - def tlslite_certChain(self): - """Return a certChain in the format tlslite likes.""" - self.chainsort() - return tlslite.api.X509CertChain([x.get_tlslite() for x in self]) - - def tlslite_trustList(self): - """Return a trustList in the format tlslite likes.""" - return [x.get_tlslite() for x in self] - - def clear(self): - """Drop all certs from this bag onto the floor.""" - self[:] = [] - - def load_from_PEM(self, files): - """Load a set of certs from a list of PEM files.""" - self.extend([X509(PEM_file=f) for f in files]) - - def load_from_DER(self, files): - """Load a set of certs from a list of DER files.""" - self.extend([X509(DER_file=f) for f in files]) - - def load_from_Auto(self, files): - """Load a set of certs from a list of DER or PEM files (guessing).""" - self.extend([X509(Auto_file=f) for f in files]) - class PKCS10(DER_object): """Class to hold a PKCS #10 request.""" @@ -622,6 +554,7 @@ class CMS_object(DER_object): econtent_oid = POWify("id-data") dump_on_verify_failure = False + debug_cms_certs = True def get_DER(self): """Get the DER value of this CMS_object.""" @@ -644,14 +577,27 @@ class CMS_object(DER_object): """Verify CMS wrapper and store inner content.""" cms = POW.derRead(POW.CMS_MESSAGE, self.get_DER()) + if cms.eContentType() != self.econtent_oid: raise rpki.exceptions.WrongEContentType, "Got CMS eContentType %s, expected %s" % (cms.eContentType(), self.econtent_oid) + store = POW.X509Store() - if isinstance(ta, (tuple, list)): - for x in ta: - store.addTrust(x.get_POW()) - else: - store.addTrust(ta.get_POW()) + + if isinstance(ta, X509): + ta = (ta,) + + for x in ta: + if self.debug_cms_certs: + rpki.log.debug("CMS trusted cert %s" % x.getSubject()) + store.addTrust(x.get_POW()) + + if self.debug_cms_certs: + try: + for x in cms.certs(): + rpki.log.debug("Received CMS cert %s" % x.getSubject()) + except: + pass + try: content = cms.verify(store) except: @@ -659,17 +605,25 @@ class CMS_object(DER_object): print "CMS verification failed, dumping ASN.1:" self.dumpasn1() raise rpki.exceptions.CMSVerificationFailed, "CMS verification failed" + self.decode(content) return self.get_content() def sign(self, keypair, certs, crls = None, no_certs = False): """Sign and wrap inner content.""" + if isinstance(certs, X509): + cert = certs + certs = () + else: + cert = certs[0] + certs = certs[1:] + cms = POW.CMS() - cms.sign(certs[0].get_POW(), + cms.sign(cert.get_POW(), keypair.get_POW(), self.encode(), - [x.get_POW() for x in certs[1:]], + [x.get_POW() for x in certs], crls, self.econtent_oid, POW.CMS_NOCERTS if no_certs else 0) |