#*****************************************************************************#
#* *#
#* Copyright (c) 2002, Peter Shannon *#
#* All rights reserved. *#
#* *#
#* Redistribution and use in source and binary forms, with or without *#
#* modification, are permitted provided that the following conditions *#
#* are met: *#
#* *#
#* * Redistributions of source code must retain the above *#
#* copyright notice, this list of conditions and the following *#
#* disclaimer. *#
#* *#
#* * Redistributions in binary form must reproduce the above *#
#* copyright notice, this list of conditions and the following *#
#* disclaimer in the documentation and/or other materials *#
#* provided with the distribution. *#
#* *#
#* * The name of the contributors may be used to endorse or promote *#
#* products derived from this software without specific prior *#
#* written permission. *#
#* *#
#* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS *#
#* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT *#
#* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS *#
#* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS *#
#* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, *#
#* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *#
#* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, *#
#* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY *#
#* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT *#
#* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE *#
#* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *#
#* *#
#*****************************************************************************#
import types, time, pprint, cStringIO, POW, _der
from _simpledb import OidData as _OidData
from _der import *
DEBUG = 0
_oidData = _OidData()
obj2oid = _oidData.obj2oid
oid2obj = _oidData.oid2obj
_fragments = []
def _docset():
return _der._docset() + _fragments
def _addFragment(frag):
global _fragments
_fragments.append(frag)
_addFragment('''
This is a helper function for turning a UTCTime string into an
integer. It isn't built into the encoder since the various
functions which are used to manipulate the tm structure are
notoriously unreliable.
''')
def utc2time(val):
'der encoded value not including tag or length'
if not isinstance(val, types.StringType):
raise DerError, 'argument should be a string'
t = time.strptime(val, '%y%m%d%H%M%SZ')
return int(time.mktime(t))
_addFragment('''
This is a helper function for turning an integer into a
UTCTime string. It isn't built into the encoder since the
various functions which are used to manipulate the tm structure
are notoriously unreliable.
''')
def time2utc(val):
'numerical time value like time_t'
val = int(val)
t = time.gmtime(val)
return time.strftime('%y%m%d%H%M%SZ', t)
_addFragment('''
This is a helper function for turning a GeneralizedTime string into an
integer. It isn't built into the encoder since the various
functions which are used to manipulate the tm structure are
notoriously unreliable.
''')
def gen2Time(val):
'der encoded value not including tag or length'
if not isinstance(val, types.StringType):
raise DerError, 'argument should be a string'
t = time.strptime(val, '%Y%m%d%H%M%SZ')
return int(time.mktime(t))
_addFragment('''
This is a helper function for turning an integer into a
GeneralizedTime string. It isn't built into the encoder since the
various functions which are used to manipulate the tm structure
are notoriously unreliable.
''')
def time2gen(val):
'numerical time value like time_t'
val = int(val)
t = time.gmtime(val)
return time.strftime('%Y%m%d%H%M%SZ', t)
_addFragment('''
ip should be a list or tuple of integers,
from 0 to 256.
Setting IpAddress
ip = IpAddress()
ip.set( ip42oct(192, 168, 0, 231) )
''')
def ip42oct(val0, val1, val2, val3):
return chr(val0) + chr(val1) + chr(val2) + chr(val3)
_addFragment('''
Returns a tuple of 4 integers, from 0 to 256.
''')
def oct2ip4(val):
if not isinstance(val, types.StringType) or len(val) != 4:
raise DerError, 'parameter should be string of 4 characters'
return ( ord(val[0]), ord(val[1]), ord(val[2]), ord(val[3]) )
#---------- certificate support ----------#
class TbsCertificate(Sequence):
def __init__(self, optional=0, default=''):
self.version = Integer()
self.explicitVersion = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 0, self.version, 0, 'oAMCAQA=\n' )
self.serial = Integer()
self.signature = AlgorithmIdentifier()
self.issuer = Name()
self.subject = Name()
self.subjectPublicKeyInfo = SubjectPublicKeyInfo()
self.validity = Validity()
self.issuerUniqueID = BitString(1)
self.issuerUniqueID.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 1 )
self.subjectUniqueID = BitString(1)
self.subjectUniqueID.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 2 )
self.extensions = Extensions()
self.explicitExtensions = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 3, self.extensions, 1 )
contents = [
self.explicitVersion,
self.serial,
self.signature,
self.issuer,
self.validity,
self.subject,
self.subjectPublicKeyInfo,
self.issuerUniqueID,
self.subjectUniqueID,
self.explicitExtensions
]
Sequence.__init__(self, contents, optional, default)
class Validity(Sequence):
def __init__(self, optional=0, default=''):
Time = lambda : Choice({ 'generalTime' : GeneralizedTime(), 'utcTime' : UtcTime() })
self.notBefore = Time()
self.notAfter = Time()
contents = [self.notBefore, self.notAfter]
Sequence.__init__(self, contents, optional, default)
class DirectoryString(Choice):
def __init__(self, optional=0, default=''):
choices = { 'teletexString' : T61String(),
'printableString' : PrintableString(),
'universalString' : UniversalString(),
'bmpString' : BmpString(),
'utf8String' : Utf8String() }
Choice.__init__(self, choices, optional, default)
class AttributeTypeAndValue(Sequence):
def __init__(self, optional=0, default=''):
self.type = Oid()
self.dirstr = DirectoryString()
contents = [ self.type, self.dirstr ]
Sequence.__init__(self, contents, optional, default)
class RelativeDistinguishedName(SetOf):
def __init__(self, optional=0, default=''):
SetOf.__init__(self, AttributeTypeAndValue, optional, default)
class Name(SequenceOf):
def __init__(self, optional=0, default=''):
SequenceOf.__init__(self, RelativeDistinguishedName, optional, default)
class AlgorithmIdentifier(Sequence):
def __init__(self, optional=0, default=''):
self.algorithm = Oid()
self.parameters = Null()
contents = [self.algorithm, self.parameters]
Sequence.__init__(self, contents, optional, default)
class SubjectPublicKeyInfo(Sequence):
def __init__(self, optional=0, default=''):
self.algorithmId = AlgorithmIdentifier()
self.subjectPublicKey = AltBitString()
contents = [ self.algorithmId, self.subjectPublicKey ]
Sequence.__init__(self, contents, optional, default)
class Extensions(SequenceOf):
def __init__(self, optional=0, default=''):
SequenceOf.__init__(self, Extension, optional, default)
_addFragment('''
Setting Certificate
rsa = POW.Asymmetric()
cert = POW.pkix.Certificate()
cert.setVersion(1)
cert.setSerial(5)
name = ( (( o2i('countryName'), ('printableString', 'GB') ),),
(( o2i('stateOrProvinceName'), ('printableString', 'Hertfordshire') ),),
(( o2i('organizationName'), ('printableString', 'The House') ),),
(( o2i('commonName'), ('printableString', 'Client') ),) )
cert.setIssuer(name)
cert.setSubject(name)
now = POW.pkix.time2gen( time.time() )
then = POW.pkix.time2gen(time.time() + 60*60*24*365*12)
cert.setNotBefore( ('generalTime', now) )
cert.setNotAfter( ( 'generalTime', then) )
cert.setIssuerUniqueID((1,0,1,0))
cert.setSubjectUniqueID((1,0,0,1))
cert.sign(rsa, POW.MD5_DIGEST)
''')
class Certificate(Sequence):
_addFragment('''
Certificate
optional=0
default=''
''')
def __init__(self, optional=0, default=''):
self.tbs = TbsCertificate()
self.signatureAlgorithm = AlgorithmIdentifier()
self.signatureValue = AltBitString()
contents = [ self.tbs, self.signatureAlgorithm, self.signatureValue ]
Sequence.__init__(self, contents, optional, default)
_addFragment('''
Certificate
setVersion
version
This function sets an Integer object. 0
indicates a version 1 certificate, 1 a version 2 certificate and 2 a
version 3 certificate.
''')
def setVersion(self, version):
self.tbs.version.set(version)
_addFragment('''
This function returns whatever the version object is set to,
this should be 0, 1 or 2.
''')
def getVersion(self):
return self.tbs.version.get()
_addFragment('''
Certificate
setSerial
serial
This function sets an Integer object.
No two certificates issued should ever have the same serial
number.
''')
def setSerial(self, serial):
self.tbs.serial.set(serial)
_addFragment('''
This function returns whatever the serial object is set to.
''')
def getSerial(self):
return self.tbs.serial.get()
_addFragment('''
Certificate
setIssuer
names
This function sets an Name object.
See Certificate class for an example.
''')
def setIssuer(self, issuer):
self.tbs.issuer.set(issuer)
_addFragment('''
This function returns a complex tuple containing other tuples.
''')
def getIssuer(self):
return self.tbs.issuer.get()
_addFragment('''
Certificate
setSubject
names
This function sets an Name object.
See Certificate class for an example.
''')
def setSubject(self, subject):
self.tbs.subject.set(subject)
_addFragment('''
This function returns a complex tuple containing other tuples.
''')
def getSubject(self):
return self.tbs.subject.get()
_addFragment('''
Certificate
setNotBefore
time
This function sets a Choice object.
It can be either a GeneralTime or
UTCTime object. The functions
gen2time, utc2time,
time2gen and time2utc
can be used to convert to and from integer times and their
string representation.
setNotBefore method usage
cert = POW.pkix.Certificate()
now = POW.pkix.time2gen( time.time() )
cert.setNotBefore( ('generalTime', now) )
''')
def setNotBefore(self, nb):
self.tbs.validity.notBefore.set(nb)
_addFragment('''
This function returns a tuple indicating which type of time was
stored and its value. See setNotBefore for details.
''')
def getNotBefore(self):
return self.tbs.validity.notBefore.get()
_addFragment('''
Certificate
setNotAfter
time
This function sets a Choice object.
See setNotBefore for details.
''')
def setNotAfter(self, na):
self.tbs.validity.notAfter.set(na)
_addFragment('''
This function returns a tuple indicating which type of time was
stored and its value. See setNotBefore for details.
''')
def getNotAfter(self):
return self.tbs.validity.notAfter.get()
_addFragment('''
Certificate
setIssuerUniqueID
id
This function sets a BitString object.
This is part of the X509v2 standard and is quite poorly
regarded in general, its use is not recommended. It is set
using the normal BitString method, that
is with a sequence of true/false objects.
''')
def setIssuerUniqueID(self, id):
self.tbs.issuerUniqueID.set(id)
_addFragment('''
Certificate
getIssuerUniqueID
This function returns a tuple of integers, 1 or 0.
''')
def getIssuerUniqueID(self):
return self.tbs.issuerUniqueID.get()
_addFragment('''
Certificate
setSubjectUniqueID
id
This function sets a BitString object.
This is part of the X509v2 standard and is quite poorly
regarded in general, its use is not recommended. It is set
using the normal BitString method, that
is with a sequence of true/false objects.
''')
def setSubjectUniqueID(self, id):
self.tbs.subjectUniqueID.set(id)
_addFragment('''
Certificate
getSubjectUniqueID
This function returns a tuple of integers, 1 or 0.
''')
def getSubjectUniqueID(self):
return self.tbs.subjectUniqueID.get()
_addFragment('''
Certificate
setExtensions
extns
This method sets an Extensions object,
defined as SEQUENCE OF Extension. The parameter
extns should consist of a list or tuple
of values suitable to set an extension. See the extension
class for details.
''')
def setExtensions(self, extns):
self.tbs.extensions.set(extns)
_addFragment('''
Certificate
getExtensions
This function returns a tuple of
Extension values. See
Extension for details.
''')
def getExtensions(self):
return self.tbs.extensions.get()
_addFragment('''
Certificate
sign
rsa
digestType
This function updates structured of the
Certificate and
tbs as appropriate and performs the
specified digest on the tbs and set
signedText to signed the digest.
''')
def sign(self, rsa, digestType):
signatureMap = { POW.MD2_DIGEST : (1, 2, 840, 113549, 1, 1, 2), # md2WithRSAEncryption
POW.MD5_DIGEST : (1, 2, 840, 113549, 1, 1, 4), # md5WithRSAEncryption
POW.SHA_DIGEST : (1, 3, 14, 3, 2, 15), # shaWithRSAEncryption
POW.SHA1_DIGEST : (1, 2, 840, 113549, 1, 1, 5), # sha1withRSAEncryption
POW.RIPEMD160_DIGEST : (1, 2, 840, 113549, 1, 1, 6) } # ripemd160WithRSAEncryption
self.tbs.subjectPublicKeyInfo.set( (((1, 2, 840, 113549, 1, 1, 1), None), rsa.derWrite(POW.RSA_PUBLIC_KEY) ) )
self.tbs.signature.set( [signatureMap[digestType], None] )
digest = POW.Digest(digestType)
digest.update( self.tbs.toString() )
signedText = rsa.sign( digest.digest(), digestType )
self.signatureAlgorithm.set( [signatureMap[digestType], None] )
self.signatureValue.set(signedText)
_addFragment('''
This function works out what kind of digest was used to
during signing, calculates the digest of
tbs and verifies the envelope using the
key.
''')
def verify(self, rsa):
signatureMap = { (1, 2, 840, 113549, 1, 1, 2) : POW.MD2_DIGEST, # md2WithRSAEncryption
(1, 2, 840, 113549, 1, 1, 4) : POW.MD5_DIGEST, # md5WithRSAEncryption
(1, 3, 14, 3, 2, 15) : POW.SHA_DIGEST, # shaWithRSAEncryption
(1, 2, 840, 113549, 1, 1, 5) : POW.SHA1_DIGEST, # sha1withRSAEncryption
(1, 2, 840, 113549, 1, 1, 6) : POW.RIPEMD160_DIGEST } # ripemd160WithRSAEncryption
digestOid = self.signatureAlgorithm.get()[0]
digestType = signatureMap[digestOid]
digest = POW.Digest(digestType)
digest.update( self.tbs.toString() )
return rsa.verify( self.signatureValue.get(), digest.digest(), digestType )
#---------- certificate support ----------#
#---------- CRL ----------#
class RevokedCertificate(Sequence):
def __init__(self, optional=0, default=''):
self.userCertificate = Integer()
self.revocationDate = Choice( { 'generalTime' : GeneralizedTime(), 'utcTime' : UtcTime() } )
self.crlEntryExtensions = Extensions(1)
contents = [ self.userCertificate, self.revocationDate, self.crlEntryExtensions ]
Sequence.__init__(self, contents, optional, default)
class RevokedCertificates(SequenceOf):
def __init__(self, optional=0, default=''):
SequenceOf.__init__(self, RevokedCertificate, optional, default)
class TbsCertList(Sequence):
def __init__(self, optional=0, default=''):
self.version = Integer(1)
self.signature = AlgorithmIdentifier()
self.issuer = Name()
self.thisUpdate = Choice( { 'generalTime' : GeneralizedTime(), 'utcTime' : UtcTime() } )
self.nextUpdate = Choice( { 'generalTime' : GeneralizedTime(), 'utcTime' : UtcTime() }, 1 )
self.revokedCertificates = RevokedCertificates(1)
self.crlExtensions = Extensions()
self.explicitCrlExtensions = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 0, self.crlExtensions, 1 )
contents = [ self.version,
self.signature,
self.issuer,
self.thisUpdate,
self.nextUpdate,
self.revokedCertificates,
self.explicitCrlExtensions ]
Sequence.__init__(self, contents, optional, default)
_addFragment('''
Setting CertificateList
now = POW.pkix.time2gen( time.time() )
then = POW.pkix.time2gen(time.time() + 60*60*24*365*12)
rsa = POW.Asymmetric()
crl = POW.pkix.CertificateList()
crl.setThisUpdate( ('generalTime', now ) )
name = ( (( o2i('countryName'), ('printableString', 'GB') ),),
(( o2i('stateOrProvinceName'), ('printableString', 'Hertfordshire') ),),
(( o2i('organizationName'), ('printableString', 'The House') ),),
(( o2i('commonName'), ('printableString', 'Client') ),) )
myRevocations = (
(1, ('generalTime', now), ()),
(2, ('generalTime', now), ()),
(3, ('generalTime', now), (( o2i('cRLReason'), 0, 1),))
)
crl.setIssuer(name)
crl.setRevokedCertificates( myRevocations )
crl.sign(rsa, POW.MD5_DIGEST)
''')
class CertificateList(Sequence):
_addFragment('''
CertificateList
optional=0
default=''
''')
def __init__(self, optional=0, default=''):
self.tbs = TbsCertList()
self.signatureAlgorithm = AlgorithmIdentifier()
self.signature = AltBitString()
contents = [self.tbs, self.signatureAlgorithm, self.signature]
Sequence.__init__(self, contents, optional, default)
_addFragment('''
CertificateList
setVersion
version
This function sets an Integer object. 0
indicates a version 1 CRL, and 1 a version 2 CRL.
''')
def setVersion(self, version):
self.tbs.version.set(version)
_addFragment('''
CertificateList
getVersion
This function returns whatever the version object is set to,
this should be 0, 1 or 2.
''')
def getVersion(self):
return self.tbs.version.get()
_addFragment('''
CertificateList
setIssuer
names
This function sets an Name object.
''')
def setIssuer(self, issuer):
self.tbs.issuer.set(issuer)
_addFragment('''
CertificateList
getIssuer
This function returns a complex tuple containing other tuples.
''')
def getIssuer(self):
return self.tbs.issuer.get()
_addFragment('''
setThisUpdate
setNotBefore
time
This function sets a Choice object.
It can be either a GeneralTime or
UTCTime object. The functions
gen2time, utc2time,
time2gen and time2utc
can be used to convert to and from integer times and their
string representation.
setNotBefore method usage
crl = POW.pkix.CertificateList()
now = POW.pkix.time2gen( time.time() )
crl.setNotBefore( ('generalTime', now) )
''')
def setThisUpdate(self, nu):
self.tbs.thisUpdate.set(nu)
_addFragment('''
CertificateList
getThisUpdate
This function returns a tuple containing two strings. The first
is either 'utcTime' or 'generalTime' and the second is the time
value as a string.
''')
def getThisUpdate(self):
return self.tbs.thisUpdate.get()
_addFragment('''
CertificateList
setNextUpdate
See set setThisUpdate.
''')
def setNextUpdate(self, nu):
self.tbs.nextUpdate.set(nu)
_addFragment('''
CertificateList
getNextUpdate
See set getThisUpdate.
''')
def getNextUpdate(self):
return self.tbs.nextUpdate.get()
_addFragment('''
CertificateList
setExtensions
extns
This method sets an Extensions object,
defined as SEQUENCE OF Extension. The parameter
extns should consist of a list or tuple
of values suitable to set an extension. See the extension
class for details.
''')
def setExtensions(self, extns):
self.tbs.crlExtensions.set(extns)
_addFragment('''
CertificateList
getExtensions
This function returns a tuple of
Extension values. See
Extension for details.
''')
def getExtensions(self):
return self.tbs.crlExtensions.get()
_addFragment('''
CertificateList
setRevokedCertificates
This function sets a sequence of
revokedCertificate objects.
This object is optional. See
CertificateList for an example of its
use.
''')
def setRevokedCertificates(self, rc):
self.tbs.revokedCertificates.set(rc)
_addFragment('''
CertificateList
getRevokedCertificates
This function return a sequence of
revokedCertificate objects or None.
''')
def getRevokedCertificates(self):
return self.tbs.revokedCertificates.get()
_addFragment('''
This function updates structured of the
certificateList and
tBSCertList as appropriate, performs the
specified digest on the tBSCertList and sets
signedValue to signed the digest.
''')
def sign(self, rsa, digestType):
signatureMap = { POW.MD2_DIGEST : (1, 2, 840, 113549, 1, 1, 2), # md2WithRSAEncryption
POW.MD5_DIGEST : (1, 2, 840, 113549, 1, 1, 4), # md5WithRSAEncryption
POW.SHA_DIGEST : (1, 3, 14, 3, 2, 15), # shaWithRSAEncryption
POW.SHA1_DIGEST : (1, 2, 840, 113549, 1, 1, 5), # sha1withRSAEncryption
POW.RIPEMD160_DIGEST : (1, 2, 840, 113549, 1, 1, 6) } # ripemd160WithRSAEncryption
self.tbs.signature.set( [signatureMap[digestType], None] )
digest = POW.Digest(digestType)
digest.update( self.tbs.toString() )
signedText = rsa.sign( digest.digest(), digestType )
self.signatureAlgorithm.set( [signatureMap[digestType], None] )
self.signature.set(signedText)
_addFragment('''
This function works out what kind of digest was used to during
signing, calculates the digest of
tBSCertList and verifies the
signedText using the key.
''')
def verify(self, rsa):
signatureMap = { (1, 2, 840, 113549, 1, 1, 2) : POW.MD2_DIGEST, # md2WithRSAEncryption
(1, 2, 840, 113549, 1, 1, 4) : POW.MD5_DIGEST, # md5WithRSAEncryption
(1, 3, 14, 3, 2, 15) : POW.SHA_DIGEST, # shaWithRSAEncryption
(1, 2, 840, 113549, 1, 1, 5) : POW.SHA1_DIGEST, # sha1withRSAEncryption
(1, 2, 840, 113549, 1, 1, 6) : POW.RIPEMD160_DIGEST } # ripemd160WithRSAEncryption
digestOid = self.signatureAlgorithm.get()[0]
digestType = signatureMap[digestOid]
digest = POW.Digest(digestType)
digest.update( self.tbs.toString() )
return rsa.verify( self.signature.get(), digest.digest(), digestType )
#---------- CRL ----------#
#---------- GeneralNames object support ----------#
class OtherName(Sequence):
def __init__(self, optional=0, default=''):
self.typeId = Oid()
self.any = Any()
contents = [self.typeId, self.any]
Sequence.__init__(self, contents, optional, default)
class EdiPartyName(Sequence):
def __init__(self, optional=0, default=''):
self.nameAssigner = DirectoryString()
self.partyName = DirectoryString()
self.explicitNameAssigner = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 0, self.nameAssigner, 1 )
self.explicitPartyName = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 1, self.partyName )
contents = [ self.explicitNameAssigner, self.explicitPartyName ]
Sequence.__init__(self, contents, optional, default)
class IpAddress(OctetString):
pass
class GeneralName(Choice):
def __init__(self, optional=0, default=''):
otherName = OtherName()
otherName.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 0 )
rfc822Name = IA5String()
rfc822Name.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 1 )
dnsName = IA5String()
dnsName.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 2 )
directoryName = Name()
explicitDirectoryName = Explicit( CLASS_CONTEXT, FORM_CONSTRUCTED, 4, directoryName)
ediPartyName = EdiPartyName()
ediPartyName.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 5 )
uri = IA5String()
uri.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 6 )
ipAddress = IpAddress()
ipAddress.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 7 )
registeredId = Oid()
registeredId.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 8 )
choices = { 'otherName' : otherName ,
'rfc822Name' : rfc822Name ,
'dNSName' : dnsName ,
'directoryName' : explicitDirectoryName ,
'ediPartyName' : ediPartyName ,
'uri' : uri ,
'iPAddress' : ipAddress ,
'registeredId' : registeredId }
Choice.__init__(self, choices, optional, default)
class GeneralNames(SequenceOf):
def __init__(self, optional=0, default=''):
SequenceOf.__init__(self, GeneralName, optional, default)
#---------- GeneralNames object support ----------#
#---------- extensions ----------#
_addFragment('''
BasicConstraints
Sequence
This little extension has recently caused plenty of problems for
several large organisations. It consist of a
Boolean and an
Integer. The first indicates if the owner
is a CA, the second indicates how long a chain of CAs you should
trust which the subject of this certificate trusts.
Setting BasicConstraints
bc = BasicConstraints()
bc.set( (1, 1) )
''')
class BasicConstraints(Sequence):
_addFragment('''
BasicConstraints
optional=0
default=''
''')
def __init__(self, optional=0, default=''):
self.ca = Boolean(0, 'AQEA\n')
self.pathLenConstraint = Integer(1)
contents = [self.ca, self.pathLenConstraint]
Sequence.__init__(self, contents, optional, default)
_addFragment('''
''')
class KeyUsage(BitString):
pass
_addFragment('''
SubjectAltName
GeneralNames
''')
class SubjectAltName(GeneralNames):
pass
_addFragment('''
IssuerAltName
GeneralNames
''')
class IssuerAltName(GeneralNames):
pass
_addFragment('''
SubjectKeyIdentifier
OctetString
''')
class SubjectKeyIdentifier(OctetString):
pass
_addFragment('''
AuthorityKeyIdentifier
Sequence
Setting AuthorityKeyIdentifier
id = AuthorityKeyIdentifier()
authdigest = POW.Digest( POW.SHA1_DIGEST )
authdigest.update(rsa.derWrite(POW.RSA_PUBLIC_KEY))
keyHash = authdigest.digest()
id.set( (keyHash, None, None) )
''')
class AuthorityKeyIdentifier(Sequence):
_addFragment('''
AuthorityKeyIdentifier
optional=0
default=''
''')
def __init__(self, optional=0, default=''):
self.keyIdentifier = OctetString(1)
self.keyIdentifier.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 0 )
self.authorityCertIssuer = GeneralNames(1)
self.authorityCertIssuer.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 1 )
self.authorityCertSerialNumber = Integer(1)
self.authorityCertSerialNumber.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 2 )
contents = [self.keyIdentifier, self.authorityCertIssuer, self.authorityCertSerialNumber]
Sequence.__init__(self, contents, optional, default)
_addFragment('''
PrivateKeyUsagePeriod
Sequence
Setting PrivateKeyUsagePeriod
period = PrivateKeyUsagePeriod()
period.set( ( time2gen( time.time() ), None) )
''')
class PrivateKeyUsagePeriod(Sequence):
_addFragment('''
PrivateKeyUsagePeriod
optional=0
default=''
''')
def __init__(self, optional=0, default=''):
self.notBefore = GeneralizedTime()
self.notBefore.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 0 )
self.notAfter = GeneralizedTime()
self.notAfter.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 1 )
contents = [self.notBefore, self.notAfter]
Sequence.__init__(self, contents, optional, default)
class DisplayText(Choice):
def __init__(self, optional=0, default=''):
choices = { 'visibleString' : VisibleString(),
'bmpString' : BmpString(),
'utf8String' : Utf8String() }
Choice.__init__(self, choices, optional, default)
class NoticeNumbers(SequenceOf):
def __init__(self, optional=0, default=''):
SequenceOf.__init__(self, Integer, optional, default)
class NoticeReference(Sequence):
def __init__(self, optional=0, default=''):
self.organization = DisplayText()
self.noticeNumbers = NoticeNumbers()
contents = [self.organization, self.noticeNumbers]
Sequence.__init__(self, contents, optional, default)
class UserNotice(Sequence):
def __init__(self, optional=0, default=''):
self.noticeRef = NoticeReference(1)
self.explicitText = DisplayText(1)
contents = [self.noticeRef, self.explicitText]
Sequence.__init__(self, contents, optional, default)
class Qualifier(Choice):
def __init__(self, optional=0, default=''):
choices = { 'cPSuri' : IA5String(),
'userNotice' : UserNotice() }
Choice.__init__(self, choices, optional, default)
class PolicyQualifierInfo(Sequence):
def __init__(self, optional=0, default=''):
self.policyQualifierId = Oid()
self.qualifier = Qualifier()
contents = [self.policyQualifierId, self.qualifier]
Sequence.__init__(self, contents, optional, default)
class PolicyQualifiers(SequenceOf):
def __init__(self, optional=0, default=''):
SequenceOf.__init__(self, PolicyQualifierInfo, optional, default)
class PolicyInformation(Sequence):
def __init__(self, optional=0, default=''):
self.policyIdentifier = Oid()
self.policyQualifiers = PolicyQualifiers(1)
contents = [self.policyIdentifier, self.policyQualifiers]
Sequence.__init__(self, contents, optional, default)
_addFragment('''
CertificatePolicies
SequenceOf
Setting CertificatePolicies
data = (
( o2i('id-cti-ets-proofOfReceipt'), (
(o2i('cps'), ('cPSuri', 'http://www.p-s.org.uk/policies/policy1')),
(o2i('unotice'), ( 'userNotice',
((('visibleString', 'The House'),(1,2,3)),
('visibleString', 'We guarentee nothing')))),
)),
( o2i('id-cti-ets-proofOfOrigin'), (
(o2i('cps'), ('cPSuri', 'http://www.p-s.org.uk/policies/policy2')),
))
)
policies = CertificatePolicies()
policies.set( data )
''')
class CertificatePolicies(SequenceOf):
_addFragment('''
CertificatePolicies
optional=0
default=''
''')
def __init__(self, optional=0, default=''):
SequenceOf.__init__(self, PolicyInformation, optional, default)
class DistributionPointName(Choice):
def __init__(self, optional=0, default=''):
fullName = GeneralNames()
fullName.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 0 )
nameRelativeToCRLIssuer = RelativeDistinguishedName()
nameRelativeToCRLIssuer.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 1 )
choices = { 'fullName' : fullName,
'nameRelativeToCRLIssuer ' : nameRelativeToCRLIssuer }
Choice.__init__(self, choices, optional, default)
class DistributionPoint(Sequence):
def __init__(self, optional=0, default=''):
self.distributionPoint = DistributionPointName(1)
self.explicitDistributionPoint = Explicit(CLASS_CONTEXT, FORM_CONSTRUCTED, 0, self.distributionPoint)
self.reasons = BitString(1)
self.reasons.implied( CLASS_CONTEXT, FORM_PRIMITIVE, 1 )
self.cRLIssuer = GeneralNames(1)
self.cRLIssuer.implied( CLASS_CONTEXT, FORM_CONSTRUCTED, 2 )
contents = [self.explicitDistributionPoint, self.reasons, self.cRLIssuer]
Sequence.__init__(self, contents, optional, default)
_addFragment('''
CRLDistrobutionPoints
SequenceOf
Setting CRLDistrobutionPoints
n1 = ('directoryName',
( (( o2i('countryName'), ('printableString', 'UK') ),),
(( o2i('stateOrProvinceName'), ('printableString', 'Herts') ),),
(( o2i('organizationName'), ('printableString', 'The House') ),),
(( o2i('commonName'), ('printableString', 'Shannon Works') ),) ) )
n2 = ('iPAddress', POW.pkix.ip42oct(192,168,100,51))
data = ( ( ('fullName',(n1, n2)), (1,1,1,1,1), (n1,) ), )
points = CRLDistrobutionPoints()
points.set( data )
''')
class CRLDistributionPoints(SequenceOf):
_addFragment('''
CRLDistrobutionPoints
optional=0
default=''
''')
def __init__(self, optional=0, default=''):
SequenceOf.__init__(self, DistributionPoint, optional, default)
_addFragment('''
''')
class CrlNumber(Integer):
pass
_addFragment('''
DeltaCrlIndicator
Integer
''')
class DeltaCrlIndicator(Integer):
pass
_addFragment('''
InvalidityDate
GeneralizedTime
''')
class InvalidityDate(GeneralizedTime):
pass
_addFragment('''
''')
class CrlReason(Enum):
pass
#---------- X509 extensions ----------#
_addFragment('''
This class is a useful little object. It is set by passing three
values: an oid, an integer(a boolean really) and a value. The
boolean indicates if this extension is critical. The value is
used to set the extension once it has been created. The oid
is used to create the correct object which, to be fully supported it must
be one of these:
basicConstraints
subjectAltName
issuerAltName
authorityKeyIdentifier
privateKeyUsagePeriod
certificatePolicies
cRLDistributionPoints
subjectKeyIdentifier
keyUsage
crlNumber
deltaCrlIndicator
invalidityDate
crlReason
Setting Extension
extn = Extension()
email = ('rfc822Name', 'peter_shannon@yahoo.com')
extn.set( (obj2oid('subjectAltName'),1, (email,)) )
''')
class Extension(Sequence):
classMap = {
(2, 5, 29, 19) : BasicConstraints,
(2, 5, 29, 17) : SubjectAltName,
(2, 5, 29, 18) : IssuerAltName,
(2, 5, 29, 35) : AuthorityKeyIdentifier,
(2, 5, 29, 16) : PrivateKeyUsagePeriod,
(2, 5, 29, 32) : CertificatePolicies,
(2, 5, 29, 31) : CRLDistributionPoints,
(2, 5, 29, 14) : SubjectKeyIdentifier,
(2, 5, 29, 15) : KeyUsage,
(2, 5, 29, 20) : CrlNumber,
(2, 5, 29, 27) : DeltaCrlIndicator,
(2, 5, 29, 24) : InvalidityDate,
(2, 5, 29, 21) : CrlReason,
}
# Missing -- fix later
# extendedKeyUsage
# privateKeyUsagePeriod
# policyMappings
# nameConstraints
# policyConstraints
# subjectDirectoryAttributes
# authorityInfoAccess
# instructionCode
# issuingDistrobutionPoint
def __init__(self, optional=0, default=''):
self.extnID = Oid()
self.critical = Boolean(0, 'AQEA\n')
self.extnValue = OctetString()
contents = [self.extnID, self.critical, self.extnValue]
Sequence.__init__(self, contents, optional, default)
_addFragment('''
values should be a sequence of three
values, the oid, critical marker and a value to set the
extension. If an unknown oid is passed to this function it
will raise an exception. critical is a
boolean. value will be used to set the
extension after it has been created.
''')
def set(self, (oid, critical, val) ):
self.extnID.set( oid )
self.critical.set( critical )
extnObj = None
if self.classMap.has_key(oid):
extnObj = self.classMap[oid]()
else:
if not (isinstance(oid, types.TupleType) or isinstance(oid, types.ListType)):
raise DerError, 'the oid should be specified as a sequence of integers'
else:
raise DerError, 'unkown object extension %s' % oid
try:
extnObj.set( val )
self.extnValue.set( extnObj.toString() )
except DerError, e:
raise DerError, 'failed to set %s, with:\n\t%s\nresulting in:\n\t%s' % (oid, val, `e`)
_addFragment('''
There are several ways this function might fail to decode an
extension. Firstly if the extension was marked critical but if
the oid cannot be mapped to a class or If a failure occurs decoding the
extnValue, an exception will be raised.
If a failure occurred and the extension was not marked critical it
will return a tuple like this: (oid, critical,
()). If no failures occur a tuple will be returned,
containg the oid, critical and extension values.
''')
def get(self):
oid = self.extnID.get()
critical = self.critical.get()
if self.classMap.has_key(oid):
extnObj = self.classMap[oid]()
else:
if critical:
raise DerError, 'failed to read critical extension %s' % oid
else:
return (name, critical, ())
try:
extnObj = self.classMap[oid]()
extnObj.fromString(self.extnValue.get())
value = extnObj.get()
except:
if critical:
raise DerError, 'failed to read critical extension %s' % name
else:
return (oid, critical, ())
return (oid, critical, value)