aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/x509.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpkid/rpki/x509.py')
-rw-r--r--rpkid/rpki/x509.py104
1 files changed, 29 insertions, 75 deletions
diff --git a/rpkid/rpki/x509.py b/rpkid/rpki/x509.py
index a74fc429..71ff4d53 100644
--- a/rpkid/rpki/x509.py
+++ b/rpkid/rpki/x509.py
@@ -356,74 +356,6 @@ class X509(DER_object):
return X509(POWpkix = cert)
-class X509_chain(list):
- """Collections of certs.
-
- This class provides sorting and conversion functions for various
- packages.
- """
-
- def __init__(self, *args, **kw):
- """Initialize an X509_chain."""
- if args:
- self[:] = args
- elif "PEM_files" in kw:
- self.load_from_PEM(kw["PEM_files"])
- elif "DER_files" in kw:
- self.load_from_DER(kw["DER_files"])
- elif "Auto_files" in kw:
- self.load_from_Auto(kw["Auto_files"])
- elif kw:
- raise TypeError
-
- def chainsort(self):
- """Sort a bag of certs into a chain, leaf first.
-
- Various other routines want their certs presented in this order.
- """
- if len(self) > 1:
- bag = self[:]
- issuer_names = [x.getIssuer() for x in bag]
- subject_map = dict([(x.getSubject(), x) for x in bag])
- chain = []
- for subject in subject_map:
- if subject not in issuer_names:
- cert = subject_map[subject]
- chain.append(cert)
- bag.remove(cert)
- if len(chain) != 1:
- raise rpki.exceptions.NotACertificateChain, "Certificates in bag don't form a proper chain"
- while bag:
- cert = subject_map[chain[-1].getIssuer()]
- chain.append(cert)
- bag.remove(cert)
- self[:] = chain
-
- def tlslite_certChain(self):
- """Return a certChain in the format tlslite likes."""
- self.chainsort()
- return tlslite.api.X509CertChain([x.get_tlslite() for x in self])
-
- def tlslite_trustList(self):
- """Return a trustList in the format tlslite likes."""
- return [x.get_tlslite() for x in self]
-
- def clear(self):
- """Drop all certs from this bag onto the floor."""
- self[:] = []
-
- def load_from_PEM(self, files):
- """Load a set of certs from a list of PEM files."""
- self.extend([X509(PEM_file=f) for f in files])
-
- def load_from_DER(self, files):
- """Load a set of certs from a list of DER files."""
- self.extend([X509(DER_file=f) for f in files])
-
- def load_from_Auto(self, files):
- """Load a set of certs from a list of DER or PEM files (guessing)."""
- self.extend([X509(Auto_file=f) for f in files])
-
class PKCS10(DER_object):
"""Class to hold a PKCS #10 request."""
@@ -622,6 +554,7 @@ class CMS_object(DER_object):
econtent_oid = POWify("id-data")
dump_on_verify_failure = False
+ debug_cms_certs = True
def get_DER(self):
"""Get the DER value of this CMS_object."""
@@ -644,14 +577,27 @@ class CMS_object(DER_object):
"""Verify CMS wrapper and store inner content."""
cms = POW.derRead(POW.CMS_MESSAGE, self.get_DER())
+
if cms.eContentType() != self.econtent_oid:
raise rpki.exceptions.WrongEContentType, "Got CMS eContentType %s, expected %s" % (cms.eContentType(), self.econtent_oid)
+
store = POW.X509Store()
- if isinstance(ta, (tuple, list)):
- for x in ta:
- store.addTrust(x.get_POW())
- else:
- store.addTrust(ta.get_POW())
+
+ if isinstance(ta, X509):
+ ta = (ta,)
+
+ for x in ta:
+ if self.debug_cms_certs:
+ rpki.log.debug("CMS trusted cert %s" % x.getSubject())
+ store.addTrust(x.get_POW())
+
+ if self.debug_cms_certs:
+ try:
+ for x in cms.certs():
+ rpki.log.debug("Received CMS cert %s" % x.getSubject())
+ except:
+ pass
+
try:
content = cms.verify(store)
except:
@@ -659,17 +605,25 @@ class CMS_object(DER_object):
print "CMS verification failed, dumping ASN.1:"
self.dumpasn1()
raise rpki.exceptions.CMSVerificationFailed, "CMS verification failed"
+
self.decode(content)
return self.get_content()
def sign(self, keypair, certs, crls = None, no_certs = False):
"""Sign and wrap inner content."""
+ if isinstance(certs, X509):
+ cert = certs
+ certs = ()
+ else:
+ cert = certs[0]
+ certs = certs[1:]
+
cms = POW.CMS()
- cms.sign(certs[0].get_POW(),
+ cms.sign(cert.get_POW(),
keypair.get_POW(),
self.encode(),
- [x.get_POW() for x in certs[1:]],
+ [x.get_POW() for x in certs],
crls,
self.econtent_oid,
POW.CMS_NOCERTS if no_certs else 0)