diff options
author | Rob Austein <sra@hactrn.net> | 2015-11-22 08:55:25 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-11-22 08:55:25 +0000 |
commit | 72eb9c012c6d4cdf5f470607e5eacf6e36114457 (patch) | |
tree | 457c1f922021fb84426b2f71255b9c9fb90778b7 /rp/rcynic | |
parent | 6ec036bfdceadc4ca72f27d4369dfded0c194a19 (diff) |
Convert certificate tree walking code to use new tasking model.
Get full rsync code working, history cache and all.
svn path=/branches/tk705/; revision=6184
Diffstat (limited to 'rp/rcynic')
-rwxr-xr-x | rp/rcynic/rcynicng | 694 |
1 files changed, 480 insertions, 214 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index 1c7d6de8..7e136ca8 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -20,16 +20,20 @@ import argparse import subprocess import tornado.gen +import tornado.locks import tornado.ioloop import tornado.queues import tornado.process import rpki.POW +from rpki.oids import id_kp_bgpsec_router + from lxml.etree import ElementTree, Element, SubElement, Comment logger = logging.getLogger("rcynicng") +codes = rpki.POW.validation_status class Generation(object): @@ -72,8 +76,8 @@ class Status(object): self.status = set() def __str__(self): - return "{self.timestamp} {self.uri} {status} {self.generation}".format( - self = self, status = ",".join(str(s) for s in sorted(self.status))) + return "{my.timestamp} {my.uri} {status} {my.generation}".format( + my = self, status = ",".join(str(s) for s in sorted(self.status))) @property def timestamp(self): @@ -94,75 +98,341 @@ class Status(object): return self.status -class CertInfo(object): - """ - Certificate info, like rcynic:tos certinfo_t. Not sure we really - need this, but follow rcynic:tos model for now, clean up later. - """ +class X509(rpki.POW.X509): - def __init__(self, **kwargs): - bools = ("ca", "ta") - nones = ("generation", "uri", "caIssuers", "crldp", - "caDirectory", "rpkiManifest", "signedObjectRepository", "rpkiNotify") - for name in bools: - setattr(self, name, False) - for name in nones: - setattr(self, name, None) - for name in kwargs: - if name in bools + nones: - setattr(self, name, kwargs[name]) - else: - raise ValueError + @classmethod + def derReadURI(cls, uri, generation, cms = None): + fn = uri_to_filename(uri, generation.tree) + if not os.path.exists(fn): + Status.update(uri, generation).add(codes.OBJECT_NOT_FOUND) + return None + if cms is None: + with open(fn, "rb") as f: + der = f.read() + else: + # XXX awful kludge to work around current lack of subclass + # support in rpki.POW.CMS.certs(). + der = cms.certs()[0].derWrite() + self = cls.derRead(der) + self.uri = uri + self.fn = fn + self.generation = generation + self.sha256 = sha256(der) if cms is None else None + self.bc = self.getBasicConstraints() + self.aki = self.getAKI() + self.ski = self.getSKI() + self.eku = self.getEKU() + self.aia = self.getAIA() + self.sia = self.getSIA() + self.crldp = self.getCRLDP() + self.is_ca = self.bc is not None and self.bc[0] + self.caDirectory, self.rpkiManifest, self.signedObjectRepository, self.rpkiNotify \ + = self.sia or (None, None, None, None) + return self + + @staticmethod + def count_uris(uris, scheme = "rsync://"): + count = 0 + if uris is not None: + for uri in uris: + if uri.startswith(scheme): + count += 1 + return count + + def check(self, trusted = None, crl = None): + status = Status.update(self.uri, self.generation) + is_ta = trusted is None and crl is None + is_routercert = (self.eku is not None and id_kp_bgpsec_router in self.eku and + not self.is_ca and self.uri.endswith(".cer")) + if self.eku is not None and (self.is_ca or not self.uri.endswith(".cer")): + status.add(codes.INAPPROPRIATE_EKU_EXTENSION) + if is_ta and not self.is_ca: + status.add(codes.MALFORMED_TRUST_ANCHOR) + if is_ta and self.aia is not None: + status.add(codes.AIA_EXTENSION_FORBIDDEN) + if not is_ta and self.aia is None: + status.add(codes.AIA_EXTENSION_MISSING) + if is_routercert and self.sia is not None: + status.add(codes.SIA_EXTENSION_FORBIDDEN) + if not is_routercert and self.sia is None: + status.add(codes.SIA_EXTENSION_MISSING) + if is_ta and self.crldp is not None: + status.add(codes.CRLDP_EXTENSION_FORBIDDEN) + if not is_ta and self.crldp is None: + status.add(codes.CRLDP_EXTENSION_MISSING) + n_rsync_caIssuers = self.count_uris(self.aia) + n_rsync_caDirectory = self.count_uris(self.caDirectory) + n_rsync_rpkiManifest = self.count_uris(self.rpkiManifest) + n_rsync_signedObjectRepository = self.count_uris(self.signedObjectRepository) + if n_rsync_caIssuers > 1 or n_rsync_caDirectory > 1 or n_rsync_rpkiManifest > 1 or n_rsync_signedObjectRepository > 1: + status.add(codes.MULTIPLE_RSYNC_URIS_IN_EXTENSION) + if self.aia is not None and n_rsync_caIssuers == 0: + status.add(codes.MALFORMED_AIA_EXTENSION) + if self.is_ca: + ok = n_rsync_caDirectory != 0 and n_rsync_rpkiManifest != 0 and n_rsync_signedObjectRepository == 0 + elif not is_routercert: + ok = n_rsync_caDirectory == 0 and n_rsync_rpkiManifest == 0 and n_rsync_signedObjectRepository != 0 + else: + ok = self.sia is None + if not ok: + status.add(codes.MALFORMED_SIA_EXTENSION) + if not is_ta and self.count_uris(self.crldp) == 0: + status.add(codes.MALFORMED_CRLDP_EXTENSION) + try: + self.verify(trusted = [self] if trusted is None else trusted, crl = crl, status = status) + except rpki.POW.ValidationError: + status.add(codes.OBJECT_REJECTED) + codes.normalize(status) + return not any(s.kind == "bad" for s in status) -class WalkFrame(object): - """ - Certificate tree walk stack frame, like rcynic:tos walk_ctx_t. +class CRL(rpki.POW.CRL): + + @classmethod + def derReadURI(cls, uri, generation = None): + fn = uri_to_filename(uri, generation.tree) + if not os.path.exists(fn): + Status.update(uri, generation).add(codes.OBJECT_NOT_FOUND) + return None + with open(fn, "rb") as f: + der = f.read() + self = cls.derRead(der) + self.uri = uri + self.fn = fn + self.generation = generation + self.sha256 = sha256(der) + self.aki = self.getAKI() + self.thisUpdate = self.getThisUpdate() + self.nextUpdate = self.getNextUpdate() + self.number = self.getCRLNumber() + return self + + def check(self, issuer): + status = Status.update(self.uri, self.generation) + try: + self.verify(issuer, status) + except rpki.POW.ValidationError: + status.add(codes.OBJECT_REJECTED) + codes.normalize(status) + return not any(s.kind == "bad" for s in status) - Try using bound method in place of explicit state enum, see how - that works. - Not sure how much of this is really needed, follow rcynic:tos for - now and prune later. +class Ghostbuster(rpki.POW.CMS): + + @classmethod + def derReadURI(cls, uri, generation): + fn = uri_to_filename(uri, generation.tree) + if not os.path.exists(fn): + Status.update(uri, generation).add(codes.OBJECT_NOT_FOUND) + return None + with open(fn, "rb") as f: + der = f.read() + self = cls.derRead(der) + self.uri = uri + self.fn = fn + self.generation = generation + self.sha256 = sha256(der) + self.ee = X509.derReadURI(uri, generation, self) + self.vcard = None + return self + + def check(self, trusted = None, crl = None): + status = Status.update(self.uri, self.generation) + self.ee.check(trusted, crl) + try: + self.vcard = self.verify() + except rpki.POW.ValidationError: + status.add(codes.OBJECT_REJECTED) + codes.normalize(status) + return not any(s.kind == "bad" for s in status) + + +class Manifest(rpki.POW.Manifest): + + @classmethod + def derReadURI(cls, uri, generation): + fn = uri_to_filename(uri, generation.tree) + if not os.path.exists(fn): + Status.update(uri, generation).add(codes.OBJECT_NOT_FOUND) + return None + with open(fn, "rb") as f: + der = f.read() + self = cls.derRead(der) + self.uri = uri + self.fn = fn + self.ee = X509.derReadURI(uri, generation, self) + self.fah = None + self.generation = generation + self.thisUpdate = None + self.nextUpdate = None + self.number = None + return self + + def check(self, trusted = None, crl = None): + status = Status.update(self.uri, self.generation) + self.ee.check(trusted, crl) + try: + self.verify() + except rpki.POW.ValidationError: + status.add(codes.OBJECT_REJECTED) + self.thisUpdate = self.getThisUpdate() + self.nextUpdate = self.getNextUpdate() + self.number = self.getManifestNumber() + self.fah = self.getFiles() + codes.normalize(status) + return not any(s.kind == "bad" for s in status) + + +class ROA(rpki.POW.ROA): + + @classmethod + def derReadURI(cls, uri, generation): + fn = uri_to_filename(uri, generation.tree) + if not os.path.exists(fn): + Status.update(uri, generation).add(codes.OBJECT_NOT_FOUND) + return None + with open(fn, "rb") as f: + der = f.read() + self = cls.derRead(der) + self.uri = uri + self.fn = fn + self.generation = generation + self.sha256 = sha256(der) + self.ee = X509.derReadURI(uri, generation, self) + self.asn = None + self.prefixes = None + return self + + def check(self, trusted = None, crl = None): + status = Status.update(self.uri, self.generation) + self.ee.check(trusted, crl) + try: + vcard = self.verify() + except rpki.POW.ValidationError: + status.add(codes.OBJECT_REJECTED) + self.asn = self.getASID() + self.prefixes = self.getPrefixes() + codes.normalize(status) + return not any(s.kind == "bad" for s in status) + + +class WalkFrame(object): + """ + Certificate tree walk stack frame. This is basically just a + preamble and a loop, broken out into several separate methods so + that we can fork new tasks in the middle then resume processing of + the current state machine (ie, this frame) when appropriate (eg, + after an rsync or RRDP fetch completes). """ - def __init__(self): - self.certinfo = CertInfo() - self.cert = None - self.manifest = None - self.manifest_generation = None - self.filenames = [] - self.manifest_iteration = None - self.filename_iteration = None - self.stale_manifest = False - self.crldp = None - self.certs = [] - self.crls = [] + def __init__(self, cer): + self.cer = cer self.state = self.initial @tornado.gen.coroutine - def initial(self): - raise NotImplementedError + def __call__(self, wsk): + yield self.state(wsk) @tornado.gen.coroutine - def fetch(self): - raise NotImplementedError + def initial(self, wsk): + self.diruri = first_rsync_uri(self.cer.caDirectory) - @tornado.gen.coroutine - def ready(self): - raise NotImplementedError + fetch = Fetcher(self.diruri) - @tornado.gen.coroutine - def current(self): - raise NotImplementedError + if fetch.needed(): + + # This is where we'd fork another wsk task so we can keep + # busy while rsync is running. Defer that until linear + # version is working, so for now just wait for rsync. + + yield fetch.fetch() + + self.state = self.ready @tornado.gen.coroutine - def backup(self): - raise NotImplementedError + def ready(self, wsk): + self.trusted = wsk.trusted() + self.generation = Generation.current # XXX Wrong, but it's what walk_tree() was doing, fix later + + self.mfturi = first_rsync_uri(self.cer.rpkiManifest) + self.mft = Manifest.derReadURI(self.mfturi, self.generation) + + if self.mft is None: + wsk.pop() + return + + self.crluri = first_rsync_uri(self.mft.ee.crldp) + self.crl = CRL.derReadURI(self.crluri, self.generation) + + if self.crl is None or not self.crl.check(self.cer) or self.mft is None or not self.mft.check(self.trusted, self.crl): + wsk.pop() + return + + # CRL.check() probably ought to check issuer's caDirectory + # against CRL's URI in any case. If we do that, then we know + # that crluri.startswith(diruri) and can just simplify the + # following loop by setting crlfn = crluri[len(diruri):]. + + for name, digest in self.mft.fah: + if self.crluri == self.diruri + name: + if digest != self.crl.sha256: + status.add(codes.DIGEST_MISMATCH) + break + else: + Status.update(self.crluri, self.generation).add(codes.CRL_NOT_IN_MANIFEST) + + # Use an explicit iterator so we can resume it later. + # Run the loop in a separate method for the same reason. + + self.mft_iterator = iter(self.mft.getFiles()) + self.state = self.loop @tornado.gen.coroutine - def done(self): - raise NotImplementedError + def loop(self, wsk): + + counter = 0 + counter_max_before_yield = 50 + + for fn, digest in self.mft_iterator: + + counter += 1 + if counter > counter_max_before_yield: + yield tornado.gen.moment + counter = 0 + + uri = self.diruri + fn + + if uri == self.crluri: + continue + + if fn.endswith(".roa"): + roa = ROA.derReadURI(uri, self.generation) + roa.check() # XXX Do something with result + continue + + if fn.endswith(".gbr"): + gbr = Ghostbuster.derReadURI(uri, self.generation) + gbr.check() # XXX Do something with result + continue + + if fn.endswith(".cer"): + cer = X509.derReadURI(uri, self.generation) + cer.check() # XXX Do something with result + if cer.is_ca: + wsk.push(cer) + + # XXX Temporary: Need to integrate with FSM + # looping, rsync fetching, etc -- this is just a + # hack to preserve old walk_tree() behavior + # temporarily for testing. + + return + + continue + + Status.update(uri, self.generation).add(codes.UNKNOWN_OBJECT_TYPE_SKIPPED) + + wsk.pop() class WalkTask(object): @@ -171,24 +441,34 @@ class WalkTask(object): STACK_OF(walk_ctx_t) in rcynic:tos. """ - def __init__(self): - self.stack = [] + def __init__(self, wsk = None): + self.wsk = [] if wsk is None else wsk @tornado.gen.coroutine def __call__(self): - yield self.stack[-1].state() + while self.wsk: + yield self.wsk[-1](wsk = self) + + def push(self, cer): + self.wsk.append(WalkFrame(cer)) + + def pop(self): + return self.wsk.pop() + + def clone(self): + return WalkTask(wsk = list(self.wsk)) + + def trusted(self): + stack = [w.cer for w in self.wsk] + stack.reverse() + return stack -# Not sure we need all the rsync_status_t, rsync_state_t, and -# rsync_ctx_t stuff from rcynic:tos, add later if we do. # Probably do need some analogue of the rsync_history database, for # tracking down hosts, URIs already fetched, and (perhaps) limiting # simultaneous connection attempts to a single host. -# Analogue to rcynic:tos STACK_OF(task_t): queue of tasks to be -# invoked by the next available worker. Various fun options here, go -# with simplest (unbounded simple queue) initially. task_queue = tornado.queues.Queue() @@ -206,7 +486,6 @@ def parse_arguments(): raise argparse.ArgumentTypeError("%r is not a positive integer " % s) return i - parser = argparse.ArgumentParser(description = __doc__) parser.add_argument("--unauthenticated", type = check_dir, default = "rcynic-data/unauthenticated") parser.add_argument("--old-authenticated", type = check_dir, default = "rcynic-data/authenticated.old") @@ -228,10 +507,10 @@ def read_tals(): b64 = "".join(lines[lines.index("\n"):]) key = rpki.POW.Asymmetric.derReadPublic(b64.decode("base64")) if not uri.endswith(".cer"): - Status.update(furi, None).add(rpki.POW.validation_status.MALFORMED_TAL_URI) + Status.update(furi, None).add(codes.MALFORMED_TAL_URI) yield uri, key except: - Status.update(furi, None).add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR_LOCATOR) + Status.update(furi, None).add(codes.UNREADABLE_TRUST_ANCHOR_LOCATOR) def uri_to_filename(uri, base = None): @@ -240,13 +519,11 @@ def uri_to_filename(uri, base = None): fn = os.path.join(base, fn) return fn -def uri_to_basename(uri): - return uri.rpartition("/")[2] - def first_uri(uris, scheme): - for uri in uris: - if uri.startswith(scheme): - return uri + if uris is not None: + for uri in uris: + if uri.startswith(scheme): + return uri return None def first_rsync_uri(uris): @@ -258,124 +535,117 @@ def sha256(bytes): return d.digest() -@tornado.gen.coroutine -def walk_tree(cauri, ca, trusted, crl, basedir): - trusted.insert(0, ca) - - sia = ca.getSIA() - diruri = first_rsync_uri(sia[0]) - mfturi = first_rsync_uri(sia[1]) - try: - mft = rpki.POW.Manifest.derReadFile(uri_to_filename(mfturi, basedir)) - except rpki.POW.Error as e: - logger.warn("Couldn't read %s: %s", mfturi, e) - return - ee = mft.certs()[0] - crldp = ee.getCRLDP() - crluri = first_rsync_uri(crldp) - try: - crl = rpki.POW.CRL.derReadFile(uri_to_filename(crluri, basedir)) - except rpki.POW.Error as e: - logger.warn("Couldn't read %s: %s", crluri, e) - return - - crl_status = Status.update(crluri) - crl.verify(ca, crl_status) - - mft_status = Status.update(mfturi) - ee.verify(trusted = trusted, crl = crl, status = mft_status) - mft.verify(status = mft_status) - - crl_status.add(rpki.POW.validation_status.CRL_NOT_IN_MANIFEST) - - for fn, digest in mft.getFiles(): - - # XXX There are enough objects in the RIPE repository that we - # need to let the I/O loop run occasionally. Probably not on - # every object as this does, but without this it takes about - # 18 seconds to check the whole RIPE database, which makes - # APNIC look even slower than they are. Maybe yield every N - # objects for some tunable value of N, or something like that. - - yield tornado.gen.moment - - uri = diruri + fn - status = Status.update(uri) - - if uri == crluri: - if digest != sha256(crl.derWrite()): - status.add(rpki.POW.validation_status.DIGEST_MISMATCH) - status.remove(rpki.POW.validation_status.CRL_NOT_IN_MANIFEST) - continue - - with open(os.path.join(uri_to_filename(diruri, basedir), fn), "rb") as f: - der = f.read() - if sha256(der) != digest: - status.add(rpki.POW.validation_status.DIGEST_MISMATCH) - - if fn.endswith(".roa"): - roa = rpki.POW.ROA.derRead(der) - ee = roa.certs()[0] - ee.verify(trusted = trusted, crl = crl, status = status) - roa.verify(status = status) - continue - - if fn.endswith(".gbr"): - gbr = rpki.POW.CMS.derRead(der) - ee = gbr.certs()[0] - ee.verify(trusted = trusted, crl = crl, status = status) - vcard = gbr.verify(status = status) - continue - - if fn.endswith(".cer"): - cer = rpki.POW.X509.derRead(der) - cer.verify(trusted = trusted, crl = crl, status = status) - is_ca = (cer.getBasicConstraints() or (False, None))[0] - if is_ca: - yield walk_tree(diruri + fn, cer, trusted, crl, basedir) - continue - - status.add(rpki.POW.validation_status.UNKNOWN_OBJECT_TYPE_SKIPPED) +class Fetcher(object): + """ + Network transfer methods and history database. + At the moment this is rsync-only; eventually it will include + support for HTTPS and RRDP. + """ -@tornado.gen.coroutine -def rsync_ta(uri): - assert uri.endswith(".cer") - yield rsync(uri, ()) + # Internal protocol: + # + # - Instances which have just gotten to the query stage are not registered + # + # - Instances which are in progress are listed in .history and + # have a Condition object in .pending; instances which depend on + # this should wait for the condition, then return. + # + # - Instances which have completed are listed in .history and have + # .pending set to None. -@tornado.gen.coroutine -def rsync_sia(uri): - assert uri.endswith("/") - yield rsync(uri, ("--recursive", "--delete")) + _rsync_deadhosts = set() + _rsync_history = dict() -@tornado.gen.coroutine -def rsync(uri, cmd_extra): + def __init__(self, uri): + self.uri = uri + self.pending = None + self.status = None + + def _rsync_split_uri(self): + return tuple(self.uri.rstrip("/").split("/")[2:]) - # This is where we should check our internal history database to - # see if we've already fetched this URI or something covering it; - # if so, just return success at this point, we've done all the fetching - # we're going to do for this URI on this rcynic run. + def _rsync_find(self, path): + for i in xrange(1, len(path)): + target = path[:i+1] + try: + return self._rsync_history[target] + except KeyError: + continue + return None + + def needed(self): + if self.uri.startswith("rsync://"): + return self._rsync_needed() + raise ValueError + + def _rsync_needed(self): + path = self._rsync_split_uri() + if path[0] in self._rsync_deadhosts: + return False + entry = self._rsync_find(path) + return entry is None or entry.pending is not None - cmd = ["rsync", "--update", "--times", "--copy-links", "--itemize-changes", "-4"] - cmd.extend(cmd_extra) - cmd.append(uri) - cmd.append(uri_to_filename(uri, args.unauthenticated)) + def fetch(self): + if self.uri.startswith("rsync://"): + return self._rsync_fetch() + raise ValueError - dn = os.path.dirname(cmd[-1]) - if not os.path.exists(dn): - os.makedirs(dn) + @tornado.gen.coroutine + def _rsync_fetch(self): + assert self.uri.startswith("rsync://") and (self.uri.endswith(".cer") or self.uri.endswith("/")) - t0 = time.time() - proc = tornado.process.Subprocess(cmd, stdout = tornado.process.Subprocess.STREAM, stderr = subprocess.STDOUT) - logger.debug("rsync[%s] started \"%s\"", proc.pid, " ".join(cmd)) - ret, out = yield [proc.wait_for_exit(raise_error = False), proc.stdout.read_until_close()] - t1 = time.time() - for line in out.splitlines(): - logger.debug("rsync[%s] %s", proc.pid, line) - logger.debug("rsync[%s] finished after %s seconds with status 0x%x", proc.pid, t1 - t0, ret) + path = self._rsync_split_uri() + if path[0] in self._rsync_deadhosts: + return + other = self._rsync_find(path) + if other is not None: + if other.pending is not None: + yield other.pending.wait() + return - # Should do something with rsync result and validation status database here. + self.pending = tornado.locks.Condition() + self._rsync_history[path] = self + try: + cmd = ["rsync", "--update", "--times", "--copy-links", "--itemize-changes"] + if self.uri.endswith("/"): + cmd.append("--recursive") + cmd.append("--delete") + cmd.append(self.uri) + cmd.append(uri_to_filename(self.uri, args.unauthenticated)) + + dn = os.path.dirname(cmd[-1]) + if not os.path.exists(dn): + os.makedirs(dn) + + # We use the stdout close from rsync to detect when the subprocess has finished. + # There's a lovely tornado.process.Subprocess.wait_for_exit() method which does + # exactly what one might want -- but Unix signal handling still hasn't caught up + # to the software interrupt architecture ITS had forty years ago, so signals still + # cause random "system call interrupted" failures in other libraries. Nothing + # Tornado can do about this, so we avoid signals entirely and collect the process + # exit status directly from the operating system. In theory, the WNOHANG isn't + # really necessary, we use it anyway to be safe in case theory is wrong this week. + + t0 = time.time() + proc = tornado.process.Subprocess(cmd, stdout = tornado.process.Subprocess.STREAM, stderr = subprocess.STDOUT) + logger.debug("rsync[%s] started \"%s\"", proc.pid, " ".join(cmd)) + output = yield proc.stdout.read_until_close() + pid, self.status = os.waitpid(proc.pid, os.WNOHANG) + if (pid, self.status) == (0, 0): + logger.warn("rsync[%s] Couldn't get real exit status without blocking, sorry", proc.pid) + t1 = time.time() + for line in output.splitlines(): + logger.debug("rsync[%s] %s", proc.pid, line) + logger.debug("rsync[%s] finished after %s seconds with status 0x%x", proc.pid, t1 - t0, self.status) + + # Should do something with rsync result and validation status database here. + + finally: + pending = self.pending + self.pending = None + pending.notify_all() class CheckTALTask(object): @@ -388,42 +658,33 @@ class CheckTALTask(object): @tornado.gen.coroutine def __call__(self): - yield rsync_ta(self.uri) + yield Fetcher(self.uri).fetch() for generation in (Generation.current, Generation.backup): - try: - yield self.check(generation) - - # Need to convert this into initialization of a - # WalkTask object and insertion of that task into the - # task_queue. Check rcynic:tos to see how it handles - # the walk_ctx initialization. - + if (yield self.check(generation)): logger.debug("Starting walk of %s", self.uri) - yield walk_tree(self.uri, self.cer, [self.cer], None, generation.tree) - + wsk = WalkTask() + wsk.push(self.cer) + task_queue.put(wsk) return - except rpki.POW.Error: - pass - Status.update(self.uri, None).add(rpki.POW.validation_status.TRUST_ANCHOR_SKIPPED) + else: + Status.update(self.uri, None).add(codes.TRUST_ANCHOR_SKIPPED) @tornado.gen.coroutine def check(self, generation): status = Status.update(self.uri, generation) - try: - self.cer = rpki.POW.X509.derReadFile(uri_to_filename(self.uri, generation.tree)) - except rpki.POW.OpenSSLError: - status.add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR) - status.add(rpki.POW.validation_status.OBJECT_REJECTED) - raise rpki.POW.ValidationError - if self.key.derWritePublic() != self.cer.getPublicKey().derWritePublic(): - status.add(rpki.POW.validation_status.TRUST_ANCHOR_KEY_MISMATCH) - status.add(rpki.POW.validation_status.OBJECT_REJECTED) - raise rpki.POW.ValidationError - try: - self.cer.verify(trusted = [self.cer], status = status) - except rpki.POW.ValidationError: - status.add(rpki.POW.validation_status.OBJECT_REJECTED) - raise + self.cer = X509.derReadURI(self.uri, generation) + ok = False + if self.cer is None: + status.add(codes.UNREADABLE_TRUST_ANCHOR) + status.add(codes.OBJECT_REJECTED) + elif self.key.derWritePublic() != self.cer.getPublicKey().derWritePublic(): + status.add(codes.TRUST_ANCHOR_KEY_MISMATCH) + status.add(codes.OBJECT_REJECTED) + elif not self.cer.check(): + status.add(codes.OBJECT_REJECTED) + else: + ok = True + raise tornado.gen.Return(ok) @tornado.gen.coroutine @@ -435,13 +696,13 @@ def worker(meself): while True: task = yield task_queue.get() try: - logger.info("Worker %s starting %s", meself, task) + #logger.debug("Worker %s starting %s", meself, task) yield task() except: logger.exception("Worker %s caught unhandled exception from %s", meself, task) finally: task_queue.task_done() - logger.info("Worker %s finished %s", meself, task) + #logger.debug("Worker %s finished %s", meself, task) @tornado.gen.coroutine @@ -452,6 +713,15 @@ def main(): yield task_queue.join() +def final_report(): + # Should be doing something to generate rcynic.xml here. + + if False: + for uri in sorted(Status.db): + print Status.db[uri] + else: + print "{} entries in status database".format(len(Status.db)) + os.putenv("TZ", "UTC") time.tzset() @@ -465,8 +735,4 @@ logging.getLogger().setLevel(logging.DEBUG) tornado.ioloop.IOLoop.current().run_sync(main) -if False: - for uri in sorted(Status.db): - print Status.db[uri] - -# Should be doing something to generate rcynic.xml here. +final_report() |