aboutsummaryrefslogtreecommitdiff
path: root/rpkid.stable/rpki/x509.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpkid.stable/rpki/x509.py')
-rw-r--r--rpkid.stable/rpki/x509.py995
1 files changed, 0 insertions, 995 deletions
diff --git a/rpkid.stable/rpki/x509.py b/rpkid.stable/rpki/x509.py
deleted file mode 100644
index b167560c..00000000
--- a/rpkid.stable/rpki/x509.py
+++ /dev/null
@@ -1,995 +0,0 @@
-"""One X.509 implementation to rule them all...
-
-...and in the darkness hide the twisty maze of partially overlapping
-X.509 support packages in Python.
-
-There are several existing packages, none of which do quite what I
-need, due to age, lack of documentation, specialization, or lack of
-foresight on somebody's part (perhaps mine). This module attempts to
-bring together the functionality I need in a way that hides at least
-some of the nasty details. This involves a lot of format conversion.
-
-$Id$
-
-
-Copyright (C) 2009 Internet Systems Consortium ("ISC")
-
-Permission to use, copy, modify, and distribute this software for any
-purpose with or without fee is hereby granted, provided that the above
-copyright notice and this permission notice appear in all copies.
-
-THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
-REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
-AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
-INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
-LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
-OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
-PERFORMANCE OF THIS SOFTWARE.
-
-
-Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN")
-
-Permission to use, copy, modify, and distribute this software for any
-purpose with or without fee is hereby granted, provided that the above
-copyright notice and this permission notice appear in all copies.
-
-THE SOFTWARE IS PROVIDED "AS IS" AND ARIN DISCLAIMS ALL WARRANTIES WITH
-REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
-AND FITNESS. IN NO EVENT SHALL ARIN BE LIABLE FOR ANY SPECIAL, DIRECT,
-INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
-LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
-OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
-PERFORMANCE OF THIS SOFTWARE.
-"""
-
-import POW, tlslite.api, POW.pkix, base64, lxml.etree, os
-import rpki.exceptions, rpki.resource_set, rpki.oids, rpki.sundial
-import rpki.manifest, rpki.roa, rpki.log
-
-def calculate_SKI(public_key_der):
- """Calculate the SKI value given the DER representation of a public
- key, which requires first peeling the ASN.1 wrapper off the key.
- """
- k = POW.pkix.SubjectPublicKeyInfo()
- k.fromString(public_key_der)
- d = POW.Digest(POW.SHA1_DIGEST)
- d.update(k.subjectPublicKey.get())
- return d.digest()
-
-class PEM_converter(object):
- """Convert between DER and PEM encodings for various kinds of ASN.1 data."""
-
- def __init__(self, kind): # "CERTIFICATE", "RSA PRIVATE KEY", ...
- """Initialize PEM_converter."""
- self.b = "-----BEGIN %s-----" % kind
- self.e = "-----END %s-----" % kind
-
- def looks_like_PEM(self, text):
- """Guess whether text looks like a PEM encoding."""
- b = text.find(self.b)
- return b >= 0 and text.find(self.e) > b + len(self.b)
-
- def to_DER(self, pem):
- """Convert from PEM to DER."""
- lines = [line.strip() for line in pem.splitlines(0)]
- while lines and lines.pop(0) != self.b:
- pass
- while lines and lines.pop(-1) != self.e:
- pass
- if not lines:
- raise rpki.exceptions.EmptyPEM, "Could not find PEM in:\n%s" % pem
- return base64.b64decode("".join(lines))
-
- def to_PEM(self, der):
- """Convert from DER to PEM."""
- b64 = base64.b64encode(der)
- pem = self.b + "\n"
- while len(b64) > 64:
- pem += b64[0:64] + "\n"
- b64 = b64[64:]
- return pem + b64 + "\n" + self.e + "\n"
-
-class DER_object(object):
- """Virtual class to hold a generic DER object."""
-
- ## Formats supported in this object
- formats = ("DER",)
-
- ## PEM converter for this object
- pem_converter = None
-
- ## Other attributes that self.clear() should whack
- other_clear = ()
-
- ## @var DER
- ## DER value of this object
-
- def empty(self):
- """Test whether this object is empty."""
- for a in self.formats:
- if getattr(self, a, None) is not None:
- return False
- return True
-
- def clear(self):
- """Make this object empty."""
- for a in self.formats + self.other_clear:
- setattr(self, a, None)
-
- def __init__(self, **kw):
- """Initialize a DER_object."""
- self.clear()
- if len(kw):
- self.set(**kw)
-
- def set(self, **kw):
- """Set this object by setting one of its known formats.
-
- This method only allows one to set one format at a time.
- Subsequent calls will clear the object first. The point of all
- this is to let the object's internal converters handle mustering
- the object into whatever format you need at the moment.
- """
- if len(kw) == 1:
- name = kw.keys()[0]
- if name in self.formats:
- self.clear()
- setattr(self, name, kw[name])
- return
- if name == "PEM":
- self.clear()
- self.DER = self.pem_converter.to_DER(kw[name])
- return
- if name == "Base64":
- self.clear()
- self.DER = base64.b64decode(kw[name])
- return
- if name in ("PEM_file", "DER_file", "Auto_file"):
- f = open(kw[name], "rb")
- value = f.read()
- f.close()
- if name == "PEM_file" or (name == "Auto_file" and self.pem_converter.looks_like_PEM(value)):
- value = self.pem_converter.to_DER(value)
- self.clear()
- self.DER = value
- return
- raise rpki.exceptions.DERObjectConversionError, "Can't honor conversion request %s" % repr(kw)
-
- def get_DER(self):
- """Get the DER value of this object.
-
- Subclasses will almost certainly override this method.
- """
- assert not self.empty()
- if self.DER:
- return self.DER
- raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available"
-
- def get_Base64(self):
- """Get the Base64 encoding of the DER value of this object."""
- return base64.b64encode(self.get_DER())
-
- def get_PEM(self):
- """Get the PEM representation of this object."""
- return self.pem_converter.to_PEM(self.get_DER())
-
- def __cmp__(self, other):
- """Compare two DER-encoded objects."""
- return cmp(self.get_DER(), other.get_DER())
-
- def hSKI(self):
- """Return hexadecimal string representation of SKI for this
- object. Only work for subclasses that implement get_SKI().
- """
- ski = self.get_SKI()
- return ":".join(("%02X" % ord(i) for i in ski)) if ski else ""
-
- def gSKI(self):
- """Calculate g(SKI) for this object. Only work for subclasses
- that implement get_SKI().
- """
- return base64.urlsafe_b64encode(self.get_SKI()).rstrip("=")
-
- def hAKI(self):
- """Return hexadecimal string representation of AKI for this
- object. Only work for subclasses that implement get_AKI().
- """
- aki = self.get_AKI()
- return ":".join(("%02X" % ord(i) for i in aki)) if aki else ""
-
- def gAKI(self):
- """Calculate g(AKI) for this object. Only work for subclasses
- that implement get_AKI().
- """
- return base64.urlsafe_b64encode(self.get_AKI()).rstrip("=")
-
- def get_AKI(self):
- """Get the AKI extension from this object. Only works for subclasses that support getExtension()."""
- aki = (self.get_POWpkix().getExtension(rpki.oids.name2oid["authorityKeyIdentifier"]) or ((), 0, None))[2]
- return aki[0] if isinstance(aki, tuple) else aki
-
- def get_SKI(self):
- """Get the SKI extension from this object. Only works for subclasses that support getExtension()."""
- return (self.get_POWpkix().getExtension(rpki.oids.name2oid["subjectKeyIdentifier"]) or ((), 0, None))[2]
-
- def get_SIA(self):
- """Get the SIA extension from this object. Only works for subclasses that support getExtension()."""
- return (self.get_POWpkix().getExtension(rpki.oids.name2oid["subjectInfoAccess"]) or ((), 0, None))[2]
-
- def get_AIA(self):
- """Get the SIA extension from this object. Only works for subclasses that support getExtension()."""
- return (self.get_POWpkix().getExtension(rpki.oids.name2oid["subjectInfoAccess"]) or ((), 0, None))[2]
-
- def get_basicConstraints(self):
- """Get the basicConstraints extension from this object. Only works for subclasses that support getExtension()."""
- return (self.get_POWpkix().getExtension(rpki.oids.name2oid["basicConstraints"]) or ((), 0, None))[2]
-
- def is_CA(self):
- """Return True if and only if object has the basicConstraints extension and its cA value is true."""
- basicConstraints = self.get_basicConstraints()
- return basicConstraints and basicConstraints[0] != 0
-
- def get_3779resources(self):
- """Get RFC 3779 resources as rpki.resource_set objects.
- Only works for subclasses that support getExtensions().
- """
- resources = rpki.resource_set.resource_bag.from_rfc3779_tuples(self.get_POWpkix().getExtensions())
- try:
- resources.valid_until = self.getNotAfter()
- except AttributeError:
- pass
- return resources
-
- @classmethod
- def from_sql(cls, x):
- """Convert from SQL storage format."""
- return cls(DER = x)
-
- def to_sql(self):
- """Convert to SQL storage format."""
- return self.get_DER()
-
- def dumpasn1(self):
- """Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
- Use a temporary file rather than popen4() because dumpasn1 uses
- seek() when decoding ASN.1 content nested in OCTET STRING values.
- """
-
- ret = None
- fn = "dumpasn1.tmp"
- try:
- f = open(fn, "wb")
- f.write(self.get_DER())
- f.close()
- f = os.popen("dumpasn1 2>&1 -a " + fn)
- ret = "\n".join(x for x in f.read().splitlines() if x.startswith(" "))
- f.close()
- finally:
- os.unlink(fn)
- return ret
-
-class X509(DER_object):
- """X.509 certificates.
-
- This class is designed to hold all the different representations of
- X.509 certs we're using and convert between them. X.509 support in
- Python a nasty maze of half-cooked stuff (except perhaps for
- cryptlib, which is just different). Users of this module should not
- have to care about this implementation nightmare.
- """
-
- formats = ("DER", "POW", "POWpkix", "tlslite")
- pem_converter = PEM_converter("CERTIFICATE")
-
- def get_DER(self):
- """Get the DER value of this certificate."""
- assert not self.empty()
- if self.DER:
- return self.DER
- if self.POW:
- self.DER = self.POW.derWrite()
- return self.get_DER()
- if self.POWpkix:
- self.DER = self.POWpkix.toString()
- return self.get_DER()
- if self.tlslite:
- der = self.tlslite.writeBytes()
- if not isinstance(der, str): # Apparently sometimes tlslite strings aren't strings,
- der = der.tostring() # then again somtimes they are. Isn't that special?
- self.DER = der
- return self.get_DER()
- raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available"
-
- def get_POW(self):
- """Get the POW value of this certificate."""
- assert not self.empty()
- if not self.POW:
- self.POW = POW.derRead(POW.X509_CERTIFICATE, self.get_DER())
- return self.POW
-
- def get_POWpkix(self):
- """Get the POW.pkix value of this certificate."""
- assert not self.empty()
- if not self.POWpkix:
- cert = POW.pkix.Certificate()
- cert.fromString(self.get_DER())
- self.POWpkix = cert
- return self.POWpkix
-
- def get_tlslite(self):
- """Get the tlslite value of this certificate."""
- assert not self.empty()
- if not self.tlslite:
- cert = tlslite.api.X509()
- cert.parseBinary(self.get_DER())
- self.tlslite = cert
- return self.tlslite
-
- def getIssuer(self):
- """Get the issuer of this certificate."""
- return self.get_POW().getIssuer()
-
- def getSubject(self):
- """Get the subject of this certificate."""
- return self.get_POW().getSubject()
-
- def getNotBefore(self):
- """Get the inception time of this certificate."""
- return rpki.sundial.datetime.fromASN1tuple(self.get_POWpkix().tbs.validity.notBefore.get())
-
- def getNotAfter(self):
- """Get the expiration time of this certificate."""
- return rpki.sundial.datetime.fromASN1tuple(self.get_POWpkix().tbs.validity.notAfter.get())
-
- def getSerial(self):
- """Get the serial number of this certificate."""
- return self.get_POW().getSerial()
-
- def getPublicKey(self):
- """Extract the public key from this certificate."""
- return RSApublic(DER = self.get_POWpkix().tbs.subjectPublicKeyInfo.toString())
-
- def expired(self):
- """Test whether this certificate has expired."""
- return self.getNotAfter() <= rpki.sundial.now()
-
- def issue(self, keypair, subject_key, serial, sia, aia, crldp, notAfter,
- cn = None, resources = None, is_ca = True):
- """Issue a certificate."""
-
- now = rpki.sundial.now()
- aki = self.get_SKI()
- ski = subject_key.get_SKI()
-
- if cn is None:
- cn = "".join(("%02X" % ord(i) for i in ski))
-
- # if notAfter is None: notAfter = now + rpki.sundial.timedelta(days = 30)
-
- cert = POW.pkix.Certificate()
- cert.setVersion(2)
- cert.setSerial(serial)
- cert.setIssuer(self.get_POWpkix().getSubject())
- cert.setSubject((((rpki.oids.name2oid["commonName"], ("printableString", cn)),),))
- cert.setNotBefore(now.toASN1tuple())
- cert.setNotAfter(notAfter.toASN1tuple())
- cert.tbs.subjectPublicKeyInfo.fromString(subject_key.get_DER())
-
- exts = [ ["subjectKeyIdentifier", False, ski],
- ["authorityKeyIdentifier", False, (aki, (), None)],
- ["cRLDistributionPoints", False, ((("fullName", (("uri", crldp),)), None, ()),)],
- ["authorityInfoAccess", False, ((rpki.oids.name2oid["id-ad-caIssuers"], ("uri", aia)),)],
- ["certificatePolicies", True, ((rpki.oids.name2oid["id-cp-ipAddr-asNumber"], ()),)] ]
-
- if is_ca:
- exts.append(["basicConstraints", True, (1, None)])
- exts.append(["keyUsage", True, (0, 0, 0, 0, 0, 1, 1)])
- else:
- exts.append(["keyUsage", True, (1,)])
-
- if sia is not None:
- exts.append(["subjectInfoAccess", False, sia])
- else:
- assert not is_ca
-
- if resources is not None and resources.asn:
- exts.append(["sbgp-autonomousSysNum", True, (resources.asn.to_rfc3779_tuple(), None)])
-
- if resources is not None and (resources.v4 or resources.v6):
- exts.append(["sbgp-ipAddrBlock", True, [x for x in (resources.v4.to_rfc3779_tuple(), resources.v6.to_rfc3779_tuple()) if x is not None]])
-
- for x in exts:
- x[0] = rpki.oids.name2oid[x[0]]
- cert.setExtensions(exts)
-
- cert.sign(keypair.get_POW(), POW.SHA256_DIGEST)
-
- return X509(POWpkix = cert)
-
- @classmethod
- def normalize_chain(cls, chain):
- """Normalize a chain of certificates into a tuple of X509 objects.
- Given all the glue certificates needed for BPKI cross
- certification, it's easiest to allow sloppy arguments to the HTTPS
- and CMS validation methods and provide a single method that
- normalizes the allowed cases. So this method allows X509, None,
- lists, and tuples, and returns a tuple of X509 objects.
- """
- if isinstance(chain, cls):
- chain = (chain,)
- return tuple(x for x in chain if x is not None)
-
-class PKCS10(DER_object):
- """Class to hold a PKCS #10 request."""
-
- formats = ("DER", "POWpkix")
- pem_converter = PEM_converter("CERTIFICATE REQUEST")
-
- def get_DER(self):
- """Get the DER value of this certification request."""
- assert not self.empty()
- if self.DER:
- return self.DER
- if self.POWpkix:
- self.DER = self.POWpkix.toString()
- return self.get_DER()
- raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available"
-
- def get_POWpkix(self):
- """Get the POW.pkix value of this certification request."""
- assert not self.empty()
- if not self.POWpkix:
- req = POW.pkix.CertificationRequest()
- req.fromString(self.get_DER())
- self.POWpkix = req
- return self.POWpkix
-
- def getPublicKey(self):
- """Extract the public key from this certification request."""
- return RSApublic(DER = self.get_POWpkix().certificationRequestInfo.subjectPublicKeyInfo.toString())
-
- def check_valid_rpki(self):
- """Check this certification request to see whether it's a valid
- request for an RPKI certificate. This is broken out of the
- up-down protocol code because it's somewhat involved and the
- up-down code doesn't need to know the details.
-
- Throws an exception if the request isn't valid, so if this method
- returns at all, the request is ok.
- """
-
- if not self.get_POWpkix().verify():
- raise rpki.exceptions.BadPKCS10, "Signature check failed"
-
- if self.get_POWpkix().certificationRequestInfo.version.get() != 0:
- raise rpki.exceptions.BadPKCS10, \
- "Bad version number %s" % self.get_POWpkix().certificationRequestInfo.version
-
- if rpki.oids.oid2name.get(self.get_POWpkix().signatureAlgorithm.algorithm.get()) \
- not in ("sha256WithRSAEncryption", "sha384WithRSAEncryption", "sha512WithRSAEncryption"):
- raise rpki.exceptions.BadPKCS10, "Bad signature algorithm %s" % self.get_POWpkix().signatureAlgorithm
-
- exts = self.get_POWpkix().getExtensions()
- for oid, critical, value in exts:
- if rpki.oids.oid2name.get(oid) not in ("basicConstraints", "keyUsage", "subjectInfoAccess"):
- raise rpki.exceptions.BadExtension, "Forbidden extension %s" % oid
- req_exts = dict((rpki.oids.oid2name[oid], value) for (oid, critical, value) in exts)
-
- if "basicConstraints" not in req_exts or not req_exts["basicConstraints"][0]:
- raise rpki.exceptions.BadPKCS10, "request for EE cert not allowed here"
-
- if req_exts["basicConstraints"][1] is not None:
- raise rpki.exceptions.BadPKCS10, "basicConstraints must not specify Path Length"
-
- if "keyUsage" in req_exts and (not req_exts["keyUsage"][5] or not req_exts["keyUsage"][6]):
- raise rpki.exceptions.BadPKCS10, "keyUsage doesn't match basicConstraints"
-
- for method, location in req_exts.get("subjectInfoAccess", ()):
- if rpki.oids.oid2name.get(method) == "id-ad-caRepository" and \
- (location[0] != "uri" or (location[1].startswith("rsync://") and not location[1].endswith("/"))):
- raise rpki.exceptions.BadPKCS10, "Certificate request includes bad SIA component: %s" % repr(location)
-
- # This one is an implementation restriction. I don't yet
- # understand what the spec is telling me to do in this case.
- assert "subjectInfoAccess" in req_exts, "Can't (yet) handle PKCS #10 without an SIA extension"
-
- @classmethod
- def create_ca(cls, keypair, sia = None):
- """Create a new request for a given keypair, including given SIA value."""
- exts = [["basicConstraints", True, (1, None)],
- ["keyUsage", True, (0, 0, 0, 0, 0, 1, 1)]]
- if sia is not None:
- exts.append(["subjectInfoAccess", False, sia])
- for x in exts:
- x[0] = rpki.oids.name2oid[x[0]]
- return cls.create(keypair, exts)
-
- @classmethod
- def create(cls, keypair, exts = None):
- """Create a new request for a given keypair, including given extensions."""
- cn = "".join(("%02X" % ord(i) for i in keypair.get_SKI()))
- req = POW.pkix.CertificationRequest()
- req.certificationRequestInfo.version.set(0)
- req.certificationRequestInfo.subject.set((((rpki.oids.name2oid["commonName"],
- ("printableString", cn)),),))
- if exts is not None:
- req.setExtensions(exts)
- req.sign(keypair.get_POW(), POW.SHA256_DIGEST)
- return cls(POWpkix = req)
-
-class RSA(DER_object):
- """Class to hold an RSA key pair."""
-
- formats = ("DER", "POW", "tlslite")
- pem_converter = PEM_converter("RSA PRIVATE KEY")
-
- def get_DER(self):
- """Get the DER value of this keypair."""
- assert not self.empty()
- if self.DER:
- return self.DER
- if self.POW:
- self.DER = self.POW.derWrite(POW.RSA_PRIVATE_KEY)
- return self.get_DER()
- raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available"
-
- def get_POW(self):
- """Get the POW value of this keypair."""
- assert not self.empty()
- if not self.POW:
- self.POW = POW.derRead(POW.RSA_PRIVATE_KEY, self.get_DER())
- return self.POW
-
- def get_tlslite(self):
- """Get the tlslite value of this keypair."""
- assert not self.empty()
- if not self.tlslite:
- self.tlslite = tlslite.api.parsePEMKey(self.get_PEM(), private=True)
- return self.tlslite
-
- @classmethod
- def generate(cls, keylength = 2048):
- """Generate a new keypair."""
- return cls(POW = POW.Asymmetric(POW.RSA_CIPHER, keylength))
-
- def get_public_DER(self):
- """Get the DER encoding of the public key from this keypair."""
- return self.get_POW().derWrite(POW.RSA_PUBLIC_KEY)
-
- def get_SKI(self):
- """Calculate the SKI of this keypair."""
- return calculate_SKI(self.get_public_DER())
-
- def get_RSApublic(self):
- """Convert the public key of this keypair into a RSApublic object."""
- return RSApublic(DER = self.get_public_DER())
-
-class RSApublic(DER_object):
- """Class to hold an RSA public key."""
-
- formats = ("DER", "POW")
- pem_converter = PEM_converter("RSA PUBLIC KEY")
-
- def get_DER(self):
- """Get the DER value of this public key."""
- assert not self.empty()
- if self.DER:
- return self.DER
- if self.POW:
- self.DER = self.POW.derWrite(POW.RSA_PUBLIC_KEY)
- return self.get_DER()
- raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available"
-
- def get_POW(self):
- """Get the POW value of this public key."""
- assert not self.empty()
- if not self.POW:
- self.POW = POW.derRead(POW.RSA_PUBLIC_KEY, self.get_DER())
- return self.POW
-
- def get_SKI(self):
- """Calculate the SKI of this public key."""
- return calculate_SKI(self.get_DER())
-
-def POWify_OID(oid):
- """Utility function to convert tuple form of an OID to
- the dotted-decimal string form that POW uses.
- """
- if isinstance(oid, str):
- return POWify_OID(rpki.oids.name2oid[oid])
- else:
- return ".".join(str(i) for i in oid)
-
-class CMS_object(DER_object):
- """Class to hold a CMS-wrapped object.
-
- CMS-wrapped objects are a little different from the other DER_object
- types because the signed object is CMS wrapping inner content that's
- also ASN.1, and due to our current minimal support for CMS we can't
- just handle this as a pretty composite object. So, for now anyway,
- a CMS_object is the outer CMS wrapped object so that the usual DER
- and PEM operations do the obvious things, and the inner content is
- handle via separate methods.
- """
-
- formats = ("DER", "POW")
- other_clear = ("content",)
- econtent_oid = POWify_OID("id-data")
- pem_converter = PEM_converter("CMS")
-
- ## @var dump_on_verify_failure
- # Set this to True to get dumpasn1 dumps of ASN.1 on CMS verify failures.
-
- dump_on_verify_failure = True
-
- ## @var debug_cms_certs
- # Set this to True to log a lot of chatter about CMS certificates.
-
- debug_cms_certs = False
-
- ## @var require_crls
- # Set this to False to make CMS CRLs optional in the cases where we
- # would otherwise require them. Some day this option should go away
- # and CRLs should be uncondtionally mandatory in such cases.
-
- require_crls = False
-
- ## @var print_on_der_error
- # Set this to True to log alleged DER when we have trouble parsing
- # it, in case it's really a Perl backtrace or something.
-
- print_on_der_error = True
-
- def get_DER(self):
- """Get the DER value of this CMS_object."""
- assert not self.empty()
- if self.DER:
- return self.DER
- if self.POW:
- self.DER = self.POW.derWrite()
- return self.get_DER()
- raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available"
-
- def get_POW(self):
- """Get the POW value of this CMS_object."""
- assert not self.empty()
- if not self.POW:
- self.POW = POW.derRead(POW.CMS_MESSAGE, self.get_DER())
- return self.POW
-
- def get_content(self):
- """Get the inner content of this CMS_object."""
- assert self.content is not None
- return self.content
-
- def set_content(self, content):
- """Set the (inner) content of this CMS_object, clearing the wrapper."""
- self.clear()
- self.content = content
-
- def verify(self, ta):
- """Verify CMS wrapper and store inner content."""
-
- try:
- cms = self.get_POW()
- except:
- if self.print_on_der_error:
- rpki.log.debug("Problem parsing DER CMS message, might not really be DER: %s"
- % repr(self.get_DER()))
- raise rpki.exceptions.UnparsableCMSDER
-
- if cms.eContentType() != self.econtent_oid:
- raise rpki.exceptions.WrongEContentType, "Got CMS eContentType %s, expected %s" % (cms.eContentType(), self.econtent_oid)
-
- certs = [X509(POW = x) for x in cms.certs()]
- crls = [CRL(POW = c) for c in cms.crls()]
-
- if self.debug_cms_certs:
- for x in certs:
- rpki.log.debug("Received CMS cert issuer %s subject %s" % (x.getIssuer(), x.getSubject()))
- for c in crls:
- rpki.log.debug("Received CMS CRL issuer %s" % repr(c.getIssuer()))
-
- store = POW.X509Store()
-
- trusted_ee = None
-
- for x in X509.normalize_chain(ta):
- if self.debug_cms_certs:
- rpki.log.debug("CMS trusted cert issuer %s subject %s" % (x.getIssuer(), x.getSubject()))
- if not x.is_CA():
- assert trusted_ee is None, "Can't have two EE certs in the same validation chain"
- trusted_ee = x
- store.addTrust(x.get_POW())
-
- if trusted_ee:
- if self.debug_cms_certs:
- rpki.log.debug("Trusted CMS EE cert issuer %s subject %s" % (trusted_ee.getIssuer(), trusted_ee.getSubject()))
- if certs and (len(certs) > 1 or certs[0] != trusted_ee):
- raise rpki.exceptions.UnexpectedCMSCerts, certs
- if crls:
- raise rpki.exceptions.UnexpectedCMSCRLs, crls
- else:
- if not certs:
- raise rpki.exceptions.MissingCMSEEcert, certs
- if len(certs) > 1 or certs[0].is_CA():
- raise rpki.exceptions.UnexpectedCMSCerts, certs
- if not crls:
- if self.require_crls:
- raise rpki.exceptions.MissingCMSCRL, crls
- else:
- rpki.log.warn("MISSING CMS CRL! Ignoring per self.require_crls setting")
- if len(crls) > 1:
- raise rpki.exceptions.UnexpectedCMSCRLs, crls
-
- try:
- content = cms.verify(store)
- except:
- if self.dump_on_verify_failure:
- if True:
- dbg = self.dumpasn1()
- else:
- dbg = cms.pprint()
- print "CMS verification failed, dumping ASN.1 (%d octets):\n%s" % (len(self.get_DER()), dbg)
- raise rpki.exceptions.CMSVerificationFailed, "CMS verification failed"
-
- self.decode(content)
- return self.get_content()
-
- def extract(self):
- """Extract and store inner content from CMS wrapper without
- verifying the CMS.
-
- DANGER WILL ROBINSON!!!
-
- Do not use this method on unvalidated data. Use the verify()
- method instead.
-
- If you don't understand this warning, don't use this method.
- """
-
- try:
- cms = self.get_POW()
- except:
- raise rpki.exceptions.UnparsableCMSDER
-
- if cms.eContentType() != self.econtent_oid:
- raise rpki.exceptions.WrongEContentType, "Got CMS eContentType %s, expected %s" % (cms.eContentType(), self.econtent_oid)
-
- content = cms.verify(POW.X509Store(), None, POW.CMS_NOCRL | POW.CMS_NO_SIGNER_CERT_VERIFY | POW.CMS_NO_ATTR_VERIFY | POW.CMS_NO_CONTENT_VERIFY)
-
- self.decode(content)
- return self.get_content()
-
- def sign(self, keypair, certs, crls = None, no_certs = False):
- """Sign and wrap inner content."""
-
- rpki.log.trace()
-
- if isinstance(certs, X509):
- cert = certs
- certs = ()
- else:
- cert = certs[0]
- certs = certs[1:]
-
- if crls is None:
- crls = ()
- elif isinstance(crls, CRL):
- crls = (crls,)
-
- cms = POW.CMS()
-
- cms.sign(cert.get_POW(),
- keypair.get_POW(),
- self.encode(),
- [x.get_POW() for x in certs],
- [c.get_POW() for c in crls],
- self.econtent_oid,
- POW.CMS_NOCERTS if no_certs else 0)
-
- self.POW = cms
-
-class DER_CMS_object(CMS_object):
- """Class to hold CMS objects with DER-based content."""
-
- def encode(self):
- """Encode inner content for signing."""
- return self.get_content().toString()
-
- def decode(self, der):
- """Decode DER and set inner content."""
- obj = self.content_class()
- obj.fromString(der)
- self.content = obj
-
-class SignedManifest(DER_CMS_object):
- """Class to hold a signed manifest."""
-
- pem_converter = PEM_converter("RPKI MANIFEST")
- content_class = rpki.manifest.Manifest
- econtent_oid = POWify_OID("id-ct-rpkiManifest")
-
- def getThisUpdate(self):
- """Get thisUpdate value from this manifest."""
- return rpki.sundial.datetime.fromGeneralizedTime(self.get_content().thisUpdate.get())
-
- def getNextUpdate(self):
- """Get nextUpdate value from this manifest."""
- return rpki.sundial.datetime.fromGeneralizedTime(self.get_content().nextUpdate.get())
-
- @classmethod
- def build(cls, serial, thisUpdate, nextUpdate, names_and_objs, keypair, certs, version = 0):
- """Build a signed manifest."""
- self = cls()
- filelist = []
- for name, obj in names_and_objs:
- d = POW.Digest(POW.SHA256_DIGEST)
- d.update(obj.get_DER())
- filelist.append((name.rpartition("/")[2], d.digest()))
- filelist.sort(key = lambda x: x[0])
- m = rpki.manifest.Manifest()
- m.version.set(version)
- m.manifestNumber.set(serial)
- m.thisUpdate.set(thisUpdate.toGeneralizedTime())
- m.nextUpdate.set(nextUpdate.toGeneralizedTime())
- m.fileHashAlg.set(rpki.oids.name2oid["id-sha256"])
- m.fileList.set(filelist)
- self.set_content(m)
- self.sign(keypair, certs)
- return self
-
-class ROA(DER_CMS_object):
- """Class to hold a signed ROA."""
-
- pem_converter = PEM_converter("ROUTE ORIGIN ATTESTATION")
- content_class = rpki.roa.RouteOriginAttestation
- econtent_oid = POWify_OID("id-ct-routeOriginAttestation")
-
- @classmethod
- def build(cls, as_number, ipv4, ipv6, keypair, certs, version = 0):
- """Build a ROA."""
- self = cls()
- r = rpki.roa.RouteOriginAttestation()
- r.version.set(version)
- r.asID.set(as_number)
- r.ipAddrBlocks.set((a.to_roa_tuple() for a in (ipv4, ipv6) if a))
- self.set_content(r)
- self.sign(keypair, certs)
- return self
-
-class XML_CMS_object(CMS_object):
- """Class to hold CMS-wrapped XML protocol data."""
-
- econtent_oid = POWify_OID("id-ct-xml")
-
- ## @var dump_outbound_cms
- # If set, we write all outbound XML-CMS PDUs to disk, for debugging.
- # Value of this variable is prefix portion of filename, tail will
- # be a timestamp.
-
- dump_outbound_cms = None
-
- ## @var dump_outbound_cms
- # If set, we write all inbound XML-CMS PDUs to disk, for debugging.
- # Value of this variable is prefix portion of filename, tail will
- # be a timestamp.
-
- dump_inbound_cms = None
-
- def encode(self):
- """Encode inner content for signing."""
- return lxml.etree.tostring(self.get_content(), pretty_print = True, encoding = self.encoding, xml_declaration = True)
-
- def decode(self, xml):
- """Decode XML and set inner content."""
- self.content = lxml.etree.fromstring(xml)
-
- def pretty_print_content(self):
- """Pretty print XML content of this message."""
- return lxml.etree.tostring(self.get_content(), pretty_print = True, encoding = self.encoding, xml_declaration = True)
-
- def schema_check(self):
- """Handle XML RelaxNG schema check."""
- try:
- self.schema.assertValid(self.get_content())
- except lxml.etree.DocumentInvalid:
- rpki.log.error("PDU failed schema check: " + self.pretty_print_content())
- raise
-
- def dump_to_disk(self, prefix):
- """Write DER of current message to disk, for debugging."""
- f = open(prefix + rpki.sundial.now().isoformat() + "Z.cms", "wb")
- f.write(self.get_DER())
- f.close()
-
- @classmethod
- def wrap(cls, msg, keypair, certs, crls = None, pretty_print = False):
- """Build a CMS-wrapped XML PDU and return its DER encoding."""
- rpki.log.trace()
- self = cls()
- self.set_content(msg.toXML())
- self.schema_check()
- self.sign(keypair, certs, crls)
- if self.dump_outbound_cms:
- self.dump_to_disk(self.dump_outbound_cms)
- if pretty_print:
- return self.get_DER(), self.pretty_print_content()
- else:
- return self.get_DER()
-
- @classmethod
- def unwrap(cls, der, ta, pretty_print = False):
- """Unwrap a CMS-wrapped XML PDU and return Python objects."""
- self = cls(DER = der)
- if self.dump_inbound_cms:
- self.dump_to_disk(self.dump_inbound_cms)
- self.verify(ta)
- self.schema_check()
- msg = self.saxify(self.get_content())
- if pretty_print:
- return msg, self.pretty_print_content()
- else:
- return msg
-
-class CRL(DER_object):
- """Class to hold a Certificate Revocation List."""
-
- formats = ("DER", "POW", "POWpkix")
- pem_converter = PEM_converter("X509 CRL")
-
- def get_DER(self):
- """Get the DER value of this CRL."""
- assert not self.empty()
- if self.DER:
- return self.DER
- if self.POW:
- self.DER = self.POW.derWrite()
- return self.get_DER()
- if self.POWpkix:
- self.DER = self.POWpkix.toString()
- return self.get_DER()
- raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available"
-
- def get_POW(self):
- """Get the POW value of this CRL."""
- assert not self.empty()
- if not self.POW:
- self.POW = POW.derRead(POW.X509_CRL, self.get_DER())
- return self.POW
-
- def get_POWpkix(self):
- """Get the POW.pkix value of this CRL."""
- assert not self.empty()
- if not self.POWpkix:
- crl = POW.pkix.CertificateList()
- crl.fromString(self.get_DER())
- self.POWpkix = crl
- return self.POWpkix
-
- def getThisUpdate(self):
- """Get thisUpdate value from this CRL."""
- return rpki.sundial.datetime.fromASN1tuple(self.get_POWpkix().getThisUpdate())
-
- def getNextUpdate(self):
- """Get nextUpdate value from this CRL."""
- return rpki.sundial.datetime.fromASN1tuple(self.get_POWpkix().getNextUpdate())
-
- def getIssuer(self):
- """Get issuer value of this CRL."""
- return self.get_POW().getIssuer()
-
- @classmethod
- def generate(cls, keypair, issuer, serial, thisUpdate, nextUpdate, revokedCertificates, version = 1, digestType = "sha256WithRSAEncryption"):
- crl = POW.pkix.CertificateList()
- crl.setVersion(version)
- crl.setIssuer(issuer.get_POWpkix().getSubject())
- crl.setThisUpdate(thisUpdate.toASN1tuple())
- crl.setNextUpdate(nextUpdate.toASN1tuple())
- if revokedCertificates:
- crl.setRevokedCertificates(revokedCertificates)
- crl.setExtensions(
- ((rpki.oids.name2oid["authorityKeyIdentifier"], False, (issuer.get_SKI(), (), None)),
- (rpki.oids.name2oid["cRLNumber"], False, serial)))
- crl.sign(keypair.get_POW(), digestType)
- return cls(POWpkix = crl)