diff options
author | Rob Austein <sra@hactrn.net> | 2015-11-20 08:17:23 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-11-20 08:17:23 +0000 |
commit | 17d892bb1939a5c7421cf69119858b471a5c5539 (patch) | |
tree | b2e217d09144134c42416baecf95ff5524e48a7a | |
parent | 63f731051c9ff62346ec2464bce59ca483c50a2d (diff) |
Checkpoint. Not useful yet, but starting to come together. Tornado is awesome.
svn path=/branches/tk705/; revision=6182
-rwxr-xr-x | rp/rcynic/rcynicng | 337 |
1 files changed, 300 insertions, 37 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index d6aaf56e..1c7d6de8 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -15,31 +15,73 @@ RP code. Gotta start somewhere. import os import sys import time +import logging import argparse +import subprocess + +import tornado.gen +import tornado.ioloop +import tornado.queues +import tornado.process import rpki.POW from lxml.etree import ElementTree, Element, SubElement, Comment +logger = logging.getLogger("rcynicng") + + +class Generation(object): + + all = [] + + def __init__(self, name, tree): + self.name = name + self.tree = tree + self.all.append(self) + self.pos = len(self.all) + setattr(self.__class__, name, self) + + def __hash__(self): + return hash(self.name) + + def __cmp__(self, other): + return cmp(self.pos, 0 if other is None else other.pos) + + def __str__(self): + return self.name + + class Status(object): """ Validation status database, like validation_status_t in rcynic:tos. + + rcynic:tos version of this data structure is stored as an AVL + tree, because the OpenSSL STACK_OF() sort-and-bsearch turned out + to be a very poor choice for the input data. Remains to be seen + whether we need to do something like that here too. """ db = dict() - def __init__(self, uri, generation = None): - assert generation in ("current", "backup", None) + def __init__(self, uri, generation): + assert generation is None or isinstance(generation, Generation) self.uri = uri - self.generation = generation - self.timestamp = None + self._generation = generation + self._timestamp = None self.status = set() def __str__(self): - return "{time} {self.uri} {status} {self.generation}".format( - self = self, - time = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(self.timestamp)), - status = ",".join(str(s) for s in sorted(self.status))) + return "{self.timestamp} {self.uri} {status} {self.generation}".format( + self = self, status = ",".join(str(s) for s in sorted(self.status))) + + @property + def timestamp(self): + return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(self._timestamp)) + + @property + def generation(self): + return str(self._generation) @classmethod def update(cls, uri, generation = None): @@ -48,10 +90,109 @@ class Status(object): self = cls.db[key] except KeyError: self = cls.db[key] = cls(uri, generation) - self.timestamp = time.time() + self._timestamp = time.time() return self.status +class CertInfo(object): + """ + Certificate info, like rcynic:tos certinfo_t. Not sure we really + need this, but follow rcynic:tos model for now, clean up later. + """ + + def __init__(self, **kwargs): + bools = ("ca", "ta") + nones = ("generation", "uri", "caIssuers", "crldp", + "caDirectory", "rpkiManifest", "signedObjectRepository", "rpkiNotify") + for name in bools: + setattr(self, name, False) + for name in nones: + setattr(self, name, None) + for name in kwargs: + if name in bools + nones: + setattr(self, name, kwargs[name]) + else: + raise ValueError + + +class WalkFrame(object): + """ + Certificate tree walk stack frame, like rcynic:tos walk_ctx_t. + + Try using bound method in place of explicit state enum, see how + that works. + + Not sure how much of this is really needed, follow rcynic:tos for + now and prune later. + """ + + def __init__(self): + self.certinfo = CertInfo() + self.cert = None + self.manifest = None + self.manifest_generation = None + self.filenames = [] + self.manifest_iteration = None + self.filename_iteration = None + self.stale_manifest = False + self.crldp = None + self.certs = [] + self.crls = [] + self.state = self.initial + + @tornado.gen.coroutine + def initial(self): + raise NotImplementedError + + @tornado.gen.coroutine + def fetch(self): + raise NotImplementedError + + @tornado.gen.coroutine + def ready(self): + raise NotImplementedError + + @tornado.gen.coroutine + def current(self): + raise NotImplementedError + + @tornado.gen.coroutine + def backup(self): + raise NotImplementedError + + @tornado.gen.coroutine + def done(self): + raise NotImplementedError + + +class WalkTask(object): + """ + Task corresponding to one walk stack, roughly analgous to + STACK_OF(walk_ctx_t) in rcynic:tos. + """ + + def __init__(self): + self.stack = [] + + @tornado.gen.coroutine + def __call__(self): + yield self.stack[-1].state() + + +# Not sure we need all the rsync_status_t, rsync_state_t, and +# rsync_ctx_t stuff from rcynic:tos, add later if we do. + +# Probably do need some analogue of the rsync_history database, for +# tracking down hosts, URIs already fetched, and (perhaps) limiting +# simultaneous connection attempts to a single host. + +# Analogue to rcynic:tos STACK_OF(task_t): queue of tasks to be +# invoked by the next available worker. Various fun options here, go +# with simplest (unbounded simple queue) initially. + +task_queue = tornado.queues.Queue() + + def parse_arguments(): def check_dir(s): @@ -59,11 +200,19 @@ def parse_arguments(): raise argparse.ArgumentTypeError("%r is not a directory" % s) return s + def posint(s): + i = int(s) + if i <= 0: + raise argparse.ArgumentTypeError("%r is not a positive integer " % s) + return i + + parser = argparse.ArgumentParser(description = __doc__) parser.add_argument("--unauthenticated", type = check_dir, default = "rcynic-data/unauthenticated") parser.add_argument("--old-authenticated", type = check_dir, default = "rcynic-data/authenticated.old") parser.add_argument("--tals", type = check_dir, default = "sample-trust-anchors") parser.add_argument("--output", default = "rcynic-data/rcynicng-output") + parser.add_argument("--workers", type = posint, default = 10) return parser.parse_args() @@ -79,10 +228,10 @@ def read_tals(): b64 = "".join(lines[lines.index("\n"):]) key = rpki.POW.Asymmetric.derReadPublic(b64.decode("base64")) if not uri.endswith(".cer"): - Status.update(furi).add(rpki.POW.validation_status.MALFORMED_TAL_URI) + Status.update(furi, None).add(rpki.POW.validation_status.MALFORMED_TAL_URI) yield uri, key except: - Status.update(furi).add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR_LOCATOR) + Status.update(furi, None).add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR_LOCATOR) def uri_to_filename(uri, base = None): @@ -109,6 +258,7 @@ def sha256(bytes): return d.digest() +@tornado.gen.coroutine def walk_tree(cauri, ca, trusted, crl, basedir): trusted.insert(0, ca) @@ -118,7 +268,7 @@ def walk_tree(cauri, ca, trusted, crl, basedir): try: mft = rpki.POW.Manifest.derReadFile(uri_to_filename(mfturi, basedir)) except rpki.POW.Error as e: - print mfturi, e + logger.warn("Couldn't read %s: %s", mfturi, e) return ee = mft.certs()[0] crldp = ee.getCRLDP() @@ -126,7 +276,7 @@ def walk_tree(cauri, ca, trusted, crl, basedir): try: crl = rpki.POW.CRL.derReadFile(uri_to_filename(crluri, basedir)) except rpki.POW.Error as e: - print crluri, e + logger.warn("Couldn't read %s: %s", crluri, e) return crl_status = Status.update(crluri) @@ -139,6 +289,16 @@ def walk_tree(cauri, ca, trusted, crl, basedir): crl_status.add(rpki.POW.validation_status.CRL_NOT_IN_MANIFEST) for fn, digest in mft.getFiles(): + + # XXX There are enough objects in the RIPE repository that we + # need to let the I/O loop run occasionally. Probably not on + # every object as this does, but without this it takes about + # 18 seconds to check the whole RIPE database, which makes + # APNIC look even slower than they are. Maybe yield every N + # objects for some tunable value of N, or something like that. + + yield tornado.gen.moment + uri = diruri + fn status = Status.update(uri) @@ -172,38 +332,141 @@ def walk_tree(cauri, ca, trusted, crl, basedir): cer.verify(trusted = trusted, crl = crl, status = status) is_ca = (cer.getBasicConstraints() or (False, None))[0] if is_ca: - walk_tree(diruri + fn, cer, trusted, crl, basedir) + yield walk_tree(diruri + fn, cer, trusted, crl, basedir) continue status.add(rpki.POW.validation_status.UNKNOWN_OBJECT_TYPE_SKIPPED) +@tornado.gen.coroutine +def rsync_ta(uri): + assert uri.endswith(".cer") + yield rsync(uri, ()) + +@tornado.gen.coroutine +def rsync_sia(uri): + assert uri.endswith("/") + yield rsync(uri, ("--recursive", "--delete")) + +@tornado.gen.coroutine +def rsync(uri, cmd_extra): + + # This is where we should check our internal history database to + # see if we've already fetched this URI or something covering it; + # if so, just return success at this point, we've done all the fetching + # we're going to do for this URI on this rcynic run. + + cmd = ["rsync", "--update", "--times", "--copy-links", "--itemize-changes", "-4"] + cmd.extend(cmd_extra) + cmd.append(uri) + cmd.append(uri_to_filename(uri, args.unauthenticated)) + + dn = os.path.dirname(cmd[-1]) + if not os.path.exists(dn): + os.makedirs(dn) + + t0 = time.time() + proc = tornado.process.Subprocess(cmd, stdout = tornado.process.Subprocess.STREAM, stderr = subprocess.STDOUT) + logger.debug("rsync[%s] started \"%s\"", proc.pid, " ".join(cmd)) + ret, out = yield [proc.wait_for_exit(raise_error = False), proc.stdout.read_until_close()] + t1 = time.time() + for line in out.splitlines(): + logger.debug("rsync[%s] %s", proc.pid, line) + logger.debug("rsync[%s] finished after %s seconds with status 0x%x", proc.pid, t1 - t0, ret) + + # Should do something with rsync result and validation status database here. + + +class CheckTALTask(object): + + def __init__(self, uri, key): + self.uri = uri + self.key = key + + def __repr__(self): + return "<CheckTALTask: \"{}\">".format(self.uri) + + @tornado.gen.coroutine + def __call__(self): + yield rsync_ta(self.uri) + for generation in (Generation.current, Generation.backup): + try: + yield self.check(generation) + + # Need to convert this into initialization of a + # WalkTask object and insertion of that task into the + # task_queue. Check rcynic:tos to see how it handles + # the walk_ctx initialization. + + logger.debug("Starting walk of %s", self.uri) + yield walk_tree(self.uri, self.cer, [self.cer], None, generation.tree) + + return + except rpki.POW.Error: + pass + Status.update(self.uri, None).add(rpki.POW.validation_status.TRUST_ANCHOR_SKIPPED) + + @tornado.gen.coroutine + def check(self, generation): + status = Status.update(self.uri, generation) + try: + self.cer = rpki.POW.X509.derReadFile(uri_to_filename(self.uri, generation.tree)) + except rpki.POW.OpenSSLError: + status.add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR) + status.add(rpki.POW.validation_status.OBJECT_REJECTED) + raise rpki.POW.ValidationError + if self.key.derWritePublic() != self.cer.getPublicKey().derWritePublic(): + status.add(rpki.POW.validation_status.TRUST_ANCHOR_KEY_MISMATCH) + status.add(rpki.POW.validation_status.OBJECT_REJECTED) + raise rpki.POW.ValidationError + try: + self.cer.verify(trusted = [self.cer], status = status) + except rpki.POW.ValidationError: + status.add(rpki.POW.validation_status.OBJECT_REJECTED) + raise + + +@tornado.gen.coroutine +def worker(meself): + # + # NB: This particular style of control loop REQUIRES an except + # clause, even if that except clause is just a pass statement. + # + while True: + task = yield task_queue.get() + try: + logger.info("Worker %s starting %s", meself, task) + yield task() + except: + logger.exception("Worker %s caught unhandled exception from %s", meself, task) + finally: + task_queue.task_done() + logger.info("Worker %s finished %s", meself, task) + + +@tornado.gen.coroutine +def main(): + for i in xrange(args.workers): + tornado.ioloop.IOLoop.current().spawn_callback(worker, i) + yield [task_queue.put(CheckTALTask(uri, key)) for uri, key in read_tals()] + yield task_queue.join() + + os.putenv("TZ", "UTC") time.tzset() args = parse_arguments() -basedir = args.unauthenticated +Generation("current", args.unauthenticated) +Generation("backup", args.old_authenticated) -for uri, key in read_tals(): - status = Status.update(uri) - status.add(rpki.POW.validation_status.OBJECT_REJECTED) - try: - cer = rpki.POW.X509.derReadFile(uri_to_filename(uri, basedir)) - except rpki.POW.OpenSSLError: - status.add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR) - continue - if key.derWritePublic() != cer.getPublicKey().derWritePublic(): - status.add(rpki.POW.validation_status.TRUST_ANCHOR_KEY_MISMATCH) - continue - trusted = [cer] - try: - cer.verify(trusted = trusted, status = status) - except: - continue - else: - status.remove(rpki.POW.validation_status.OBJECT_REJECTED) - walk_tree(uri, cer, trusted, None, basedir) - -for uri in sorted(Status.db): - print Status.db[uri] +# Put this under config/argparse control later, for now I want to see everything +logging.getLogger().setLevel(logging.DEBUG) + +tornado.ioloop.IOLoop.current().run_sync(main) + +if False: + for uri in sorted(Status.db): + print Status.db[uri] + +# Should be doing something to generate rcynic.xml here. |