aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/x509.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpkid/rpki/x509.py')
-rw-r--r--rpkid/rpki/x509.py419
1 files changed, 287 insertions, 132 deletions
diff --git a/rpkid/rpki/x509.py b/rpkid/rpki/x509.py
index 5475a452..fb1a5a2b 100644
--- a/rpkid/rpki/x509.py
+++ b/rpkid/rpki/x509.py
@@ -78,20 +78,6 @@ def first_rsync_uri(xia):
return uri
return None
-def _find_xia_uri(extension, name):
- """
- Find a rsync URI in an SIA or AIA extension.
- Returns the URI if found, otherwise None.
- """
- oid = rpki.oids.name2oid[name]
-
- # extension may be None if the AIA is not present
- if extension:
- for method, location in extension:
- if method == oid and location[0] == "uri" and location[1].startswith("rsync://"):
- return location[1]
- return None
-
class X501DN(object):
"""
Class to hold an X.501 Distinguished Name.
@@ -126,7 +112,7 @@ class X501DN(object):
"""
def __str__(self):
- return "".join("/" + "+".join("%s=%s" % (rpki.oids.safe_dotted2name(a[0]), a[1])
+ return "".join("/" + "+".join("%s=%s" % (rpki.oids.oid2name(a[0]), a[1])
for a in rdn)
for rdn in self.dn)
@@ -145,10 +131,18 @@ class X501DN(object):
rpki.log.debug("++ %r %r" % (self, self.dn))
@classmethod
- def from_cn(cls, s):
- assert isinstance(s, (str, unicode))
+ def from_cn(cls, cn, sn = None):
+ assert isinstance(cn, (str, unicode))
+ if isinstance(sn, (int, long)):
+ sn = "%08X" % sn
+ elif isinstance(sn, (str, unicode)):
+ assert all(c in "0123456789abcdefABCDEF" for c in sn)
+ sn = str(sn)
self = cls()
- self.dn = (((rpki.oids.safe_name2dotted("commonName"), s),),)
+ if sn is not None:
+ self.dn = (((rpki.oids.commonName, cn),), ((rpki.oids.serialNumber, sn),))
+ else:
+ self.dn = (((rpki.oids.commonName, cn),),)
return self
@classmethod
@@ -161,6 +155,28 @@ class X501DN(object):
def get_POW(self):
return self.dn
+ def extract_cn_and_sn(self):
+ cn = None
+ sn = None
+
+ for rdn in self.dn:
+ if len(rdn) == 1 and len(rdn[0]) == 2:
+ oid = rdn[0][0]
+ val = rdn[0][1]
+ if oid == rpki.oids.commonName and cn is None:
+ cn = val
+ continue
+ if oid == rpki.oids.serialNumber and sn is None:
+ sn = val
+ continue
+ raise rpki.exceptions.BadX510DN("Bad subject name: %s" % (self.dn,))
+
+ if cn is None:
+ raise rpki.exceptions.BadX510DN("Subject name is missing CN: %s" % (self.dn,))
+
+ return cn, sn
+
+
class DER_object(object):
"""
Virtual class to hold a generic DER object.
@@ -300,14 +316,26 @@ class DER_object(object):
def get_DER(self):
"""
Get the DER value of this object.
-
- Subclasses will almost certainly override this method.
+ Subclasses may need to override this method.
"""
self.check()
if self.DER:
return self.DER
+ if self.POW:
+ self.DER = self.POW.derWrite()
+ return self.get_DER()
raise rpki.exceptions.DERObjectConversionError("No conversion path to DER available")
+ def get_POW(self):
+ """
+ Get the rpki.POW value of this object.
+ Subclasses may need to override this method.
+ """
+ self.check()
+ if not self.POW: # pylint: disable=E0203
+ self.POW = self.POW_class.derRead(self.get_DER())
+ return self.POW
+
def get_Base64(self):
"""
Get the Base64 encoding of the DER value of this object.
@@ -367,18 +395,22 @@ class DER_object(object):
def get_AKI(self):
"""
- Get the AKI extension from this object. Only works for subclasses
- that support getExtension().
+ Get the AKI extension from this object, if supported.
"""
return self.get_POW().getAKI()
def get_SKI(self):
"""
- Get the SKI extension from this object. Only works for subclasses
- that support getExtension().
+ Get the SKI extension from this object, if supported.
"""
return self.get_POW().getSKI()
+ def get_EKU(self):
+ """
+ Get the Extended Key Usage extension from this object, if supported.
+ """
+ return self.get_POW().getEKU()
+
def get_SIA(self):
"""
Get the SIA extension from this object. Only works for subclasses
@@ -524,27 +556,6 @@ class X509(DER_object):
POW_class = rpki.POW.X509
- def get_DER(self):
- """
- Get the DER value of this certificate.
- """
- self.check()
- if self.DER:
- return self.DER
- if self.POW:
- self.DER = self.POW.derWrite()
- return self.get_DER()
- raise rpki.exceptions.DERObjectConversionError("No conversion path to DER available")
-
- def get_POW(self):
- """
- Get the rpki.POW value of this certificate.
- """
- self.check()
- if not self.POW: # pylint: disable=E0203
- self.POW = rpki.POW.X509.derRead(self.get_DER())
- return self.POW
-
def getIssuer(self):
"""
Get the issuer of this certificate.
@@ -579,7 +590,7 @@ class X509(DER_object):
"""
Extract the public key from this certificate.
"""
- return RSApublic(POW = self.get_POW().getPublicKey())
+ return PublicKey(POW = self.get_POW().getPublicKey())
def get_SKI(self):
"""
@@ -594,13 +605,16 @@ class X509(DER_object):
return self.getNotAfter() <= rpki.sundial.now()
def issue(self, keypair, subject_key, serial, sia, aia, crldp, notAfter,
- cn = None, resources = None, is_ca = True, notBefore = None):
+ cn = None, resources = None, is_ca = True, notBefore = None,
+ sn = None, eku = None):
"""
Issue an RPKI certificate.
"""
assert aia is not None and crldp is not None
+ assert eku is None or not is_ca
+
return self._issue(
keypair = keypair,
subject_key = subject_key,
@@ -611,15 +625,18 @@ class X509(DER_object):
notBefore = notBefore,
notAfter = notAfter,
cn = cn,
+ sn = sn,
resources = resources,
is_ca = is_ca,
aki = self.get_SKI(),
- issuer_name = self.getSubject())
+ issuer_name = self.getSubject(),
+ eku = eku)
@classmethod
def self_certify(cls, keypair, subject_key, serial, sia, notAfter,
- cn = None, resources = None, notBefore = None):
+ cn = None, resources = None, notBefore = None,
+ sn = None):
"""
Generate a self-certified RPKI certificate.
"""
@@ -639,15 +656,17 @@ class X509(DER_object):
notBefore = notBefore,
notAfter = notAfter,
cn = cn,
+ sn = sn,
resources = resources,
is_ca = True,
aki = ski,
- issuer_name = X501DN.from_cn(cn))
+ issuer_name = X501DN.from_cn(cn, sn),
+ eku = None)
@classmethod
def _issue(cls, keypair, subject_key, serial, sia, aia, crldp, notAfter,
- cn, resources, is_ca, aki, issuer_name, notBefore):
+ cn, sn, resources, is_ca, aki, issuer_name, notBefore, eku):
"""
Common code to issue an RPKI certificate.
"""
@@ -673,13 +692,13 @@ class X509(DER_object):
cert.setVersion(2)
cert.setSerial(serial)
cert.setIssuer(issuer_name.get_POW())
- cert.setSubject(X501DN.from_cn(cn).get_POW())
+ cert.setSubject(X501DN.from_cn(cn, sn).get_POW())
cert.setNotBefore(notBefore)
cert.setNotAfter(notAfter)
cert.setPublicKey(subject_key.get_POW())
cert.setSKI(ski)
cert.setAKI(aki)
- cert.setCertificatePolicies((POWify_OID("id-cp-ipAddr-asNumber"),))
+ cert.setCertificatePolicies((rpki.oids.id_cp_ipAddr_asNumber,))
if crldp is not None:
cert.setCRLDP((crldp,))
@@ -712,6 +731,10 @@ class X509(DER_object):
ipv6 = ("inherit" if resources.v6.inherit else
((r.min, r.max) for r in resources.v6)))
+ if eku is not None:
+ assert not is_ca
+ cert.setEKU(eku)
+
cert.sign(keypair.get_POW(), rpki.POW.SHA256_DIGEST)
return cls(POW = cert)
@@ -741,7 +764,7 @@ class X509(DER_object):
keypair = keypair,
issuer_name = subject_name,
subject_name = subject_name,
- subject_key = keypair.get_RSApublic(),
+ subject_key = keypair.get_public(),
serial = serial,
now = now,
notAfter = notAfter,
@@ -753,7 +776,7 @@ class X509(DER_object):
"""
Issue a normal BPKI certificate.
"""
- assert keypair.get_RSApublic() == self.getPublicKey()
+ assert keypair.get_public() == self.getPublicKey()
return self._bpki_certify(
keypair = keypair,
issuer_name = self.getSubject(),
@@ -777,7 +800,7 @@ class X509(DER_object):
if now is None:
now = rpki.sundial.now()
- issuer_key = keypair.get_RSApublic()
+ issuer_key = keypair.get_public()
assert (issuer_key == subject_key) == (issuer_name == subject_name)
assert is_ca or issuer_name != subject_name
@@ -837,10 +860,11 @@ class PKCS10(DER_object):
## @var allowed_extensions
# Extensions allowed by RPKI profile.
- allowed_extensions = frozenset(rpki.oids.safe_name2dotted(name)
- for name in ("basicConstraints",
- "keyUsage",
- "subjectInfoAccess"))
+ allowed_extensions = frozenset((rpki.oids.basicConstraints,
+ rpki.oids.keyUsage,
+ rpki.oids.subjectInfoAccess,
+ rpki.oids.extendedKeyUsage))
+
def get_DER(self):
"""
@@ -873,91 +897,189 @@ class PKCS10(DER_object):
"""
Extract the public key from this certification request.
"""
- return RSApublic(POW = self.get_POW().getPublicKey())
+ return PublicKey(POW = self.get_POW().getPublicKey())
+
+ def get_SKI(self):
+ """
+ Compute SKI for public key from this certification request.
+ """
+ return self.getPublicKey().get_SKI()
- def check_valid_rpki(self):
+
+ def check_valid_request_common(self):
"""
- Check this certification request to see whether it's a valid
- request for an RPKI certificate. This is broken out of the
- up-down protocol code because it's somewhat involved and the
- up-down code doesn't need to know the details.
+ Common code for checking this certification requests to see
+ whether they conform to the RPKI certificate profile.
Throws an exception if the request isn't valid, so if this method
returns at all, the request is ok.
- At the moment, this only allows requests for CA certificates; as a
- direct consequence, it also rejects ExtendedKeyUsage, because the
- RPKI profile only allows EKU for EE certificates.
+ You probably don't want to call this directly, as it only performs
+ the checks that are common to all RPKI certificates.
"""
if not self.get_POW().verify():
- raise rpki.exceptions.BadPKCS10("Signature check failed")
+ raise rpki.exceptions.BadPKCS10("PKCS #10 signature check failed")
ver = self.get_POW().getVersion()
if ver != 0:
- raise rpki.exceptions.BadPKCS10("Bad version number %s" % ver)
+ raise rpki.exceptions.BadPKCS10("PKCS #10 request has bad version number %s" % ver)
- alg = rpki.oids.safe_dotted2name(self.get_POW().getSignatureAlgorithm())
+ ku = self.get_POW().getKeyUsage()
- if alg != "sha256WithRSAEncryption":
- raise rpki.exceptions.BadPKCS10("Bad signature algorithm %s" % alg)
+ if ku is not None and self.expected_ca_keyUsage != ku:
+ raise rpki.exceptions.BadPKCS10("PKCS #10 keyUsage doesn't match profile: %r" % ku)
- bc = self.get_POW().getBasicConstraints()
-
- if bc is None or not bc[0]:
- raise rpki.exceptions.BadPKCS10("Request for EE certificate not allowed here")
+ forbidden_extensions = self.get_POW().getExtensionOIDs() - self.allowed_extensions
- if bc[1] is not None:
- raise rpki.exceptions.BadPKCS10("basicConstraints must not specify Path Length")
+ if forbidden_extensions:
+ raise rpki.exceptions.BadExtension("Forbidden extension%s in PKCS #10 certificate request: %s" % (
+ "" if len(forbidden_extensions) == 1 else "s",
+ ", ".join(forbidden_extensions)))
- ku = self.get_POW().getKeyUsage()
- if ku is not None and self.expected_ca_keyUsage != ku:
- raise rpki.exceptions.BadPKCS10("keyUsage doesn't match basicConstraints: %r" % ku)
+ def check_valid_request_ca(self):
+ """
+ Check this certification request to see whether it's a valid
+ request for an RPKI CA certificate.
+
+ Throws an exception if the request isn't valid, so if this method
+ returns at all, the request is ok.
+ """
- if any(oid not in self.allowed_extensions
- for oid in self.get_POW().getExtensionOIDs()):
- raise rpki.exceptions.BadExtension("Forbidden extension(s) in certificate request")
+ self.check_valid_request_common()
+ alg = self.get_POW().getSignatureAlgorithm()
+ bc = self.get_POW().getBasicConstraints()
+ eku = self.get_POW().getEKU()
sias = self.get_POW().getSIA()
+ if alg != rpki.oids.sha256WithRSAEncryption:
+ raise rpki.exceptions.BadPKCS10("PKCS #10 has bad signature algorithm for CA: %s" % alg)
+
+ if bc is None or not bc[0] or bc[1] is not None:
+ raise rpki.exceptions.BadPKCS10("PKCS #10 CA bad basicConstraints")
+
+ if eku is not None:
+ raise rpki.exceptions.BadPKCS10("PKCS #10 CA EKU not allowed")
+
if sias is None:
- raise rpki.exceptions.BadPKCS10("Certificate request is missing SIA extension")
+ raise rpki.exceptions.BadPKCS10("PKCS #10 CA SIA missing")
caRepository, rpkiManifest, signedObject = sias
if signedObject:
- raise rpki.exceptions.BadPKCS10("CA certificate request has SIA id-ad-signedObject")
+ raise rpki.exceptions.BadPKCS10("PKCS #10 CA SIA must not have id-ad-signedObject")
if not caRepository:
- raise rpki.exceptions.BadPKCS10("Certificate request is missing SIA id-ad-caRepository")
+ raise rpki.exceptions.BadPKCS10("PKCS #10 CA SIA must have id-ad-caRepository")
if not any(uri.startswith("rsync://") for uri in caRepository):
- raise rpki.exceptions.BadPKCS10("Certificate request SIA id-ad-caRepository contains no rsync URIs")
+ raise rpki.exceptions.BadPKCS10("PKCS #10 CA SIA id-ad-caRepository contains no rsync URIs")
+
+ if any(uri.startswith("rsync://") and not uri.endswith("/") for uri in caRepository):
+ raise rpki.exceptions.BadPKCS10("PKCS #10 CA SIA id-ad-caRepository does not end with slash")
if not rpkiManifest:
- raise rpki.exceptions.BadPKCS10("Certificate request is missing SIA id-ad-rpkiManifest")
-
- if not any(uri.startswith("rsync://") for uri in rpkiManifest):
- raise rpki.exceptions.BadPKCS10("Certificate request SIA id-ad-rpkiManifest contains no rsync URIs")
+ raise rpki.exceptions.BadPKCS10("PKCS #10 CA SIA must have id-ad-rpkiManifest")
- if any(uri.startswith("rsync://") and not uri.endswith("/") for uri in caRepository):
- raise rpki.exceptions.BadPKCS10("Certificate request SIA id-ad-caRepository does not end with slash")
+ if not any(uri.startswith("rsync://") for uri in rpkiManifest):
+ raise rpki.exceptions.BadPKCS10("PKCS #10 CA SIA id-ad-rpkiManifest contains no rsync URIs")
if any(uri.startswith("rsync://") and uri.endswith("/") for uri in rpkiManifest):
- raise rpki.exceptions.BadPKCS10("Certificate request SIA id-ad-rpkiManifest ends with slash")
+ raise rpki.exceptions.BadPKCS10("PKCS #10 CA SIA id-ad-rpkiManifest ends with slash")
+
+
+ def check_valid_request_ee(self):
+ """
+ Check this certification request to see whether it's a valid
+ request for an RPKI EE certificate.
+
+ Throws an exception if the request isn't valid, so if this method
+ returns at all, the request is ok.
+
+ We're a bit less strict here than we are for either CA
+ certificates or BGPSEC router certificates, because the profile is
+ less tightly nailed down for unspecified-use RPKI EE certificates.
+ Future specific purposes may impose tighter constraints.
+
+ Note that this method does NOT apply to so-called "infrastructure"
+ EE certificates (eg, the EE certificates embedded in manifests and
+ ROAs); those are constrained fairly tightly, but they're also
+ generated internally so we don't need to check them as user or
+ protocol input.
+ """
+
+ self.check_valid_request_common()
+
+ alg = self.get_POW().getSignatureAlgorithm()
+ bc = self.get_POW().getBasicConstraints()
+ sia = self.get_POW().getSIA()
+
+ caRepository, rpkiManifest, signedObject = sia or (None, None, None)
+
+ if alg not in (rpki.oids.sha256WithRSAEncryption, rpki.oids.ecdsa_with_SHA256):
+ raise rpki.exceptions.BadPKCS10("PKCS #10 has bad signature algorithm for EE: %s" % alg)
+
+ if bc is not None and (bc[0] or bc[1] is not None):
+ raise rpki.exceptions.BadPKCS10("PKCS #10 EE has bad basicConstraints")
+
+ if caRepository:
+ raise rpki.exceptions.BadPKCS10("PKCS #10 EE must not have id-ad-caRepository")
+
+ if rpkiManifest:
+ raise rpki.exceptions.BadPKCS10("PKCS #10 EE must not have id-ad-rpkiManifest")
+
+ if signedObject and not any(uri.startswith("rsync://") for uri in signedObject):
+ raise rpki.exceptions.BadPKCS10("PKCS #10 EE SIA id-ad-signedObject contains no rsync URIs")
+
+
+ def check_valid_request_router(self):
+ """
+ Check this certification request to see whether it's a valid
+ request for a BGPSEC router certificate.
+
+ Throws an exception if the request isn't valid, so if this method
+ returns at all, the request is ok.
+
+ draft-ietf-sidr-bgpsec-pki-profiles 3.2 says follow RFC 6487 3
+ except where explicitly overriden, and does not override for SIA.
+ But draft-ietf-sidr-bgpsec-pki-profiles also says that router
+ certificates don't get SIA, while RFC 6487 requires SIA. So what
+ do we do with SIA in PKCS #10 for router certificates?
+
+ For the moment, ignore it, but make sure we don't include it in
+ the certificate when we get to the code that generates that.
+ """
+
+ self.check_valid_request_ee()
+
+ alg = self.get_POW().getSignatureAlgorithm()
+ eku = self.get_POW().getEKU()
+
+ if alg != rpki.oids.ecdsa_with_SHA256:
+ raise rpki.exceptions.BadPKCS10("PKCS #10 has bad signature algorithm for router: %s" % alg)
+
+ # Not really clear to me whether PKCS #10 should have EKU or not, so allow
+ # either, but insist that it be the right one if present.
+
+ if eku is not None and rpki.oids.id_kp_bgpsec_router not in eku:
+ raise rpki.exceptions.BadPKCS10("PKCS #10 router must have EKU")
+
@classmethod
def create(cls, keypair, exts = None, is_ca = False,
- caRepository = None, rpkiManifest = None, signedObject = None):
+ caRepository = None, rpkiManifest = None, signedObject = None,
+ cn = None, sn = None, eku = None):
"""
Create a new request for a given keypair.
"""
assert exts is None, "Old calling sequence to rpki.x509.PKCS10.create()"
- cn = "".join(("%02X" % ord(i) for i in keypair.get_SKI()))
+ if cn is None:
+ cn = "".join(("%02X" % ord(i) for i in keypair.get_SKI()))
if isinstance(caRepository, str):
caRepository = (caRepository,)
@@ -970,7 +1092,7 @@ class PKCS10(DER_object):
req = rpki.POW.PKCS10()
req.setVersion(0)
- req.setSubject(X501DN.from_cn(cn).get_POW())
+ req.setSubject(X501DN.from_cn(cn, sn).get_POW())
req.setPublicKey(keypair.get_POW())
if is_ca:
@@ -980,6 +1102,9 @@ class PKCS10(DER_object):
if caRepository or rpkiManifest or signedObject:
req.setSIA(caRepository, rpkiManifest, signedObject)
+ if eku:
+ req.setEKU(eku)
+
req.sign(keypair.get_POW(), rpki.POW.SHA256_DIGEST)
return cls(POW = req)
@@ -1014,9 +1139,10 @@ class insecure_debug_only_rsa_key_generator(object):
self.keyno += 1
return v
-class RSA(DER_object):
+
+class PrivateKey(DER_object):
"""
- Class to hold an RSA key pair.
+ Class to hold a Public/Private key pair.
"""
POW_class = rpki.POW.Asymmetric
@@ -1055,18 +1181,6 @@ class RSA(DER_object):
assert self.empty()
self.POW = self.POW_class.pemReadPrivate(pem)
- @classmethod
- def generate(cls, keylength = 2048, quiet = False):
- """
- Generate a new keypair.
- """
- if not quiet:
- rpki.log.debug("Generating new %d-bit RSA key" % keylength)
- if generate_insecure_debug_only_rsa_key is not None:
- return cls(POW = generate_insecure_debug_only_rsa_key())
- else:
- return cls(POW = rpki.POW.Asymmetric.generateRSA(keylength))
-
def get_public_DER(self):
"""
Get the DER encoding of the public key from this keypair.
@@ -1079,15 +1193,15 @@ class RSA(DER_object):
"""
return self.get_POW().calculateSKI()
- def get_RSApublic(self):
+ def get_public(self):
"""
- Convert the public key of this keypair into a RSApublic object.
+ Convert the public key of this keypair into a PublicKey object.
"""
- return RSApublic(DER = self.get_public_DER())
+ return PublicKey(DER = self.get_public_DER())
-class RSApublic(DER_object):
+class PublicKey(DER_object):
"""
- Class to hold an RSA public key.
+ Class to hold a public key.
"""
POW_class = rpki.POW.Asymmetric
@@ -1132,22 +1246,63 @@ class RSApublic(DER_object):
"""
return self.get_POW().calculateSKI()
-def POWify_OID(oid):
+class KeyParams(DER_object):
+ """
+ Wrapper for OpenSSL's asymmetric key parameter classes.
+ """
+
+ POW_class = rpki.POW.AsymmetricParams
+
+ @classmethod
+ def generateEC(cls, curve = rpki.POW.EC_P256_CURVE):
+ return cls(POW = rpki.POW.AsymmetricParams.generateEC(curve = curve))
+
+class RSA(PrivateKey):
"""
- Utility function to convert tuple form of an OID to the
- dotted-decimal string form that rpki.POW uses.
+ Class to hold an RSA key pair.
"""
- if isinstance(oid, str):
- return POWify_OID(rpki.oids.name2oid[oid])
- else:
- return ".".join(str(i) for i in oid)
+
+ @classmethod
+ def generate(cls, keylength = 2048, quiet = False):
+ """
+ Generate a new keypair.
+ """
+ if not quiet:
+ rpki.log.debug("Generating new %d-bit RSA key" % keylength)
+ if generate_insecure_debug_only_rsa_key is not None:
+ return cls(POW = generate_insecure_debug_only_rsa_key())
+ else:
+ return cls(POW = rpki.POW.Asymmetric.generateRSA(keylength))
+
+class ECDSA(PrivateKey):
+ """
+ Class to hold an ECDSA key pair.
+ """
+
+ @classmethod
+ def generate(cls, params = None, quiet = False):
+ """
+ Generate a new keypair.
+ """
+
+ if params is None:
+ if not quiet:
+ rpki.log.debug("Generating new ECDSA key parameters")
+ params = KeyParams.generateEC()
+
+ assert isinstance(params, KeyParams)
+
+ if not quiet:
+ rpki.log.debug("Generating new ECDSA key")
+
+ return cls(POW = rpki.POW.Asymmetric.generateFromParams(params.get_POW()))
class CMS_object(DER_object):
"""
Abstract class to hold a CMS object.
"""
- econtent_oid = POWify_OID("id-data")
+ econtent_oid = rpki.oids.id_data
POW_class = rpki.POW.CMS
## @var dump_on_verify_failure
@@ -1492,7 +1647,7 @@ class SignedManifest(DER_CMS_object):
Class to hold a signed manifest.
"""
- econtent_oid = POWify_OID("id-ct-rpkiManifest")
+ econtent_oid = rpki.oids.id_ct_rpkiManifest
POW_class = rpki.POW.Manifest
def getThisUpdate(self):
@@ -1525,7 +1680,7 @@ class SignedManifest(DER_CMS_object):
obj.setManifestNumber(serial)
obj.setThisUpdate(thisUpdate)
obj.setNextUpdate(nextUpdate)
- obj.setAlgorithm(POWify_OID(rpki.oids.name2oid["id-sha256"]))
+ obj.setAlgorithm(rpki.oids.id_sha256)
obj.addFiles(filelist)
self = cls(POW = obj)
@@ -1537,7 +1692,7 @@ class ROA(DER_CMS_object):
Class to hold a signed ROA.
"""
- econtent_oid = POWify_OID("id-ct-routeOriginAttestation")
+ econtent_oid = rpki.oids.id_ct_routeOriginAttestation
POW_class = rpki.POW.ROA
@classmethod
@@ -1614,7 +1769,7 @@ class XML_CMS_object(Wrapped_CMS_object):
Class to hold CMS-wrapped XML protocol data.
"""
- econtent_oid = POWify_OID("id-ct-xml")
+ econtent_oid = rpki.oids.id_ct_xml
## @var dump_outbound_cms
# If set, we write all outbound XML-CMS PDUs to disk, for debugging.
@@ -1755,7 +1910,7 @@ class Ghostbuster(Wrapped_CMS_object):
managed by the back-end.
"""
- econtent_oid = POWify_OID("id-ct-rpkiGhostbusters")
+ econtent_oid = rpki.oids.id_ct_rpkiGhostbusters
def encode(self):
"""