diff options
Diffstat (limited to 'rpkid/rpki/x509.py')
-rw-r--r-- | rpkid/rpki/x509.py | 278 |
1 files changed, 205 insertions, 73 deletions
diff --git a/rpkid/rpki/x509.py b/rpkid/rpki/x509.py index 9825e609..91ab48bd 100644 --- a/rpkid/rpki/x509.py +++ b/rpkid/rpki/x509.py @@ -1,4 +1,5 @@ -"""One X.509 implementation to rule them all... +""" +One X.509 implementation to rule them all... ...and in the darkness hide the twisty maze of partially overlapping X.509 support packages in Python. @@ -47,7 +48,8 @@ import rpki.exceptions, rpki.resource_set, rpki.oids, rpki.sundial import rpki.manifest, rpki.roa, rpki.log, rpki.async def calculate_SKI(public_key_der): - """Calculate the SKI value given the DER representation of a public + """ + Calculate the SKI value given the DER representation of a public key, which requires first peeling the ASN.1 wrapper off the key. """ k = POW.pkix.SubjectPublicKeyInfo() @@ -57,20 +59,28 @@ def calculate_SKI(public_key_der): return d.digest() class PEM_converter(object): - """Convert between DER and PEM encodings for various kinds of ASN.1 data.""" + """ + Convert between DER and PEM encodings for various kinds of ASN.1 data. + """ def __init__(self, kind): # "CERTIFICATE", "RSA PRIVATE KEY", ... - """Initialize PEM_converter.""" + """ + Initialize PEM_converter. + """ self.b = "-----BEGIN %s-----" % kind self.e = "-----END %s-----" % kind def looks_like_PEM(self, text): - """Guess whether text looks like a PEM encoding.""" + """ + Guess whether text looks like a PEM encoding. + """ b = text.find(self.b) return b >= 0 and text.find(self.e) > b + len(self.b) def to_DER(self, pem): - """Convert from PEM to DER.""" + """ + Convert from PEM to DER. + """ lines = [line.strip() for line in pem.splitlines(0)] while lines and lines.pop(0) != self.b: pass @@ -81,7 +91,9 @@ class PEM_converter(object): return base64.b64decode("".join(lines)) def to_PEM(self, der): - """Convert from DER to PEM.""" + """ + Convert from DER to PEM. + """ b64 = base64.b64encode(der) pem = self.b + "\n" while len(b64) > 64: @@ -90,7 +102,9 @@ class PEM_converter(object): return pem + b64 + "\n" + self.e + "\n" class DER_object(object): - """Virtual class to hold a generic DER object.""" + """ + Virtual class to hold a generic DER object. + """ ## Formats supported in this object formats = ("DER",) @@ -105,31 +119,39 @@ class DER_object(object): ## DER value of this object def empty(self): - """Test whether this object is empty.""" + """ + Test whether this object is empty. + """ for a in self.formats: if getattr(self, a, None) is not None: return False return True def clear(self): - """Make this object empty.""" + """ + Make this object empty. + """ for a in self.formats + self.other_clear: setattr(self, a, None) def __init__(self, **kw): - """Initialize a DER_object.""" + """ + Initialize a DER_object. + """ self.clear() if len(kw): self.set(**kw) def set(self, **kw): - """Set this object by setting one of its known formats. + """ + Set this object by setting one of its known formats. This method only allows one to set one format at a time. Subsequent calls will clear the object first. The point of all this is to let the object's internal converters handle mustering the object into whatever format you need at the moment. """ + if len(kw) == 1: name = kw.keys()[0] if name in self.formats: @@ -156,7 +178,8 @@ class DER_object(object): raise rpki.exceptions.DERObjectConversionError, "Can't honor conversion request %s" % repr(kw) def get_DER(self): - """Get the DER value of this object. + """ + Get the DER value of this object. Subclasses will almost certainly override this method. """ @@ -178,60 +201,83 @@ class DER_object(object): return cmp(self.get_DER(), other.get_DER()) def hSKI(self): - """Return hexadecimal string representation of SKI for this - object. Only work for subclasses that implement get_SKI(). + """ + Return hexadecimal string representation of SKI for this object. + Only work for subclasses that implement get_SKI(). """ ski = self.get_SKI() return ":".join(("%02X" % ord(i) for i in ski)) if ski else "" def gSKI(self): - """Calculate g(SKI) for this object. Only work for subclasses + """ + Calculate g(SKI) for this object. Only work for subclasses that implement get_SKI(). """ return base64.urlsafe_b64encode(self.get_SKI()).rstrip("=") def hAKI(self): - """Return hexadecimal string representation of AKI for this + """ + Return hexadecimal string representation of AKI for this object. Only work for subclasses that implement get_AKI(). """ aki = self.get_AKI() return ":".join(("%02X" % ord(i) for i in aki)) if aki else "" def gAKI(self): - """Calculate g(AKI) for this object. Only work for subclasses + """ + Calculate g(AKI) for this object. Only work for subclasses that implement get_AKI(). """ return base64.urlsafe_b64encode(self.get_AKI()).rstrip("=") def get_AKI(self): - """Get the AKI extension from this object. Only works for subclasses that support getExtension().""" + """ + Get the AKI extension from this object. Only works for subclasses + that support getExtension(). + """ aki = (self.get_POWpkix().getExtension(rpki.oids.name2oid["authorityKeyIdentifier"]) or ((), 0, None))[2] return aki[0] if isinstance(aki, tuple) else aki def get_SKI(self): - """Get the SKI extension from this object. Only works for subclasses that support getExtension().""" + """ + Get the SKI extension from this object. Only works for subclasses + that support getExtension(). + """ return (self.get_POWpkix().getExtension(rpki.oids.name2oid["subjectKeyIdentifier"]) or ((), 0, None))[2] def get_SIA(self): - """Get the SIA extension from this object. Only works for subclasses that support getExtension().""" + """ + Get the SIA extension from this object. Only works for subclasses + that support getExtension(). + """ return (self.get_POWpkix().getExtension(rpki.oids.name2oid["subjectInfoAccess"]) or ((), 0, None))[2] def get_AIA(self): - """Get the SIA extension from this object. Only works for subclasses that support getExtension().""" + """ + Get the SIA extension from this object. Only works for subclasses + that support getExtension(). + """ return (self.get_POWpkix().getExtension(rpki.oids.name2oid["subjectInfoAccess"]) or ((), 0, None))[2] def get_basicConstraints(self): - """Get the basicConstraints extension from this object. Only works for subclasses that support getExtension().""" + """ + Get the basicConstraints extension from this object. Only works + for subclasses that support getExtension(). + """ return (self.get_POWpkix().getExtension(rpki.oids.name2oid["basicConstraints"]) or ((), 0, None))[2] def is_CA(self): - """Return True if and only if object has the basicConstraints extension and its cA value is true.""" + """ + Return True if and only if object has the basicConstraints + extension and its cA value is true. + """ basicConstraints = self.get_basicConstraints() return basicConstraints and basicConstraints[0] != 0 def get_3779resources(self): - """Get RFC 3779 resources as rpki.resource_set objects. - Only works for subclasses that support getExtensions(). + """ + Get RFC 3779 resources as rpki.resource_set objects. Only works + for subclasses that support getExtensions(). """ resources = rpki.resource_set.resource_bag.from_rfc3779_tuples(self.get_POWpkix().getExtensions()) try: @@ -250,7 +296,8 @@ class DER_object(object): return self.get_DER() def dumpasn1(self): - """Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool. + """ + Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool. Use a temporary file rather than popen4() because dumpasn1 uses seek() when decoding ASN.1 content nested in OCTET STRING values. """ @@ -269,7 +316,8 @@ class DER_object(object): return ret class X509(DER_object): - """X.509 certificates. + """ + X.509 certificates. This class is designed to hold all the different representations of X.509 certs we're using and convert between them. X.509 support in @@ -282,7 +330,9 @@ class X509(DER_object): pem_converter = PEM_converter("CERTIFICATE") def get_DER(self): - """Get the DER value of this certificate.""" + """ + Get the DER value of this certificate. + """ assert not self.empty() if self.DER: return self.DER @@ -301,14 +351,18 @@ class X509(DER_object): raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available" def get_POW(self): - """Get the POW value of this certificate.""" + """ + Get the POW value of this certificate. + """ assert not self.empty() if not self.POW: self.POW = POW.derRead(POW.X509_CERTIFICATE, self.get_DER()) return self.POW def get_POWpkix(self): - """Get the POW.pkix value of this certificate.""" + """ + Get the POW.pkix value of this certificate. + """ assert not self.empty() if not self.POWpkix: cert = POW.pkix.Certificate() @@ -317,7 +371,9 @@ class X509(DER_object): return self.POWpkix def get_tlslite(self): - """Get the tlslite value of this certificate.""" + """ + Get the tlslite value of this certificate. + """ assert not self.empty() if not self.tlslite: cert = tlslite.api.X509() @@ -355,7 +411,9 @@ class X509(DER_object): def issue(self, keypair, subject_key, serial, sia, aia, crldp, notAfter, cn = None, resources = None, is_ca = True): - """Issue a certificate.""" + """ + Issue a certificate. + """ now = rpki.sundial.now() aki = self.get_SKI() @@ -408,7 +466,8 @@ class X509(DER_object): @classmethod def normalize_chain(cls, chain): - """Normalize a chain of certificates into a tuple of X509 objects. + """ + Normalize a chain of certificates into a tuple of X509 objects. Given all the glue certificates needed for BPKI cross certification, it's easiest to allow sloppy arguments to the HTTPS and CMS validation methods and provide a single method that @@ -420,13 +479,17 @@ class X509(DER_object): return tuple(x for x in chain if x is not None) class PKCS10(DER_object): - """Class to hold a PKCS #10 request.""" + """ + Class to hold a PKCS #10 request. + """ formats = ("DER", "POWpkix") pem_converter = PEM_converter("CERTIFICATE REQUEST") def get_DER(self): - """Get the DER value of this certification request.""" + """ + Get the DER value of this certification request. + """ assert not self.empty() if self.DER: return self.DER @@ -436,7 +499,9 @@ class PKCS10(DER_object): raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available" def get_POWpkix(self): - """Get the POW.pkix value of this certification request.""" + """ + Get the POW.pkix value of this certification request. + """ assert not self.empty() if not self.POWpkix: req = POW.pkix.CertificationRequest() @@ -449,7 +514,8 @@ class PKCS10(DER_object): return RSApublic(DER = self.get_POWpkix().certificationRequestInfo.subjectPublicKeyInfo.toString()) def check_valid_rpki(self): - """Check this certification request to see whether it's a valid + """ + Check this certification request to see whether it's a valid request for an RPKI certificate. This is broken out of the up-down protocol code because it's somewhat involved and the up-down code doesn't need to know the details. @@ -495,7 +561,9 @@ class PKCS10(DER_object): @classmethod def create_ca(cls, keypair, sia = None): - """Create a new request for a given keypair, including given SIA value.""" + """ + Create a new request for a given keypair, including given SIA value. + """ exts = [["basicConstraints", True, (1, None)], ["keyUsage", True, (0, 0, 0, 0, 0, 1, 1)]] if sia is not None: @@ -506,7 +574,9 @@ class PKCS10(DER_object): @classmethod def create(cls, keypair, exts = None): - """Create a new request for a given keypair, including given extensions.""" + """ + Create a new request for a given keypair, including given extensions. + """ cn = "".join(("%02X" % ord(i) for i in keypair.get_SKI())) req = POW.pkix.CertificationRequest() req.certificationRequestInfo.version.set(0) @@ -518,13 +588,17 @@ class PKCS10(DER_object): return cls(POWpkix = req) class RSA(DER_object): - """Class to hold an RSA key pair.""" + """ + Class to hold an RSA key pair. + """ formats = ("DER", "POW", "tlslite") pem_converter = PEM_converter("RSA PRIVATE KEY") def get_DER(self): - """Get the DER value of this keypair.""" + """ + Get the DER value of this keypair. + """ assert not self.empty() if self.DER: return self.DER @@ -534,14 +608,18 @@ class RSA(DER_object): raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available" def get_POW(self): - """Get the POW value of this keypair.""" + """ + Get the POW value of this keypair. + """ assert not self.empty() if not self.POW: self.POW = POW.derRead(POW.RSA_PRIVATE_KEY, self.get_DER()) return self.POW def get_tlslite(self): - """Get the tlslite value of this keypair.""" + """ + Get the tlslite value of this keypair. + """ assert not self.empty() if not self.tlslite: self.tlslite = tlslite.api.parsePEMKey(self.get_PEM(), private=True) @@ -565,13 +643,17 @@ class RSA(DER_object): return RSApublic(DER = self.get_public_DER()) class RSApublic(DER_object): - """Class to hold an RSA public key.""" + """ + Class to hold an RSA public key. + """ formats = ("DER", "POW") pem_converter = PEM_converter("RSA PUBLIC KEY") def get_DER(self): - """Get the DER value of this public key.""" + """ + Get the DER value of this public key. + """ assert not self.empty() if self.DER: return self.DER @@ -581,7 +663,9 @@ class RSApublic(DER_object): raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available" def get_POW(self): - """Get the POW value of this public key.""" + """ + Get the POW value of this public key. + """ assert not self.empty() if not self.POW: self.POW = POW.derRead(POW.RSA_PUBLIC_KEY, self.get_DER()) @@ -592,8 +676,9 @@ class RSApublic(DER_object): return calculate_SKI(self.get_DER()) def POWify_OID(oid): - """Utility function to convert tuple form of an OID to - the dotted-decimal string form that POW uses. + """ + Utility function to convert tuple form of an OID to the + dotted-decimal string form that POW uses. """ if isinstance(oid, str): return POWify_OID(rpki.oids.name2oid[oid]) @@ -601,7 +686,8 @@ def POWify_OID(oid): return ".".join(str(i) for i in oid) class CMS_object(DER_object): - """Class to hold a CMS-wrapped object. + """ + Class to hold a CMS-wrapped object. CMS-wrapped objects are a little different from the other DER_object types because the signed object is CMS wrapping inner content that's @@ -641,7 +727,9 @@ class CMS_object(DER_object): print_on_der_error = True def get_DER(self): - """Get the DER value of this CMS_object.""" + """ + Get the DER value of this CMS_object. + """ assert not self.empty() if self.DER: return self.DER @@ -651,24 +739,32 @@ class CMS_object(DER_object): raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available" def get_POW(self): - """Get the POW value of this CMS_object.""" + """ + Get the POW value of this CMS_object. + """ assert not self.empty() if not self.POW: self.POW = POW.derRead(POW.CMS_MESSAGE, self.get_DER()) return self.POW def get_content(self): - """Get the inner content of this CMS_object.""" + """ + Get the inner content of this CMS_object. + """ assert self.content is not None return self.content def set_content(self, content): - """Set the (inner) content of this CMS_object, clearing the wrapper.""" + """ + Set the (inner) content of this CMS_object, clearing the wrapper. + """ self.clear() self.content = content def verify(self, ta): - """Verify CMS wrapper and store inner content.""" + """ + Verify CMS wrapper and store inner content. + """ try: cms = self.get_POW() @@ -741,8 +837,9 @@ class CMS_object(DER_object): return self.get_content() def extract(self): - """Extract and store inner content from CMS wrapper without - verifying the CMS. + """ + Extract and store inner content from CMS wrapper without verifying + the CMS. DANGER WILL ROBINSON!!! @@ -768,7 +865,9 @@ class CMS_object(DER_object): return self.get_content() def sign(self, keypair, certs, crls = None, no_certs = False): - """Sign and wrap inner content.""" + """ + Sign and wrap inner content. + """ rpki.log.trace() @@ -797,20 +896,26 @@ class CMS_object(DER_object): self.POW = cms class DER_CMS_object(CMS_object): - """Class to hold CMS objects with DER-based content.""" + """ + Class to hold CMS objects with DER-based content. + """ def encode(self): """Encode inner content for signing.""" return self.get_content().toString() def decode(self, der): - """Decode DER and set inner content.""" + """ + Decode DER and set inner content. + """ obj = self.content_class() obj.fromString(der) self.content = obj class SignedManifest(DER_CMS_object): - """Class to hold a signed manifest.""" + """ + Class to hold a signed manifest. + """ pem_converter = PEM_converter("RPKI MANIFEST") content_class = rpki.manifest.Manifest @@ -826,7 +931,9 @@ class SignedManifest(DER_CMS_object): @classmethod def build(cls, serial, thisUpdate, nextUpdate, names_and_objs, keypair, certs, version = 0): - """Build a signed manifest.""" + """ + Build a signed manifest. + """ self = cls() filelist = [] for name, obj in names_and_objs: @@ -846,7 +953,9 @@ class SignedManifest(DER_CMS_object): return self class ROA(DER_CMS_object): - """Class to hold a signed ROA.""" + """ + Class to hold a signed ROA. + """ pem_converter = PEM_converter("ROUTE ORIGIN ATTESTATION") content_class = rpki.roa.RouteOriginAttestation @@ -854,7 +963,9 @@ class ROA(DER_CMS_object): @classmethod def build(cls, as_number, ipv4, ipv6, keypair, certs, version = 0): - """Build a ROA.""" + """ + Build a ROA. + """ self = cls() r = rpki.roa.RouteOriginAttestation() r.version.set(version) @@ -865,7 +976,9 @@ class ROA(DER_CMS_object): return self class XML_CMS_object(CMS_object): - """Class to hold CMS-wrapped XML protocol data.""" + """ + Class to hold CMS-wrapped XML protocol data. + """ econtent_oid = POWify_OID("id-ct-xml") @@ -896,7 +1009,9 @@ class XML_CMS_object(CMS_object): return lxml.etree.tostring(self.get_content(), pretty_print = True, encoding = self.encoding, xml_declaration = True) def schema_check(self): - """Handle XML RelaxNG schema check.""" + """ + Handle XML RelaxNG schema check. + """ try: self.schema.assertValid(self.get_content()) except lxml.etree.DocumentInvalid: @@ -904,14 +1019,18 @@ class XML_CMS_object(CMS_object): raise def dump_to_disk(self, prefix): - """Write DER of current message to disk, for debugging.""" + """ + Write DER of current message to disk, for debugging. + """ f = open(prefix + rpki.sundial.now().isoformat() + "Z.cms", "wb") f.write(self.get_DER()) f.close() @classmethod def wrap(cls, msg, keypair, certs, crls = None, pretty_print = False): - """Build a CMS-wrapped XML PDU and return its DER encoding.""" + """ + Build a CMS-wrapped XML PDU and return its DER encoding. + """ rpki.log.trace() self = cls() self.set_content(msg.toXML()) @@ -926,7 +1045,9 @@ class XML_CMS_object(CMS_object): @classmethod def unwrap(cls, der, ta, pretty_print = False): - """Unwrap a CMS-wrapped XML PDU and return Python objects.""" + """ + Unwrap a CMS-wrapped XML PDU and return Python objects. + """ self = cls(DER = der) if self.dump_inbound_cms: self.dump_to_disk(self.dump_inbound_cms) @@ -939,13 +1060,17 @@ class XML_CMS_object(CMS_object): return msg class CRL(DER_object): - """Class to hold a Certificate Revocation List.""" + """ + Class to hold a Certificate Revocation List. + """ formats = ("DER", "POW", "POWpkix") pem_converter = PEM_converter("X509 CRL") def get_DER(self): - """Get the DER value of this CRL.""" + """ + Get the DER value of this CRL. + """ assert not self.empty() if self.DER: return self.DER @@ -958,14 +1083,18 @@ class CRL(DER_object): raise rpki.exceptions.DERObjectConversionError, "No conversion path to DER available" def get_POW(self): - """Get the POW value of this CRL.""" + """ + Get the POW value of this CRL. + """ assert not self.empty() if not self.POW: self.POW = POW.derRead(POW.X509_CRL, self.get_DER()) return self.POW def get_POWpkix(self): - """Get the POW.pkix value of this CRL.""" + """ + Get the POW.pkix value of this CRL. + """ assert not self.empty() if not self.POWpkix: crl = POW.pkix.CertificateList() @@ -987,6 +1116,9 @@ class CRL(DER_object): @classmethod def generate(cls, keypair, issuer, serial, thisUpdate, nextUpdate, revokedCertificates, version = 1, digestType = "sha256WithRSAEncryption"): + """ + Generate a new CRL. + """ crl = POW.pkix.CertificateList() crl.setVersion(version) crl.setIssuer(issuer.get_POWpkix().getSubject()) |