diff options
author | Rob Austein <sra@hactrn.net> | 2015-11-17 05:58:04 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-11-17 05:58:04 +0000 |
commit | 8f711b4c16a047870af959775f8d8b8a048f333d (patch) | |
tree | a95817f3f30473a296c6d9dcdb3f1959d4d84b18 /rp | |
parent | dda7d640eaf91b079823cb09e376113627e6f74f (diff) |
First baby steps towards testing new rpki.POW extended validation code.
svn path=/branches/tk705/; revision=6178
Diffstat (limited to 'rp')
-rwxr-xr-x | rp/rcynic/rcynicng | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng new file mode 100755 index 00000000..b9d23380 --- /dev/null +++ b/rp/rcynic/rcynicng @@ -0,0 +1,152 @@ +#!/usr/bin/env python + +# $Id$ + +""" +Reimplementation of rcynic in Python. Work in progress. + +Well, OK, at the moment this doesn't even come close to being a +replacement for the C version of rcynic, must less adding the new +features that were the reason for bothering with all this. Right now, +this is just a test framework for the new POW.c code to support Python +RP code. Gotta start somewhere. +""" + +import os +import sys +import time +import argparse + +import rpki.POW + +from lxml.etree import ElementTree, Element, SubElement, Comment + +args = None + +def check_dir(s): + if not os.path.isdir(s): + raise argparse.ArgumentTypeError("%r is not a directory" % s) + return s + +def parse_options(): + global args # pylint: disable=W0603 + parser = argparse.ArgumentParser(description = __doc__) + parser.add_argument("--unauthenticated", type = check_dir, default = "rcynic-data/unauthenticated") + parser.add_argument("--old-authenticated", type = check_dir, default = "rcynic-data/authenticated.old") + parser.add_argument("--tals", type = check_dir, default = "sample-trust-anchors") + parser.add_argument("--output", default = "rcynic-data/rcynicng-output") + args = parser.parse_args() + + +def read_tals(): + for root, dirs, files in os.walk(args.tals): + for fn in files: + if fn.endswith(".tal"): + with open(os.path.join(root, fn), "r") as f: + lines = f.readlines() + uri = lines.pop(0).strip() + b64 = "".join(lines[lines.index("\n"):]) + key = rpki.POW.Asymmetric.derReadPublic(b64.decode("base64")) + yield uri, key + +def uri_to_fn(uri, base = None): + fn = uri[uri.index("://")+3:] + if base is not None: + fn = os.path.join(base, fn) + return fn + +def first_uri(uris, scheme): + for uri in uris: + if uri.startswith(scheme): + return uri + return None + +def first_rsync_uri(uris): + return first_uri(uris, "rsync://") + + +def walk_tree(ca, trusted, crl, basedir): + ca_status = set() + ca.verify(trusted = trusted, crl = crl, status = ca_status) + trusted.insert(0, ca) + diruri, mfturi = [first_rsync_uri(uri) for uri in ca.getSIA()[:2]] + mft = rpki.POW.Manifest.derReadFile(uri_to_fn(mfturi, basedir)) + ee = mft.certs()[0] + crldp = first_rsync_uri(ee.getCRLDP()) + crl = rpki.POW.CRL.derReadFile(uri_to_fn(crldp, basedir)) + crl_status = set() + mft_status = set() + crl.verify(ca, crl_status) + ee.verify(trusted = trusted, crl = crl, status = mft_status) + mft.verify(status = mft_status) + + print "CA status: ", ", ".join(str(s) for s in ca_status) + print "CRL status:", ", ".join(str(s) for s in crl_status) + print "MFT status:", ", ".join(str(s) for s in mft_status) + + for fn, digest in mft.getFiles(): + + with open(os.path.join(uri_to_fn(diruri, basedir), fn), "rb") as f: + obj = f.read() + dgst = rpki.POW.Digest(rpki.POW.SHA256_DIGEST) + dgst.update(obj) + print fn, digest.encode("hex"), "OK hash" if dgst.digest() == digest else "Bad hash" + + if fn.endswith(".crl") and obj != crl.derWrite(): + print "CRL mismatch" + if fn.endswith(".crl"): + continue + + if fn.endswith(".roa"): + roa = rpki.POW.ROA.derRead(obj) + roa_status = set() + ee = roa.certs()[0] + ee.verify(trusted = trusted, crl = crl, status = roa_status) + roa.verify(status = roa_status) + continue + + if fn.endswith(".gbr"): + gbr = rpki.POW.CMS.derRead(obj) + gbr_status = set() + ee = gbr.certs()[0] + ee.verify(trusted = trusted, crl = crl, status = gbr_status) + vcard = gbr.verify(status = gbr_status) + print vcard + continue + + cer = rpki.POW.X509.derRead(obj) + bc = cer.getBasicConstraints() + if bc and bc[0]: + try: + walk_tree(cer, trusted, crl, basedir) + except rpki.POW.Error as e: + print "CA", diruri + fn, "failed:", e + else: + cer_status = set() + cer.verify(trusted = trusted, crl = crl, status = cer_status) + + +def main(): + + os.putenv("TZ", "UTC") + time.tzset() + + parse_options() + + basedir = args.unauthenticated + + for uri, pk in read_tals(): + print + try: + x = rpki.POW.X509.derReadFile(uri_to_fn(uri, basedir)) + except rpki.POW.OpenSSLError: + print "Couldn't open TA {}".format(uri) + else: + ok = pk.derWritePublic() == x.getPublicKey().derWritePublic() + print "OK " if ok else "Bad", uri + if ok: + walk_tree(x, [x], None, basedir) + + +if __name__ == "__main__": + main() |