aboutsummaryrefslogtreecommitdiff
path: root/myrpki.rototill/myrpki.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2010-03-13 23:33:55 +0000
committerRob Austein <sra@hactrn.net>2010-03-13 23:33:55 +0000
commitdfbc51d4c4b712ca32c5fd8fef359e6160100476 (patch)
tree5ee80ee707f2da4321b80b0fca543aed88a2de59 /myrpki.rototill/myrpki.py
parent5dae56baf99b4140c77a541a918fe31c13b5ae01 (diff)
Refactor OpenSSL code (tastes better! less filling!)
svn path=/myrpki.rototill/myrpki.py; revision=3082
Diffstat (limited to 'myrpki.rototill/myrpki.py')
-rw-r--r--myrpki.rototill/myrpki.py118
1 files changed, 35 insertions, 83 deletions
diff --git a/myrpki.rototill/myrpki.py b/myrpki.rototill/myrpki.py
index 82a88386..95902791 100644
--- a/myrpki.rototill/myrpki.py
+++ b/myrpki.rototill/myrpki.py
@@ -547,38 +547,36 @@ class CA(object):
p = subprocess.Popen(cmd, env = env, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
stdout, stderr = p.communicate(stdin)
if p.wait() != 0:
- sys.stderr.write("OpenSSL reported: " + stderr + "\n")
+ sys.stderr.write("OpenSSL command failed: " + stderr + "\n")
raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd)
return stdout
def run_ca(self, *args):
"""
- Run OpenSSL "ca" command with tailored environment variables and common initial
- arguments. "ca" is rather chatty, so we suppress its output except on errors.
+ Run OpenSSL "ca" command with common initial arguments.
"""
- cmd = (self.openssl, "ca", "-batch", "-config", self.cfg) + args
- p = subprocess.Popen(cmd, env = self.env, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
- log = p.communicate()[0]
- if p.wait() != 0:
- sys.stderr.write(log)
- raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd)
+ self.run_openssl("ca", "-batch", "-config", self.cfg, *args)
def run_req(self, key_file, req_file):
"""
- Run OpenSSL "req" command with tailored environment variables and common arguments.
- "req" is rather chatty, so we suppress its output except on errors.
+ Run OpenSSL "genrsa" and "req" commands.
"""
if not os.path.exists(key_file):
self.run_openssl("genrsa", "-out", key_file, "2048")
if not os.path.exists(req_file):
- cmd = (self.openssl, "req", "-new", "-sha256", "-config", self.cfg,
- "-key", key_file,
- "-out", req_file)
- p = subprocess.Popen(cmd, env = self.env, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
- log = p.communicate()[0]
- if p.wait() != 0:
- sys.stderr.write(log)
- raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd)
+ self.run_openssl("req", "-new", "-sha256", "-config", self.cfg, "-key", key_file, "-out", req_file)
+
+ def run_dgst(self, input, algorithm = "md5"):
+ """
+ Run OpenSSL "dgst" command, return cleaned-up result.
+ """
+ hash = self.run_openssl("dgst", "-" + algorithm, stdin = input)
+ #
+ # Twits just couldn't leave well enough alone, grr.
+ hash = "".join(hash.split())
+ if hash.startswith("(stdin)="):
+ hash = hash[len("(stdin)="):]
+ return hash
@staticmethod
def touch_file(filename, content = None):
@@ -636,18 +634,13 @@ class CA(object):
Sign an XML object with CMS, return Base64 text.
"""
self.ee(ee_name, base_name)
- xml = ElementToString(etree_pre_write(elt))
- oid = ".".join(str(i) for i in rpki.oids.name2oid["id-ct-xml"])
- cmd = (self.openssl, "cms", "-sign", "-binary", "-nodetach", "-nosmimecap",
- "-keyid", "-outform", "DER", "-md", "sha256", "-econtent_type", oid,
- "-inkey", "%s/%s.key" % (self.dir, base_name),
- "-signer", "%s/%s.cer" % (self.dir, base_name))
- p = subprocess.Popen(cmd, env = self.env, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- cms, err = p.communicate(xml)
- if p.wait() != 0:
- sys.stderr.write("OpenSSL reported: " + err + "\n")
- raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd)
- return base64.b64encode(cms)
+ return base64.b64encode(self.run_openssl(
+ "cms", "-sign", "-binary", "-outform", "DER",
+ "-keyid", "-md", "sha256", "-nodetach", "-nosmimecap",
+ "-econtent_type", ".".join(str(i) for i in rpki.oids.name2oid["id-ct-xml"]),
+ "-inkey", "%s/%s.key" % (self.dir, base_name),
+ "-signer", "%s/%s.cer" % (self.dir, base_name),
+ stdin = ElementToString(etree_pre_write(elt))))
def cms_xml_verify(self, b64, ca):
"""
@@ -656,27 +649,20 @@ class CA(object):
the issuer of the EE certificate bundled with the CMS, and must
previously have been cross-certified under our trust anchor.
"""
-
# In theory, we should be able to use the -certfile parameter to
# pass in the CA certificate, but in practice, I have never gotten
# this to work, either with the command line tool or in the
# OpenSSL C API. Dunno why. Passing both TA and CA via -CAfile
# does work, so we do that, using a temporary file, sigh.
-
CAfile = os.path.join(self.dir, "temp.%s.pem" % os.getpid())
try:
f = open(CAfile, "w")
f.write(open(self.cer).read())
f.write(open(ca).read())
f.close()
- cmd = (self.openssl, "cms", "-verify", "-inform", "DER", "-CAfile", CAfile)
- p = subprocess.Popen(cmd, env = self.env, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- xml, err = p.communicate(base64.b64decode(b64))
- if p.wait() != 0:
- if err:
- sys.stderr.write("OpenSSL reported: " + err + "\n")
- raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd)
- return etree_post_read(ElementFromString(xml))
+ return etree_post_read(ElementFromString(self.run_openssl(
+ "cms", "-verify", "-inform", "DER", "-CAfile", CAfile,
+ stdin = base64.b64decode(b64))))
finally:
if os.path.exists(CAfile):
os.unlink(CAfile)
@@ -688,28 +674,16 @@ class CA(object):
if pkcs10 is None:
return None, None
-
- pkcs10 = base64.b64decode(pkcs10)
- assert pkcs10
+ pkcs10 = base64.b64decode(pkcs10)
- cmd = (self.openssl, "dgst", "-md5")
- p = subprocess.Popen(cmd, env = self.env, stdin = subprocess.PIPE, stdout = subprocess.PIPE)
- hash = p.communicate(pkcs10)[0].strip()
- if p.wait() != 0:
- raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd)
+ hash = self.run_dgst(pkcs10)
req_file = "%s/bsc.%s.req" % (self.dir, hash)
cer_file = "%s/bsc.%s.cer" % (self.dir, hash)
if not os.path.exists(cer_file):
-
- cmd = (self.openssl, "req", "-inform", "DER", "-out", req_file)
- p = subprocess.Popen(cmd, env = self.env, stdin = subprocess.PIPE)
- p.communicate(pkcs10)
- if p.wait() != 0:
- raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd)
-
+ self.run_openssl("req", "-inform", "DER", "-out", req_file, stdin = pkcs10)
self.run_ca("-extensions", "ca_x509_ext_ee", "-in", req_file, "-out", cer_file)
return req_file, cer_file
@@ -720,11 +694,8 @@ class CA(object):
"""
fn = os.path.join(self.dir, filename or "temp.%s.cer" % os.getpid())
try:
- cmd = (self.openssl, "x509", "-inform", "DER", "-out", fn)
- p = subprocess.Popen(cmd, env = self.env, stdin = subprocess.PIPE)
- p.communicate(base64.b64decode(b64))
- if p.wait() != 0:
- raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd)
+ self.run_openssl("x509", "-inform", "DER", "-out", fn,
+ stdin = base64.b64decode(b64))
return self.xcert(fn, path_restriction)
finally:
if not filename and os.path.exists(fn):
@@ -736,33 +707,14 @@ class CA(object):
Cross-certify a certificate represented as a PEM file.
"""
- if not cert:
- return None
-
- if not os.path.exists(cert):
- #print "Certificate %s doesn't exist, skipping" % cert
+ if not cert or not os.path.exists(cert):
return None
# Extract public key and subject name from PEM file and hash it so
# we can use the result as a tag for cross-certifying this cert.
- cmd1 = (self.openssl, "x509", "-noout", "-pubkey", "-subject", "-in", cert)
- cmd2 = (self.openssl, "dgst", "-md5")
-
- p1 = subprocess.Popen(cmd1, env = self.env, stdout = subprocess.PIPE)
- p2 = subprocess.Popen(cmd2, env = self.env, stdin = p1.stdout, stdout = subprocess.PIPE)
-
- hash = p2.communicate()[0]
-
- if p1.wait() != 0:
- raise subprocess.CalledProcessError(returncode = p1.returncode, cmd = cmd1)
- if p2.wait() != 0:
- raise subprocess.CalledProcessError(returncode = p2.returncode, cmd = cmd2)
-
- # Sigh. Idiots just couldn't leave well enough alone.
- hash = "".join(hash.split())
- if hash.startswith("(stdin)="):
- hash = hash[len("(stdin)="):]
+ hash = self.run_dgst(self.run_openssl(
+ "x509", "-noout", "-pubkey", "-subject", "-in", cert))
# Cross-certify the cert we were given, if we haven't already.
# This only works for self-signed certs, due to limitations of the