diff options
Diffstat (limited to 'rpkid/rpki/x509.py')
-rw-r--r-- | rpkid/rpki/x509.py | 152 |
1 files changed, 132 insertions, 20 deletions
diff --git a/rpkid/rpki/x509.py b/rpkid/rpki/x509.py index c9ed2864..fa46fb74 100644 --- a/rpkid/rpki/x509.py +++ b/rpkid/rpki/x509.py @@ -26,9 +26,9 @@ bring together the functionality I need in a way that hides at least some of the nasty details. This involves a lot of format conversion. """ -import POW, tlslite.api, POW.pkix, base64, time -import rpki.exceptions, rpki.resource_set, rpki.cms, rpki.oids, rpki.sundial -import rpki.manifest, rpki.roa +import POW, tlslite.api, POW.pkix, base64, lxml.etree, os +import rpki.exceptions, rpki.resource_set, rpki.oids, rpki.sundial +import rpki.manifest, rpki.roa, rpki.relaxng def calculate_SKI(public_key_der): """Calculate the SKI value given the DER representation of a public @@ -208,6 +208,23 @@ class DER_object(object): """Convert to SQL storage format.""" return self.get_DER() + def dumpasn1(self): + """Prettyprint an ASN.1 DER object using cryptlib dumpasn1 tool. + Use a temporary file rather than popen4() because dumpasn1 uses + seek() when decoding ASN.1 content nested in OCTET STRING values. + """ + + fn = "dumpasn1.tmp" + try: + f = open(fn, "wb") + f.write(self.get_DER()) + f.close() + f = os.popen("dumpasn1 2>&1 -a " + fn) + print "\n".join(x for x in f.read().splitlines() if x.startswith(" ")) + f.close() + finally: + os.unlink(fn) + class X509(DER_object): """X.509 certificates. @@ -579,6 +596,15 @@ class RSApublic(DER_object): """Calculate the SKI of this public key.""" return calculate_SKI(self.get_DER()) +def POWify(oid): + """Utility function to convert tuple form of an OID to + the dotted-decimal string form that POW uses. + """ + if isinstance(oid, str): + return POWify(rpki.oids.name2oid[oid]) + else: + return ".".join(str(i) for i in oid) + class CMS_object(DER_object): """Class to hold a CMS-wrapped object. @@ -593,7 +619,10 @@ class CMS_object(DER_object): formats = ("DER",) other_clear = ("content",) + econtent_oid = POWify("id-data") + dump_on_verify_failure = False + def get_DER(self): """Get the DER value of this CMS_object.""" assert not self.empty() @@ -611,20 +640,57 @@ class CMS_object(DER_object): self.clear() self.content = content - def verify_and_store(self, ta, obj): + def verify(self, ta): """Verify CMS wrapper and store inner content.""" - s = rpki.cms.verify(self.get_DER(), ta) - obj.fromString(s) - self.content = obj - def sign(self, keypair, certs): + cms = POW.derRead(POW.CMS_MESSAGE, self.get_DER()) + store = POW.X509Store() + if isinstance(ta, (tuple, list)): + for x in ta: + store.addTrust(x.get_POW()) + else: + store.addTrust(ta.get_POW()) + try: + content = cms.verify(store) + except: + if self.dump_on_verify_failure: + print "CMS verification failed, dumping ASN.1:" + self.dumpasn1() + raise rpki.exceptions.CMSVerificationFailed, "CMS verification failed" + self.decode(content) + return self.get_content() + + def sign(self, keypair, certs, no_certs = False): """Sign and wrap inner content.""" - self.DER = rpki.cms.sign(self.get_content().toString(), keypair, certs) -class SignedManifest(CMS_object): + cms = POW.CMS() + cms.sign(certs[0].get_POW(), + keypair.get_POW(), + [x.get_POW() for x in certs[1:]], + self.encode(), + self.econtent_oid, + no_certs) + self.DER = cms.derWrite() + +class DER_CMS_object(CMS_object): + """Class to hold CMS objects with DER-based content.""" + + def encode(self): + """Encode inner content for signing.""" + return self.get_content().toString() + + def decode(self, der): + """Decode DER and set inner content.""" + obj = self.content_class() + obj.fromString(der) + self.content = obj + +class SignedManifest(DER_CMS_object): """Class to hold a signed manifest.""" pem_converter = PEM_converter("RPKI MANIFEST") + content_class = rpki.manifest.Manifest + econtent_oid = POWify("id-ct-rpkiManifest") def getThisUpdate(self): """Get thisUpdate value from this manifest.""" @@ -634,10 +700,6 @@ class SignedManifest(CMS_object): """Get nextUpdate value from this manifest.""" return rpki.sundial.datetime.fromGeneralizedTime(self.get_content().nextUpdate.get()) - def verify(self, ta): - """Verify this manifest.""" - self.verify_and_store(ta, rpki.manifest.Manifest()) - @classmethod def build(cls, serial, thisUpdate, nextUpdate, names_and_objs, keypair, certs, version = 0): """Build a signed manifest.""" @@ -653,20 +715,18 @@ class SignedManifest(CMS_object): m.manifestNumber.set(serial) m.thisUpdate.set(thisUpdate.toGeneralizedTime()) m.nextUpdate.set(nextUpdate.toGeneralizedTime()) - m.fileHashAlg.set((2, 16, 840, 1, 101, 3, 4, 2, 1)) # id-sha256 + m.fileHashAlg.set(rpki.oids.name2oid["id-sha256"]) m.fileList.set(filelist) self.set_content(m) self.sign(keypair, certs) return self -class ROA(CMS_object): +class ROA(DER_CMS_object): """Class to hold a signed ROA.""" pem_converter = PEM_converter("ROUTE ORIGIN ATTESTATION") - - def verify(self, ta): - """Verify this ROA.""" - self.verify_and_store(ta, rpki.roa.RouteOriginAttestation()) + content_class = rpki.roa.RouteOriginAttestation + econtent_oid = POWify("id-ct-routeOriginAttestation") @classmethod def build(cls, as_number, exact_match, ipv4, ipv6, keypair, certs, version = 0): @@ -681,6 +741,58 @@ class ROA(CMS_object): self.sign(keypair, certs) return self +class XML_CMS_object(CMS_object): + """Class to hold CMS-wrapped XML protocol data.""" + + econtent_oid = POWify("id-ct-xml") + + def encode(self): + """Encode inner content for signing.""" + return lxml.etree.tostring(self.get_content(), pretty_print = True, encoding = self.encoding, xml_declaration = True) + + def decode(self, xml): + """Decode XML and set inner content.""" + self.content = lxml.etree.fromstring(xml) + + def prettyprint_content(self): + """Prettyprint XML content of this message.""" + return lxml.etree.tostring(self.get_content(), pretty_print = True, encoding = self.encoding, xml_declaration = True) + + def schema_check(self): + """Handle XML RelaxNG schema check.""" + try: + self.schema.assertValid(self.get_content()) + except lxml.etree.DocumentInvalid: + rpki.log.error("PDU failed schema check: " + self.prettyprint_content()) + raise + + @classmethod + def build(cls, elt, keypair, certs): + """Build a CMS-wrapped XML PDU.""" + self = cls() + self.set_content(elt) + self.schema_check() + self.sign(keypair, certs) + return self + + def verify(self, ta): + """Wrapper around CMS_object.verify(), adds RelaxNG schema check.""" + CMS_object.verify(self, ta) + self.schema_check() + return self.get_content() + +class left_right_pdu(XML_CMS_object): + """Class to hold a CMS-signed left-right PDU.""" + + encoding = "us-ascii" + schema = rpki.relaxng.left_right + +class up_down_pdu(XML_CMS_object): + """Class to hold a CMS-signed up-down PDU.""" + + encoding = "UTF-8" + schema = rpki.relaxng.up_down + class CRL(DER_object): """Class to hold a Certificate Revocation List.""" |