#*****************************************************************************# #* *# #* Copyright (c) 2002, Peter Shannon *# #* All rights reserved. *# #* *# #* Redistribution and use in source and binary forms, with or without *# #* modification, are permitted provided that the following conditions *# #* are met: *# #* *# #* * Redistributions of source code must retain the above *# #* copyright notice, this list of conditions and the following *# #* disclaimer. *# #* *# #* * Redistributions in binary form must reproduce the above *# #* copyright notice, this list of conditions and the following *# #* disclaimer in the documentation and/or other materials *# #* provided with the distribution. *# #* *# #* * The name of the contributors may be used to endorse or promote *# #* products derived from this software without specific prior *# #* written permission. *# #* *# #* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS *# #* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT *# #* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS *# #* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS *# #* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, *# #* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *# #* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, *# #* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY *# #* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT *# #* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE *# #* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *# #* *# #*****************************************************************************# import types, time, pprint, cStringIO, _der from _simpledb import OidData as _OidData from _der import * DEBUG = 0 _oidData = _OidData() obj2oid = _oidData.obj2oid oid2obj = _oidData.oid2obj _fragments = [] def _docset(): return _der._docset() + _fragments #---------- crypto driver ----------# class CryptoDriver(object): """Dispatcher for crypto calls. This module has very minimal dependencies on crypto code, as it's almost entirely about ASN.1 encoding and decoding. Rather than wiring in the handful of crypto calls, we dispatch them through this driver. The default driver uses POW, but you can replace it with any crypto package you like. This is a virtual class. You will have to subtype it. """ def getOID(self, digestType): """Convert a digest identifier into an OID. If the identifier we get is a tuple, we assume it's already an OID and just return it. If the identifier is in the driver identifier mapping table, we use that to return an OID. Otherwise, we try mapping it via the name-to-OID database. """ if isinstance(digestType, tuple): return digestType if digestType in self.driver2OID: return self.driver2OID[digestType] return obj2oid(digestType) def sign(self, key, oid, plaintext): """Sign something with an RSA key and a given digest algorithm.""" raise NotImplementedError def verify(self, key, oid, plaintext, signature): """Verify a signature.""" raise NotImplementedError def toPublicDER(self, key): """Get the DER representation of an RSA key.""" raise NotImplementedError def fromPublicDER(self, der): """Set the driver representation of an RSA key from DER.""" raise NotImplementedError class POWCryptoDriver(CryptoDriver): """Dispatcher for crypto calls using POW package.""" def __init__(self): global POW import POW self.driver2OID = {} for k, v in (("MD2_DIGEST", (1, 2, 840, 113549, 1, 1, 2)), # md2WithRSAEncryption ("MD5_DIGEST", (1, 2, 840, 113549, 1, 1, 4)), # md5WithRSAEncryption ("SHA_DIGEST", (1, 3, 14, 3, 2, 15)), # shaWithRSAEncryption ("SHA1_DIGEST", (1, 2, 840, 113549, 1, 1, 5)), # sha1withRSAEncryption ("RIPEMD160_DIGEST", (1, 2, 840, 113549, 1, 1, 6)), # ripemd160WithRSAEncryption ("SHA256_DIGEST", (1, 2, 840, 113549, 1, 1, 11)), # sha256WithRSAEncryption ("SHA384_DIGEST", (1, 2, 840, 113549, 1, 1, 12)), # sha384WithRSAEncryption ("SHA512_DIGEST", (1, 2, 840, 113549, 1, 1, 13)), # sha512WithRSAEncryption ): try: self.driver2OID[getattr(POW, k)] = v except AttributeError: pass self.OID2driver = dict((v,k) for k,v in self.driver2OID.items()) def _digest(self, oid, plaintext): digest = POW.Digest(self.OID2driver[oid]) digest.update(plaintext) return digest.digest() def sign(self, key, oid, plaintext): return key.sign(self._digest(oid, plaintext), self.OID2driver[oid]) def verify(self, key, oid, plaintext, signature): return key.verify(signature, self._digest(oid, plaintext), self.OID2driver[oid]) def toPublicDER(self, key): return key.derWrite(POW.RSA_PUBLIC_KEY) def fromPublicDER(self, der): return POW.derRead(POW.RSA_PUBLIC_KEY, der) _cryptoDriver = None # Don't touch this directly def setCryptoDriver(driver): """Set crypto driver. The driver should be an instance of CryptoDriver. """ assert isinstance(driver, CryptoDriver) global _cryptoDriver _cryptoDriver = driver def getCryptoDriver(): """Return the currently selected CryptoDriver instance. If no driver has been selected, instantiate the default POW driver. """ global _cryptoDriver if _cryptoDriver is None: setCryptoDriver(POWCryptoDriver()) return _cryptoDriver #---------- crypto driver ----------# def _addFragment(frag): global _fragments _fragments.append(frag) _addFragment('''
utc2time time
This is a helper function for turning a UTCTime string into an integer. It isn't built into the encoder since the various functions which are used to manipulate the tm structure are notoriously unreliable.
''') def utc2time(val): 'der encoded value not including tag or length' if not isinstance(val, types.StringType): raise DerError, 'argument should be a string' t = time.strptime(val, '%y%m%d%H%M%SZ') return int(time.mktime(t)) _addFragment('''
time2utc time
This is a helper function for turning an integer into a UTCTime string. It isn't built into the encoder since the various functions which are used to manipulate the tm structure are notoriously unreliable.
''') def time2utc(val): 'numerical time value like time_t' val = int(val) t = time.gmtime(val) return time.strftime('%y%m%d%H%M%SZ', t) _addFragment('''
gen2time time
This is a helper function for turning a GeneralizedTime string into an integer. It isn't built into the encoder since the various functions which are used to manipulate the tm structure are notoriously unreliable.
''') def gen2Time(val): 'der encoded value not including tag or length' if not isinstance(val, types.StringType): raise DerError, 'argument should be a string' t = time.strptime(val, '%Y%m%d%H%M%SZ') return int(time.mktime(t)) _addFragment('''
time2gen time
This is a helper function for turning an integer into a GeneralizedTime string. It isn't built into the encoder since the various functions which are used to manipulate the tm structure are notoriously unreliable.
''') def time2gen(val): 'numerical time value like time_t' val = int(val) t = time.gmtime(val) return time.strftime('%Y%m%d%H%M%SZ', t) _addFragment('''
ip42oct ip
ip should be a list or tuple of integers, from 0 to 256. Setting <classname>IpAddress</classname> ip = IpAddress() ip.set( ip42oct(192, 168, 0, 231) )
''') def ip42oct(val0, val1, val2, val3): return chr(val0) + chr(val1) + chr(val2) + chr(val3) _addFragment('''
oct2ip4 val
Returns a tuple of 4 integers, from 0 to 256.
''') def oct2ip4(val): if not isinstance(val, types.StringType) or len(val) != 4: raise DerError, 'parameter should be string of 4 characters' return ( ord(val[0]), ord(val[1]), ord(val[2]), ord(val[3]) ) #---------- certificate support ----------# class TbsCertificate(Sequence): def __init__(self, optional=0, default=''): self.version = Integer() self.explicitVersion = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 0, self.version, 0, 'oAMCAQA=\n' ) self.serial = Integer() self.signature = AlgorithmIdentifier() self.issuer = Name() self.subject = Name() self.subjectPublicKeyInfo = SubjectPublicKeyInfo() self.validity = Validity() self.issuerUniqueID = BitString(1) self.issuerUniqueID.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 1 ) self.subjectUniqueID = BitString(1) self.subjectUniqueID.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 2 ) self.extensions = Extensions() self.explicitExtensions = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 3, self.extensions, 1 ) contents = [ self.explicitVersion, self.serial, self.signature, self.issuer, self.validity, self.subject, self.subjectPublicKeyInfo, self.issuerUniqueID, self.subjectUniqueID, self.explicitExtensions ] Sequence.__init__(self, contents, optional, default) class Validity(Sequence): def __init__(self, optional=0, default=''): Time = lambda : Choice({ 'generalTime' : GeneralizedTime(), 'utcTime' : UtcTime() }) self.notBefore = Time() self.notAfter = Time() contents = [self.notBefore, self.notAfter] Sequence.__init__(self, contents, optional, default) # IA5String should not be allowed in DirectoryString, but old # implementations (deprecated but not quite outlawed by RFC 3280) # sometimes use it for EmailAddress attributes in subject names, which # triggers decode failures here unless we violate RFC 3280 by allowing # IA5String. Do not use, do not use, do not use. class DirectoryString(Choice): def __init__(self, optional=0, default=''): choices = { 'teletexString' : T61String(), 'printableString' : PrintableString(), 'universalString' : UniversalString(), 'bmpString' : BmpString(), 'utf8String' : Utf8String(), 'ia5String' : IA5String() } Choice.__init__(self, choices, optional, default) class AttributeTypeAndValue(Sequence): def __init__(self, optional=0, default=''): self.type = Oid() self.dirstr = DirectoryString() contents = [ self.type, self.dirstr ] Sequence.__init__(self, contents, optional, default) class RelativeDistinguishedName(SetOf): def __init__(self, optional=0, default=''): SetOf.__init__(self, AttributeTypeAndValue, optional, default) class Name(SequenceOf): def __init__(self, optional=0, default=''): SequenceOf.__init__(self, RelativeDistinguishedName, optional, default) class AlgorithmIdentifier(Sequence): def __init__(self, optional=0, default=''): self.algorithm = Oid() self.parameters = Null() contents = [self.algorithm, self.parameters] Sequence.__init__(self, contents, optional, default) class SubjectPublicKeyInfo(Sequence): def __init__(self, optional=0, default=''): self.algorithmId = AlgorithmIdentifier() self.subjectPublicKey = AltBitString() contents = [ self.algorithmId, self.subjectPublicKey ] Sequence.__init__(self, contents, optional, default) class Extensions(SequenceOf): def __init__(self, optional=0, default=''): SequenceOf.__init__(self, Extension, optional, default) _addFragment('''
Certificate Sequence
Setting <classname>Certificate</classname> rsa = POW.Asymmetric() cert = POW.pkix.Certificate() cert.setVersion(1) cert.setSerial(5) name = ( (( o2i('countryName'), ('printableString', 'GB') ),), (( o2i('stateOrProvinceName'), ('printableString', 'Hertfordshire') ),), (( o2i('organizationName'), ('printableString', 'The House') ),), (( o2i('commonName'), ('printableString', 'Client') ),) ) cert.setIssuer(name) cert.setSubject(name) now = POW.pkix.time2gen( time.time() ) then = POW.pkix.time2gen(time.time() + 60*60*24*365*12) cert.setNotBefore( ('generalTime', now) ) cert.setNotAfter( ( 'generalTime', then) ) cert.setIssuerUniqueID((1,0,1,0)) cert.setSubjectUniqueID((1,0,0,1)) cert.sign(rsa, POW.MD5_DIGEST)
''') class Certificate(Sequence): _addFragment('''
Certificate optional=0 default=''
''') def __init__(self, optional=0, default=''): self.tbs = TbsCertificate() self.signatureAlgorithm = AlgorithmIdentifier() self.signatureValue = AltBitString() contents = [ self.tbs, self.signatureAlgorithm, self.signatureValue ] Sequence.__init__(self, contents, optional, default) _addFragment('''
Certificate setVersion version
This function sets an Integer object. 0 indicates a version 1 certificate, 1 a version 2 certificate and 2 a version 3 certificate.
''') def setVersion(self, version): self.tbs.version.set(version) _addFragment('''
Certificate getVersion
This function returns whatever the version object is set to, this should be 0, 1 or 2.
''') def getVersion(self): return self.tbs.version.get() _addFragment('''
Certificate setSerial serial
This function sets an Integer object. No two certificates issued should ever have the same serial number.
''') def setSerial(self, serial): self.tbs.serial.set(serial) _addFragment('''
Certificate getVersion
This function returns whatever the serial object is set to.
''') def getSerial(self): return self.tbs.serial.get() _addFragment('''
Certificate setIssuer names
This function sets an Name object. See Certificate class for an example.
''') def setIssuer(self, issuer): self.tbs.issuer.set(issuer) _addFragment('''
Certificate getIssuer
This function returns a complex tuple containing other tuples.
''') def getIssuer(self): return self.tbs.issuer.get() _addFragment('''
Certificate setSubject names
This function sets an Name object. See Certificate class for an example.
''') def setSubject(self, subject): self.tbs.subject.set(subject) _addFragment('''
Certificate getSubject
This function returns a complex tuple containing other tuples.
''') def getSubject(self): return self.tbs.subject.get() _addFragment('''
Certificate setNotBefore time
This function sets a Choice object. It can be either a GeneralTime or UTCTime object. The functions gen2time, utc2time, time2gen and time2utc can be used to convert to and from integer times and their string representation. <function>setNotBefore</function> method usage cert = POW.pkix.Certificate() now = POW.pkix.time2gen( time.time() ) cert.setNotBefore( ('generalTime', now) )
''') def setNotBefore(self, nb): self.tbs.validity.notBefore.set(nb) _addFragment('''
Certificate getNotBefore
This function returns a tuple indicating which type of time was stored and its value. See setNotBefore for details.
''') def getNotBefore(self): return self.tbs.validity.notBefore.get() _addFragment('''
Certificate setNotAfter time
This function sets a Choice object. See setNotBefore for details.
''') def setNotAfter(self, na): self.tbs.validity.notAfter.set(na) _addFragment('''
Certificate getNotAfter
This function returns a tuple indicating which type of time was stored and its value. See setNotBefore for details.
''') def getNotAfter(self): return self.tbs.validity.notAfter.get() _addFragment('''
Certificate setIssuerUniqueID id
This function sets a BitString object. This is part of the X509v2 standard and is quite poorly regarded in general, its use is not recommended. It is set using the normal BitString method, that is with a sequence of true/false objects.
''') def setIssuerUniqueID(self, id): self.tbs.issuerUniqueID.set(id) _addFragment('''
Certificate getIssuerUniqueID
This function returns a tuple of integers, 1 or 0.
''') def getIssuerUniqueID(self): return self.tbs.issuerUniqueID.get() _addFragment('''
Certificate setSubjectUniqueID id
This function sets a BitString object. This is part of the X509v2 standard and is quite poorly regarded in general, its use is not recommended. It is set using the normal BitString method, that is with a sequence of true/false objects.
''') def setSubjectUniqueID(self, id): self.tbs.subjectUniqueID.set(id) _addFragment('''
Certificate getSubjectUniqueID
This function returns a tuple of integers, 1 or 0.
''') def getSubjectUniqueID(self): return self.tbs.subjectUniqueID.get() _addFragment('''
Certificate setExtensions extns
This method sets an Extensions object, defined as SEQUENCE OF Extension. The parameter extns should consist of a list or tuple of values suitable to set an extension. See the extension class for details.
''') def setExtensions(self, extns): self.tbs.extensions.set(extns) _addFragment('''
Certificate getExtensions
This function returns a tuple of Extension values. See Extension for details.
''') def getExtensions(self): return self.tbs.extensions.get() def getExtension(self, oid): for x in self.getExtensions(): if x[0] == oid: return x return None _addFragment('''
Certificate sign rsa digestType
This function updates structured of the Certificate and tbs as appropriate and performs the specified digest on the tbs and set signedText to signed the digest.
''') def sign(self, rsa, digestType): driver = getCryptoDriver() oid = driver.getOID(digestType) self.tbs.signature.set([oid, None]) signedText = driver.sign(rsa, oid, self.tbs.toString()) self.signatureAlgorithm.set([oid, None]) self.signatureValue.set(signedText) _addFragment('''
Certificate verify rsa
This function works out what kind of digest was used to during signing, calculates the digest of tbs and verifies the envelope using the key.
''') def verify(self, rsa): driver = getCryptoDriver() oid = self.signatureAlgorithm.get()[0] return driver.verify(rsa, oid, self.tbs.toString(), self.signatureValue.get()) #---------- certificate support ----------# #---------- CRL ----------# class RevokedCertificate(Sequence): def __init__(self, optional=0, default=''): self.userCertificate = Integer() self.revocationDate = Choice( { 'generalTime' : GeneralizedTime(), 'utcTime' : UtcTime() } ) self.crlEntryExtensions = Extensions(1) contents = [ self.userCertificate, self.revocationDate, self.crlEntryExtensions ] Sequence.__init__(self, contents, optional, default) class RevokedCertificates(SequenceOf): def __init__(self, optional=0, default=''): SequenceOf.__init__(self, RevokedCertificate, optional, default) class TbsCertList(Sequence): def __init__(self, optional=0, default=''): self.version = Integer(1) self.signature = AlgorithmIdentifier() self.issuer = Name() self.thisUpdate = Choice( { 'generalTime' : GeneralizedTime(), 'utcTime' : UtcTime() } ) self.nextUpdate = Choice( { 'generalTime' : GeneralizedTime(), 'utcTime' : UtcTime() }, 1 ) self.revokedCertificates = RevokedCertificates(1) self.crlExtensions = Extensions() self.explicitCrlExtensions = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 0, self.crlExtensions, 1 ) contents = [ self.version, self.signature, self.issuer, self.thisUpdate, self.nextUpdate, self.revokedCertificates, self.explicitCrlExtensions ] Sequence.__init__(self, contents, optional, default) _addFragment('''
CertificateList Sequence
Setting <classname>CertificateList</classname> now = POW.pkix.time2gen( time.time() ) then = POW.pkix.time2gen(time.time() + 60*60*24*365*12) rsa = POW.Asymmetric() crl = POW.pkix.CertificateList() crl.setThisUpdate( ('generalTime', now ) ) name = ( (( o2i('countryName'), ('printableString', 'GB') ),), (( o2i('stateOrProvinceName'), ('printableString', 'Hertfordshire') ),), (( o2i('organizationName'), ('printableString', 'The House') ),), (( o2i('commonName'), ('printableString', 'Client') ),) ) myRevocations = ( (1, ('generalTime', now), ()), (2, ('generalTime', now), ()), (3, ('generalTime', now), (( o2i('cRLReason'), 0, 1),)) ) crl.setIssuer(name) crl.setRevokedCertificates( myRevocations ) crl.sign(rsa, POW.MD5_DIGEST)
''') class CertificateList(Sequence): _addFragment('''
CertificateList optional=0 default=''
''') def __init__(self, optional=0, default=''): self.tbs = TbsCertList() self.signatureAlgorithm = AlgorithmIdentifier() self.signature = AltBitString() contents = [self.tbs, self.signatureAlgorithm, self.signature] Sequence.__init__(self, contents, optional, default) _addFragment('''
CertificateList setVersion version
This function sets an Integer object. 0 indicates a version 1 CRL, and 1 a version 2 CRL.
''') def setVersion(self, version): self.tbs.version.set(version) _addFragment('''
CertificateList getVersion
This function returns whatever the version object is set to, this should be 0, 1 or 2.
''') def getVersion(self): return self.tbs.version.get() _addFragment('''
CertificateList setIssuer names
This function sets an Name object.
''') def setIssuer(self, issuer): self.tbs.issuer.set(issuer) _addFragment('''
CertificateList getIssuer
This function returns a complex tuple containing other tuples.
''') def getIssuer(self): return self.tbs.issuer.get() _addFragment('''
setThisUpdate setNotBefore time
This function sets a Choice object. It can be either a GeneralTime or UTCTime object. The functions gen2time, utc2time, time2gen and time2utc can be used to convert to and from integer times and their string representation. <function>setNotBefore</function> method usage crl = POW.pkix.CertificateList() now = POW.pkix.time2gen( time.time() ) crl.setNotBefore( ('generalTime', now) )
''') def setThisUpdate(self, nu): self.tbs.thisUpdate.set(nu) _addFragment('''
CertificateList getThisUpdate
This function returns a tuple containing two strings. The first is either 'utcTime' or 'generalTime' and the second is the time value as a string.
''') def getThisUpdate(self): return self.tbs.thisUpdate.get() _addFragment('''
CertificateList setNextUpdate
See set setThisUpdate.
''') def setNextUpdate(self, nu): self.tbs.nextUpdate.set(nu) _addFragment('''
CertificateList getNextUpdate
See set getThisUpdate.
''') def getNextUpdate(self): return self.tbs.nextUpdate.get() _addFragment('''
CertificateList setExtensions extns
This method sets an Extensions object, defined as SEQUENCE OF Extension. The parameter extns should consist of a list or tuple of values suitable to set an extension. See the extension class for details.
''') def setExtensions(self, extns): self.tbs.crlExtensions.set(extns) _addFragment('''
CertificateList getExtensions
This function returns a tuple of Extension values. See Extension for details.
''') def getExtensions(self): return self.tbs.crlExtensions.get() def getExtension(self, oid): for x in self.getExtensions(): if x[0] == oid: return x return None _addFragment('''
CertificateList setRevokedCertificates
This function sets a sequence of revokedCertificate objects. This object is optional. See CertificateList for an example of its use.
''') def setRevokedCertificates(self, rc): self.tbs.revokedCertificates.set(rc) _addFragment('''
CertificateList getRevokedCertificates
This function return a sequence of revokedCertificate objects or None.
''') def getRevokedCertificates(self): return self.tbs.revokedCertificates.get() _addFragment('''
Certificate sign
This function updates structured of the certificateList and tBSCertList as appropriate, performs the specified digest on the tBSCertList and sets signedValue to signed the digest.
''') def sign(self, rsa, digestType): driver = getCryptoDriver() oid = driver.getOID(digestType) self.tbs.signature.set([oid, None]) signedText = driver.sign(rsa, oid, self.tbs.toString()) self.signatureAlgorithm.set([oid, None]) self.signature.set(signedText) _addFragment('''
CertificateList verify
This function works out what kind of digest was used to during signing, calculates the digest of tBSCertList and verifies the signedText using the key.
''') def verify(self, rsa): driver = getCryptoDriver() oid = self.signatureAlgorithm.get()[0] return driver.verify(rsa, oid, self.tbs.toString(), self.signature.get()) #---------- CRL ----------# #---------- PKCS10 ----------# # My ASN.1-foo (and perhaps this ASN.1 implementation) isn't quite up # to X.501 or PKCS #10, so this is partly based on a dump of what # OpenSSL generates, and doesn't handle attributes other than X.509v3 # extensions. class PKCS10AttributeSet(SetOf): def __init__(self, optional=0, default=''): SetOf.__init__(self, Extensions, optional, default) class PKCS10AttributeChoice(Choice): def __init__(self, optional=0, default=''): choices = { 'single' : Extensions(), 'set' : PKCS10AttributeSet() } Choice.__init__(self, choices, optional, default) class PKCS10Attributes(Sequence): def __init__(self, optional=1, default=''): self.oid = Oid() self.val = PKCS10AttributeChoice() contents = [ self.oid, self.val ] Sequence.__init__(self, contents, optional, default) class CertificationRequestInfo(Sequence): def __init__(self, optional=0, default=''): self.version = Integer() self.subject = Name() self.subjectPublicKeyInfo = SubjectPublicKeyInfo() self.attributes = PKCS10Attributes() self.explicitAttributes = Explicit(CLASS_CONTEXT, FORM_CONSTRUCTED, 0, self.attributes) contents = [ self.version, self.subject, self.subjectPublicKeyInfo, self.explicitAttributes ] Sequence.__init__(self, contents, optional, default) class CertificationRequest(Sequence): def __init__(self, optional=0, default=''): self.certificationRequestInfo = CertificationRequestInfo() self.signatureAlgorithm = AlgorithmIdentifier() self.signatureValue = AltBitString() contents = [ self.certificationRequestInfo, self.signatureAlgorithm, self.signatureValue ] Sequence.__init__(self, contents, optional, default) def sign(self, rsa, digestType): driver = getCryptoDriver() oid = driver.getOID(digestType) self.certificationRequestInfo.subjectPublicKeyInfo.fromString(driver.toPublicDER(rsa)) signedText = driver.sign(rsa, oid, self.certificationRequestInfo.toString()) self.signatureAlgorithm.set([oid, None]) self.signatureValue.set(signedText) def verify(self): driver = getCryptoDriver() oid = self.signatureAlgorithm.get()[0] rsa = driver.fromPublicDER(self.certificationRequestInfo.subjectPublicKeyInfo.toString()) return driver.verify(rsa, oid, self.certificationRequestInfo.toString(), self.signatureValue.get()) def getExtensions(self): oid = self.certificationRequestInfo.attributes.oid.get() if oid is None: return () if oid != (1, 2, 840, 113549, 1, 9, 14) or \ self.certificationRequestInfo.attributes.val.choice != "set" or \ len(self.certificationRequestInfo.attributes.val.choices["set"]) > 1: raise DerError, "failed to understand X.501 Attribute encoding, sorry: %s" % self.get() return self.certificationRequestInfo.attributes.val.choices["set"][0].get() def getExtension(self, oid): for x in self.getExtensions(): if x[0] == oid: return x return None def setExtensions(self, exts): self.certificationRequestInfo.attributes.oid.set((1, 2, 840, 113549, 1, 9, 14)) self.certificationRequestInfo.attributes.val.set(("set", [exts])) #---------- PKCS10 ----------# #---------- GeneralNames object support ----------# class OtherName(Sequence): def __init__(self, optional=0, default=''): self.typeId = Oid() self.any = Any() contents = [self.typeId, self.any] Sequence.__init__(self, contents, optional, default) class EdiPartyName(Sequence): def __init__(self, optional=0, default=''): self.nameAssigner = DirectoryString() self.partyName = DirectoryString() self.explicitNameAssigner = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 0, self.nameAssigner, 1 ) self.explicitPartyName = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 1, self.partyName ) contents = [ self.explicitNameAssigner, self.explicitPartyName ] Sequence.__init__(self, contents, optional, default) class IpAddress(OctetString): pass class GeneralName(Choice): def __init__(self, optional=0, default=''): otherName = OtherName() otherName.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 0 ) rfc822Name = IA5String() rfc822Name.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 1 ) dnsName = IA5String() dnsName.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 2 ) directoryName = Name() explicitDirectoryName = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 4, directoryName) ediPartyName = EdiPartyName() ediPartyName.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 5 ) uri = IA5String() uri.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 6 ) ipAddress = IpAddress() ipAddress.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 7 ) registeredId = Oid() registeredId.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 8 ) choices = { 'otherName' : otherName , 'rfc822Name' : rfc822Name , 'dNSName' : dnsName , 'directoryName' : explicitDirectoryName , 'ediPartyName' : ediPartyName , 'uri' : uri , 'iPAddress' : ipAddress , 'registeredId' : registeredId } Choice.__init__(self, choices, optional, default) class GeneralNames(SequenceOf): def __init__(self, optional=0, default=''): SequenceOf.__init__(self, GeneralName, optional, default) #---------- GeneralNames object support ----------# #---------- X509v3 extensions ----------# _addFragment('''
BasicConstraints Sequence
This little extension has recently caused plenty of problems for several large organisations. It consist of a Boolean and an Integer. The first indicates if the owner is a CA, the second indicates how long a chain of CAs you should trust which the subject of this certificate trusts. Setting <classname>BasicConstraints</classname> bc = BasicConstraints() bc.set( (1, 1) )
''') class BasicConstraints(Sequence): _addFragment('''
BasicConstraints optional=0 default=''
''') def __init__(self, optional=0, default=''): self.ca = Boolean(0, 'AQEA\n') self.pathLenConstraint = Integer(1) contents = [self.ca, self.pathLenConstraint] Sequence.__init__(self, contents, optional, default) _addFragment('''
KeyUsage BitString
''') class KeyUsage(BitString): pass _addFragment('''
SubjectAltName GeneralNames
''') class SubjectAltName(GeneralNames): pass _addFragment('''
IssuerAltName GeneralNames
''') class IssuerAltName(GeneralNames): pass _addFragment('''
SubjectKeyIdentifier OctetString
''') class SubjectKeyIdentifier(OctetString): pass _addFragment('''
AuthorityKeyIdentifier Sequence
Setting <classname>AuthorityKeyIdentifier</classname> id = AuthorityKeyIdentifier() authdigest = POW.Digest( POW.SHA1_DIGEST ) authdigest.update(rsa.derWrite(POW.RSA_PUBLIC_KEY)) keyHash = authdigest.digest() id.set( (keyHash, None, None) )
''') class AuthorityKeyIdentifier(Sequence): _addFragment('''
AuthorityKeyIdentifier optional=0 default=''
''') def __init__(self, optional=0, default=''): self.keyIdentifier = OctetString(1) self.keyIdentifier.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 0 ) self.authorityCertIssuer = GeneralNames(1) self.authorityCertIssuer.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 1 ) self.authorityCertSerialNumber = Integer(1) self.authorityCertSerialNumber.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 2 ) contents = [self.keyIdentifier, self.authorityCertIssuer, self.authorityCertSerialNumber] Sequence.__init__(self, contents, optional, default) _addFragment('''
PrivateKeyUsagePeriod Sequence
Setting <classname>PrivateKeyUsagePeriod</classname> period = PrivateKeyUsagePeriod() period.set( ( time2gen( time.time() ), None) )
''') class PrivateKeyUsagePeriod(Sequence): _addFragment('''
PrivateKeyUsagePeriod optional=0 default=''
''') def __init__(self, optional=0, default=''): self.notBefore = GeneralizedTime() self.notBefore.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 0 ) self.notAfter = GeneralizedTime() self.notAfter.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 1 ) contents = [self.notBefore, self.notAfter] Sequence.__init__(self, contents, optional, default) class DisplayText(Choice): def __init__(self, optional=0, default=''): choices = { 'visibleString' : VisibleString(), 'bmpString' : BmpString(), 'utf8String' : Utf8String() } Choice.__init__(self, choices, optional, default) class NoticeNumbers(SequenceOf): def __init__(self, optional=0, default=''): SequenceOf.__init__(self, Integer, optional, default) class NoticeReference(Sequence): def __init__(self, optional=0, default=''): self.organization = DisplayText() self.noticeNumbers = NoticeNumbers() contents = [self.organization, self.noticeNumbers] Sequence.__init__(self, contents, optional, default) class UserNotice(Sequence): def __init__(self, optional=0, default=''): self.noticeRef = NoticeReference(1) self.explicitText = DisplayText(1) contents = [self.noticeRef, self.explicitText] Sequence.__init__(self, contents, optional, default) class Qualifier(Choice): def __init__(self, optional=0, default=''): choices = { 'cPSuri' : IA5String(), 'userNotice' : UserNotice() } Choice.__init__(self, choices, optional, default) class PolicyQualifierInfo(Sequence): def __init__(self, optional=0, default=''): self.policyQualifierId = Oid() self.qualifier = Qualifier() contents = [self.policyQualifierId, self.qualifier] Sequence.__init__(self, contents, optional, default) class PolicyQualifiers(SequenceOf): def __init__(self, optional=0, default=''): SequenceOf.__init__(self, PolicyQualifierInfo, optional, default) class PolicyInformation(Sequence): def __init__(self, optional=0, default=''): self.policyIdentifier = Oid() self.policyQualifiers = PolicyQualifiers(1) contents = [self.policyIdentifier, self.policyQualifiers] Sequence.__init__(self, contents, optional, default) _addFragment('''
CertificatePolicies SequenceOf
Setting <classname>CertificatePolicies</classname> data = ( ( o2i('id-cti-ets-proofOfReceipt'), ( (o2i('cps'), ('cPSuri', 'http://www.p-s.org.uk/policies/policy1')), (o2i('unotice'), ( 'userNotice', ((('visibleString', 'The House'),(1,2,3)), ('visibleString', 'We guarentee nothing')))), )), ( o2i('id-cti-ets-proofOfOrigin'), ( (o2i('cps'), ('cPSuri', 'http://www.p-s.org.uk/policies/policy2')), )) ) policies = CertificatePolicies() policies.set( data )
''') class CertificatePolicies(SequenceOf): _addFragment('''
CertificatePolicies optional=0 default=''
''') def __init__(self, optional=0, default=''): SequenceOf.__init__(self, PolicyInformation, optional, default) class DistributionPointName(Choice): def __init__(self, optional=0, default=''): fullName = GeneralNames() fullName.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 0 ) nameRelativeToCRLIssuer = RelativeDistinguishedName() nameRelativeToCRLIssuer.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 1 ) choices = { 'fullName' : fullName, 'nameRelativeToCRLIssuer ' : nameRelativeToCRLIssuer } Choice.__init__(self, choices, optional, default) class DistributionPoint(Sequence): def __init__(self, optional=0, default=''): self.distributionPoint = DistributionPointName(1) self.explicitDistributionPoint = Explicit(CLASS_CONTEXT, FORM_CONSTRUCTED, 0, self.distributionPoint) self.reasons = BitString(1) self.reasons.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 1 ) self.cRLIssuer = GeneralNames(1) self.cRLIssuer.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 2 ) contents = [self.explicitDistributionPoint, self.reasons, self.cRLIssuer] Sequence.__init__(self, contents, optional, default) _addFragment('''
CRLDistrobutionPoints SequenceOf
Setting <classname>CRLDistrobutionPoints</classname> n1 = ('directoryName', ( (( o2i('countryName'), ('printableString', 'UK') ),), (( o2i('stateOrProvinceName'), ('printableString', 'Herts') ),), (( o2i('organizationName'), ('printableString', 'The House') ),), (( o2i('commonName'), ('printableString', 'Shannon Works') ),) ) ) n2 = ('iPAddress', POW.pkix.ip42oct(192,168,100,51)) data = ( ( ('fullName',(n1, n2)), (1,1,1,1,1), (n1,) ), ) points = CRLDistrobutionPoints() points.set( data )
''') class CRLDistributionPoints(SequenceOf): _addFragment('''
CRLDistrobutionPoints optional=0 default=''
''') def __init__(self, optional=0, default=''): SequenceOf.__init__(self, DistributionPoint, optional, default) _addFragment('''
CrlNumber Integer
''') class CrlNumber(Integer): pass _addFragment('''
DeltaCrlIndicator Integer
''') class DeltaCrlIndicator(Integer): pass _addFragment('''
InvalidityDate GeneralizedTime
''') class InvalidityDate(GeneralizedTime): pass _addFragment('''
CrlReason Enum
''') class CrlReason(Enum): pass _addFragment('''
IPAddressRange Sequence
''') class IPAddressRange(Sequence): def __init__(self, optional=0, default=''): self.min = BitString() self.max = BitString() contents = [ self.min, self.max ] Sequence.__init__(self, contents, optional, default) _addFragment('''
IPAddressOrRange Choice
''') class IPAddressOrRange(Choice): def __init__(self, optional=0, default=''): choices = { 'addressPrefix' : BitString(), 'addressRange' : IPAddressRange() } Choice.__init__(self, choices, optional, default) _addFragment('''
IPAddressesOrRanges SequenceOf
''') class IPAddressesOrRanges(SequenceOf): def __init__(self, optional=0, default=''): SequenceOf.__init__(self, IPAddressOrRange, optional, default) _addFragment('''
IPAddressChoice Choice
''') class IPAddressChoice(Choice): def __init__(self, optional=0, default=''): choices = { 'inherit' : Null(), 'addressesOrRanges' : IPAddressesOrRanges() } Choice.__init__(self, choices, optional, default) _addFragment('''
IPAddressFamily Sequence
''') class IPAddressFamily(Sequence): def __init__(self, optional=0, default=''): self.addressFamily = OctetString() self.ipAddressChoice = IPAddressChoice() contents = [ self.addressFamily, self.ipAddressChoice ] Sequence.__init__(self, contents, optional, default) _addFragment('''
IPAddrBlocks SequenceOf
Implementation of RFC 3779 section 2.2.3.
''') class IPAddrBlocks(SequenceOf): def __init__(self, optional=0, default=''): SequenceOf.__init__(self, IPAddressFamily, optional, default) _addFragment('''
ASRange Sequence
''') class ASRange(Sequence): def __init__(self, optional=0, default=''): self.min = Integer() self.max = Integer() contents = [ self.min, self.max ] Sequence.__init__(self, contents, optional, default) _addFragment('''
ASIdOrRange Choice
''') class ASIdOrRange(Choice): def __init__(self, optional=0, default=''): choices = { 'id' : Integer(), 'range' : ASRange() } Choice.__init__(self, choices, optional, default) _addFragment('''
ASIdsOrRanges SequenceOf
''') class ASIdsOrRanges(SequenceOf): def __init__(self, optional=0, default=''): SequenceOf.__init__(self, ASIdOrRange, optional, default) _addFragment('''
ASIdentifierChoice Choice
''') class ASIdentifierChoice(Choice): def __init__(self, optional=0, default=''): choices = { 'inherit' : Null(), 'asIdsOrRanges' : ASIdsOrRanges() } Choice.__init__(self, choices, optional, default) _addFragment('''
ASIdentifiers Sequence
Implementation of RFC 3779 section 3.2.3.
''') class ASIdentifiers(Sequence): def __init__(self, optional=0, default=''): # # This is what we -should- be doing #self.asnum = ASIdentifierChoice() #self.rdi = ASIdentifierChoice() #self.explicitAsnum = Explicit(CLASS_CONTEXT, FORM_CONSTRUCTED, 0, self.asnum, 1) #self.explictRdi = Explicit(CLASS_CONTEXT, FORM_CONSTRUCTED, 1, self.rdi, 1) #contents = [ self.explicitAsnum, self.explictRdi ] # # ...but it generates a spurious empty RDI clause, so try this instead # since we know that we never use RDI anyway. self.asnum = ASIdentifierChoice() self.explicitAsnum = Explicit(CLASS_CONTEXT, FORM_CONSTRUCTED, 0, self.asnum, 1) contents = [ self.explicitAsnum ] # Sequence.__init__(self, contents, optional, default) def set(self, values): assert len(values) == 1 or (len(values) == 2 and values[1] is None) Sequence.set(self, (values[0],)) _addFragment('''
AccessDescription Sequence
''') class AccessDescription(Sequence): def __init__(self, optional=0, default=''): self.accessMethod = Oid() self.accessLocation = GeneralName() contents = [ self.accessMethod, self.accessLocation ] Sequence.__init__(self, contents, optional, default) _addFragment('''
AuthorityInfoAccess SequenceOf
Implementation of RFC 3280 section 4.2.2.1.
''') class AuthorityInfoAccess(SequenceOf): def __init__(self, optional=0, default=''): SequenceOf.__init__(self, AccessDescription, optional, default) _addFragment('''
SubjectInfoAccess SequenceOf
Implementation of RFC 3280 section 4.2.2.2.
''') class SubjectInfoAccess(SequenceOf): def __init__(self, optional=0, default=''): SequenceOf.__init__(self, AccessDescription, optional, default) #---------- X509v3 extensions ----------# _addFragment('''
Extension Sequence
This class is a useful little object. It is set by passing three values: an oid, an integer(a boolean really) and a value. The boolean indicates if this extension is critical. The value is used to set the extension once it has been created. The oid is used to create the correct object which, to be fully supported it must be one of these: basicConstraints subjectAltName issuerAltName authorityKeyIdentifier privateKeyUsagePeriod certificatePolicies cRLDistributionPoints subjectKeyIdentifier keyUsage crlNumber deltaCrlIndicator invalidityDate crlReason Setting <classname>Extension</classname> extn = Extension() email = ('rfc822Name', 'peter_shannon@yahoo.com') extn.set( (obj2oid('subjectAltName'),1, (email,)) )
''') class Extension(Sequence): classMap = { (2, 5, 29, 19) : BasicConstraints, (2, 5, 29, 17) : SubjectAltName, (2, 5, 29, 18) : IssuerAltName, (2, 5, 29, 35) : AuthorityKeyIdentifier, (2, 5, 29, 16) : PrivateKeyUsagePeriod, (2, 5, 29, 32) : CertificatePolicies, (2, 5, 29, 31) : CRLDistributionPoints, (2, 5, 29, 14) : SubjectKeyIdentifier, (2, 5, 29, 15) : KeyUsage, (2, 5, 29, 20) : CrlNumber, (2, 5, 29, 27) : DeltaCrlIndicator, (2, 5, 29, 24) : InvalidityDate, (2, 5, 29, 21) : CrlReason, (1, 3, 6, 1, 5, 5, 7, 1, 1) : AuthorityInfoAccess, (1, 3, 6, 1, 5, 5, 7, 1, 7) : IPAddrBlocks, (1, 3, 6, 1, 5, 5, 7, 1, 8) : ASIdentifiers, (1, 3, 6, 1, 5, 5, 7, 1, 11) : SubjectInfoAccess, } # Missing -- fix later # extendedKeyUsage # privateKeyUsagePeriod # policyMappings # nameConstraints # policyConstraints # subjectDirectoryAttributes # instructionCode # issuingDistrobutionPoint def __init__(self, optional=0, default=''): self.extnID = Oid() self.critical = Boolean(0, 'AQEA') self.extnValue = OctetString() contents = [self.extnID, self.critical, self.extnValue] Sequence.__init__(self, contents, optional, default) _addFragment('''
Extension set values
values should be a sequence of three values, the oid, critical marker and a value to set the extension. If an unknown oid is passed to this function it will raise an exception. critical is a boolean. value will be used to set the extension after it has been created.
''') def set(self, (oid, critical, val) ): self.extnID.set( oid ) self.critical.set( critical ) extnObj = None if self.classMap.has_key(oid): extnObj = self.classMap[oid]() else: if not (isinstance(oid, types.TupleType) or isinstance(oid, types.ListType)): raise DerError, 'the oid should be specified as a sequence of integers' else: raise DerError, 'unknown object extension %s' % oid try: extnObj.set( val ) self.extnValue.set( extnObj.toString() ) except DerError, e: raise DerError, 'failed to set %s, with:\n\t%s\nresulting in:\n\t%s' % (oid, val, `e`) _addFragment('''
Extension get
There are several ways this function might fail to decode an extension. Firstly if the extension was marked critical but if the oid cannot be mapped to a class or If a failure occurs decoding the extnValue, an exception will be raised. If a failure occurred and the extension was not marked critical it will return a tuple like this: (oid, critical, ()). If no failures occur a tuple will be returned, containg the oid, critical and extension values.
''') def get(self): oid = self.extnID.get() critical = self.critical.get() if self.classMap.has_key(oid): extnObj = self.classMap[oid]() else: if critical: raise DerError, 'failed to read critical extension %s' % str(oid) else: return (oid, critical, ()) try: extnObj = self.classMap[oid]() extnObj.fromString(self.extnValue.get()) value = extnObj.get() except: if critical: raise DerError, 'failed to read critical extension %s' % str(oid) else: return (oid, critical, ()) return (oid, critical, value)