diff options
author | Rob Austein <sra@hactrn.net> | 2010-03-13 23:33:55 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2010-03-13 23:33:55 +0000 |
commit | dfbc51d4c4b712ca32c5fd8fef359e6160100476 (patch) | |
tree | 5ee80ee707f2da4321b80b0fca543aed88a2de59 /myrpki.rototill/myrpki.py | |
parent | 5dae56baf99b4140c77a541a918fe31c13b5ae01 (diff) |
Refactor OpenSSL code (tastes better! less filling!)
svn path=/myrpki.rototill/myrpki.py; revision=3082
Diffstat (limited to 'myrpki.rototill/myrpki.py')
-rw-r--r-- | myrpki.rototill/myrpki.py | 118 |
1 files changed, 35 insertions, 83 deletions
diff --git a/myrpki.rototill/myrpki.py b/myrpki.rototill/myrpki.py index 82a88386..95902791 100644 --- a/myrpki.rototill/myrpki.py +++ b/myrpki.rototill/myrpki.py @@ -547,38 +547,36 @@ class CA(object): p = subprocess.Popen(cmd, env = env, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) stdout, stderr = p.communicate(stdin) if p.wait() != 0: - sys.stderr.write("OpenSSL reported: " + stderr + "\n") + sys.stderr.write("OpenSSL command failed: " + stderr + "\n") raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd) return stdout def run_ca(self, *args): """ - Run OpenSSL "ca" command with tailored environment variables and common initial - arguments. "ca" is rather chatty, so we suppress its output except on errors. + Run OpenSSL "ca" command with common initial arguments. """ - cmd = (self.openssl, "ca", "-batch", "-config", self.cfg) + args - p = subprocess.Popen(cmd, env = self.env, stdout = subprocess.PIPE, stderr = subprocess.STDOUT) - log = p.communicate()[0] - if p.wait() != 0: - sys.stderr.write(log) - raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd) + self.run_openssl("ca", "-batch", "-config", self.cfg, *args) def run_req(self, key_file, req_file): """ - Run OpenSSL "req" command with tailored environment variables and common arguments. - "req" is rather chatty, so we suppress its output except on errors. + Run OpenSSL "genrsa" and "req" commands. """ if not os.path.exists(key_file): self.run_openssl("genrsa", "-out", key_file, "2048") if not os.path.exists(req_file): - cmd = (self.openssl, "req", "-new", "-sha256", "-config", self.cfg, - "-key", key_file, - "-out", req_file) - p = subprocess.Popen(cmd, env = self.env, stdout = subprocess.PIPE, stderr = subprocess.STDOUT) - log = p.communicate()[0] - if p.wait() != 0: - sys.stderr.write(log) - raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd) + self.run_openssl("req", "-new", "-sha256", "-config", self.cfg, "-key", key_file, "-out", req_file) + + def run_dgst(self, input, algorithm = "md5"): + """ + Run OpenSSL "dgst" command, return cleaned-up result. + """ + hash = self.run_openssl("dgst", "-" + algorithm, stdin = input) + # + # Twits just couldn't leave well enough alone, grr. + hash = "".join(hash.split()) + if hash.startswith("(stdin)="): + hash = hash[len("(stdin)="):] + return hash @staticmethod def touch_file(filename, content = None): @@ -636,18 +634,13 @@ class CA(object): Sign an XML object with CMS, return Base64 text. """ self.ee(ee_name, base_name) - xml = ElementToString(etree_pre_write(elt)) - oid = ".".join(str(i) for i in rpki.oids.name2oid["id-ct-xml"]) - cmd = (self.openssl, "cms", "-sign", "-binary", "-nodetach", "-nosmimecap", - "-keyid", "-outform", "DER", "-md", "sha256", "-econtent_type", oid, - "-inkey", "%s/%s.key" % (self.dir, base_name), - "-signer", "%s/%s.cer" % (self.dir, base_name)) - p = subprocess.Popen(cmd, env = self.env, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) - cms, err = p.communicate(xml) - if p.wait() != 0: - sys.stderr.write("OpenSSL reported: " + err + "\n") - raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd) - return base64.b64encode(cms) + return base64.b64encode(self.run_openssl( + "cms", "-sign", "-binary", "-outform", "DER", + "-keyid", "-md", "sha256", "-nodetach", "-nosmimecap", + "-econtent_type", ".".join(str(i) for i in rpki.oids.name2oid["id-ct-xml"]), + "-inkey", "%s/%s.key" % (self.dir, base_name), + "-signer", "%s/%s.cer" % (self.dir, base_name), + stdin = ElementToString(etree_pre_write(elt)))) def cms_xml_verify(self, b64, ca): """ @@ -656,27 +649,20 @@ class CA(object): the issuer of the EE certificate bundled with the CMS, and must previously have been cross-certified under our trust anchor. """ - # In theory, we should be able to use the -certfile parameter to # pass in the CA certificate, but in practice, I have never gotten # this to work, either with the command line tool or in the # OpenSSL C API. Dunno why. Passing both TA and CA via -CAfile # does work, so we do that, using a temporary file, sigh. - CAfile = os.path.join(self.dir, "temp.%s.pem" % os.getpid()) try: f = open(CAfile, "w") f.write(open(self.cer).read()) f.write(open(ca).read()) f.close() - cmd = (self.openssl, "cms", "-verify", "-inform", "DER", "-CAfile", CAfile) - p = subprocess.Popen(cmd, env = self.env, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) - xml, err = p.communicate(base64.b64decode(b64)) - if p.wait() != 0: - if err: - sys.stderr.write("OpenSSL reported: " + err + "\n") - raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd) - return etree_post_read(ElementFromString(xml)) + return etree_post_read(ElementFromString(self.run_openssl( + "cms", "-verify", "-inform", "DER", "-CAfile", CAfile, + stdin = base64.b64decode(b64)))) finally: if os.path.exists(CAfile): os.unlink(CAfile) @@ -688,28 +674,16 @@ class CA(object): if pkcs10 is None: return None, None - - pkcs10 = base64.b64decode(pkcs10) - assert pkcs10 + pkcs10 = base64.b64decode(pkcs10) - cmd = (self.openssl, "dgst", "-md5") - p = subprocess.Popen(cmd, env = self.env, stdin = subprocess.PIPE, stdout = subprocess.PIPE) - hash = p.communicate(pkcs10)[0].strip() - if p.wait() != 0: - raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd) + hash = self.run_dgst(pkcs10) req_file = "%s/bsc.%s.req" % (self.dir, hash) cer_file = "%s/bsc.%s.cer" % (self.dir, hash) if not os.path.exists(cer_file): - - cmd = (self.openssl, "req", "-inform", "DER", "-out", req_file) - p = subprocess.Popen(cmd, env = self.env, stdin = subprocess.PIPE) - p.communicate(pkcs10) - if p.wait() != 0: - raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd) - + self.run_openssl("req", "-inform", "DER", "-out", req_file, stdin = pkcs10) self.run_ca("-extensions", "ca_x509_ext_ee", "-in", req_file, "-out", cer_file) return req_file, cer_file @@ -720,11 +694,8 @@ class CA(object): """ fn = os.path.join(self.dir, filename or "temp.%s.cer" % os.getpid()) try: - cmd = (self.openssl, "x509", "-inform", "DER", "-out", fn) - p = subprocess.Popen(cmd, env = self.env, stdin = subprocess.PIPE) - p.communicate(base64.b64decode(b64)) - if p.wait() != 0: - raise subprocess.CalledProcessError(returncode = p.returncode, cmd = cmd) + self.run_openssl("x509", "-inform", "DER", "-out", fn, + stdin = base64.b64decode(b64)) return self.xcert(fn, path_restriction) finally: if not filename and os.path.exists(fn): @@ -736,33 +707,14 @@ class CA(object): Cross-certify a certificate represented as a PEM file. """ - if not cert: - return None - - if not os.path.exists(cert): - #print "Certificate %s doesn't exist, skipping" % cert + if not cert or not os.path.exists(cert): return None # Extract public key and subject name from PEM file and hash it so # we can use the result as a tag for cross-certifying this cert. - cmd1 = (self.openssl, "x509", "-noout", "-pubkey", "-subject", "-in", cert) - cmd2 = (self.openssl, "dgst", "-md5") - - p1 = subprocess.Popen(cmd1, env = self.env, stdout = subprocess.PIPE) - p2 = subprocess.Popen(cmd2, env = self.env, stdin = p1.stdout, stdout = subprocess.PIPE) - - hash = p2.communicate()[0] - - if p1.wait() != 0: - raise subprocess.CalledProcessError(returncode = p1.returncode, cmd = cmd1) - if p2.wait() != 0: - raise subprocess.CalledProcessError(returncode = p2.returncode, cmd = cmd2) - - # Sigh. Idiots just couldn't leave well enough alone. - hash = "".join(hash.split()) - if hash.startswith("(stdin)="): - hash = hash[len("(stdin)="):] + hash = self.run_dgst(self.run_openssl( + "x509", "-noout", "-pubkey", "-subject", "-in", cert)) # Cross-certify the cert we were given, if we haven't already. # This only works for self-signed certs, due to limitations of the |