#!/usr/bin/env python # $Id$ # Copyright (C) 2015-2016 Parsons Government Services ("PARSONS") # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notices and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND PARSONS DISCLAIMS ALL # WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL # PARSONS BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ RPKI validation engine. """ import os import sys import ssl import time import copy import errno import shutil import socket import logging import argparse import tempfile import urlparse import subprocess import tornado.gen import tornado.locks import tornado.ioloop import tornado.queues import tornado.process import tornado.httpclient import rpki.POW import rpki.log import rpki.config import rpki.sundial import rpki.relaxng import rpki.autoconf from rpki.oids import id_kp_bgpsec_router from lxml.etree import (ElementTree, Element, SubElement, Comment, XML, DocumentInvalid, XMLSyntaxError, iterparse) logger = logging.getLogger("rcynicng") xmlns = rpki.relaxng.rrdp.xmlns tag_delta = xmlns + "delta" tag_notification = xmlns + "notification" tag_publish = xmlns + "publish" tag_snapshot = xmlns + "snapshot" tag_withdraw = xmlns + "withdraw" codes = rpki.POW.validation_status class Status(object): """ Validation status database, like validation_status_t in rcynic:tos. rcynic:tos version of this data structure is stored as an AVL tree, because the OpenSSL STACK_OF() sort-and-bsearch turned out to be a very poor choice for the input data. Remains to be seen whether we need to do something like that here too. """ db = dict() def __init__(self, uri): self.uri = uri self._timestamp = None self.status = set() def __str__(self): return "{my.timestamp} {my.uri} {status}".format( my = self, status = ",".join(str(s) for s in sorted(self.status))) @property def timestamp(self): return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(self._timestamp)) @classmethod def get(cls, uri): try: return cls.db[uri].status except KeyError: return None @classmethod def update(cls, uri): try: self = cls.db[uri] except KeyError: self = cls.db[uri] = cls(uri) self._timestamp = time.time() return self.status @classmethod def add(cls, uri, *codes): status = cls.update(uri) for code in codes: status.add(code) @classmethod def remove(cls, uri, *codes): if uri in cls.db: for code in codes: cls.db[uri].status.discard(code) @classmethod def test(cls, uri, code): return uri in cls.db and code in cls.db[uri].status def install_object(obj): obj.obj.authenticated.add(authenticated) obj.obj.save() class X509StoreCTX(rpki.POW.X509StoreCTX): @classmethod def subclass(cls, **kwargs): return type(cls.__name__, (cls,), kwargs) status = None def verify_callback(self, ok): err = self.getError() if err in (codes.X509_V_OK.code, codes.X509_V_ERR_SUBJECT_ISSUER_MISMATCH.code): return ok elif err == codes.X509_V_ERR_CRL_HAS_EXPIRED.code: return True elif err == codes.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT.code: self.status.add(codes.TRUST_ANCHOR_NOT_SELF_SIGNED) return ok else: self.status.add(codes.find(err)) return ok class POW_Mixin(object): @classmethod def store_if_new(cls, der, uri, retrieval): self = cls.derRead(der) ski, aki = self.get_hex_SKI_AKI() return RPKIObject.objects.get_or_create( der = der, defaults = dict(uri = uri, aki = aki, ski = ski, sha256 = sha256hex(der), retrieved = retrieval)) def get_hex_SKI_AKI(self): cer = self.certs()[0] ski = cer.getSKI() aki = cer.getAKI() return ski.encode("hex") if ski else "", aki.encode("hex") if aki else "" @property def uri(self): return self.obj.uri @property def aki(self): return self.obj.aki @property def ski(self): return self.obj.ski class X509(rpki.POW.X509, POW_Mixin): def __repr__(self): try: return "".format(self.uri, id(self)) except: return "".format(id(self)) def get_hex_SKI_AKI(self): ski = self.getSKI() aki = self.getAKI() return ski.encode("hex") if ski else "", aki.encode("hex") if aki else "" @classmethod def load(cls, obj, cms = None): if cms is not None: # XXX Kludge to work around lack of subclass support in rpki.POW.CMS.certs(). der = cms.certs()[0].derWrite() else: der = obj.der self = cls.derRead(der) self.obj = obj self.bc = self.getBasicConstraints() self.eku = self.getEKU() self.aia = self.getAIA() self.sia = self.getSIA() self.crldp = self.getCRLDP() self.is_ca = self.bc is not None and self.bc[0] self.caDirectory, self.rpkiManifest, self.signedObjectRepository, self.rpkiNotify \ = self.sia or (None, None, None, None) return self @staticmethod def count_uris(uris, scheme = "rsync://"): count = 0 if uris is not None: for uri in uris: if uri.startswith(scheme): count += 1 return count def check(self, trusted, crl): #logger.debug("Starting checks for %r", self) status = Status.update(self.uri) is_ta = trusted is None is_routercert = (self.eku is not None and id_kp_bgpsec_router in self.eku and not self.is_ca and self.uri.endswith(".cer")) if self.eku is not None and (self.is_ca or not self.uri.endswith(".cer")): status.add(codes.INAPPROPRIATE_EKU_EXTENSION) if is_ta and not self.is_ca: status.add(codes.MALFORMED_TRUST_ANCHOR) if is_ta and self.aia is not None: status.add(codes.AIA_EXTENSION_FORBIDDEN) if not is_ta and self.aia is None: status.add(codes.AIA_EXTENSION_MISSING) if is_routercert and self.sia is not None: status.add(codes.SIA_EXTENSION_FORBIDDEN) if not is_routercert and self.sia is None: status.add(codes.SIA_EXTENSION_MISSING) if is_ta and self.crldp is not None: status.add(codes.CRLDP_EXTENSION_FORBIDDEN) if not is_ta and self.crldp is None: status.add(codes.CRLDP_EXTENSION_MISSING) if not is_ta and not self.aki: status.add(codes.AKI_EXTENSION_MISSING) elif not is_ta and self.aki != trusted[0].ski: status.add(codes.AKI_EXTENSION_ISSUER_MISMATCH) serial = self.getSerial() if serial <= 0 or serial > 0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF: status.add(codes.BAD_CERTIFICATE_SERIAL_NUMBER) if self.getVersion() != 2: status.add(codes.WRONG_OBJECT_VERSION) n_rsync_caIssuers = self.count_uris(self.aia) n_rsync_caDirectory = self.count_uris(self.caDirectory) n_rsync_rpkiManifest = self.count_uris(self.rpkiManifest) n_rsync_signedObjectRepository = self.count_uris(self.signedObjectRepository) if n_rsync_caIssuers > 1 or n_rsync_caDirectory > 1 or n_rsync_rpkiManifest > 1 or n_rsync_signedObjectRepository > 1: status.add(codes.MULTIPLE_RSYNC_URIS_IN_EXTENSION) if self.aia is not None and n_rsync_caIssuers == 0: status.add(codes.MALFORMED_AIA_EXTENSION) if self.is_ca: ok = n_rsync_caDirectory != 0 and n_rsync_rpkiManifest != 0 and n_rsync_signedObjectRepository == 0 elif not is_routercert: ok = n_rsync_caDirectory == 0 and n_rsync_rpkiManifest == 0 and n_rsync_signedObjectRepository != 0 else: ok = self.sia is None if not ok: status.add(codes.MALFORMED_SIA_EXTENSION) if not is_ta and self.count_uris(self.crldp) == 0: status.add(codes.MALFORMED_CRLDP_EXTENSION) self.checkRPKIConformance(status = status, eku = id_kp_bgpsec_router if is_routercert else None) try: self.verify(trusted = [self] if trusted is None else trusted, crl = crl, policy = "1.3.6.1.5.5.7.14.2", context_class = X509StoreCTX.subclass(status = status)) except rpki.POW.ValidationError as e: logger.debug("%r rejected: %s", self, e) status.add(codes.OBJECT_REJECTED) codes.normalize(status) #logger.debug("Finished checks for %r", self) return not any(s.kind == "bad" for s in status) class CRL(rpki.POW.CRL, POW_Mixin): def __repr__(self): try: return "".format(self.uri, id(self)) except: return "".format(id(self)) def get_hex_SKI_AKI(self): aki = self.getAKI() return "", aki.encode("hex") if aki else "" @classmethod def load(cls, obj): self = cls.derRead(obj.der) self.obj = obj self.thisUpdate = self.getThisUpdate() self.nextUpdate = self.getNextUpdate() self.number = self.getCRLNumber() return self def check(self, issuer): status = Status.update(self.uri) self.checkRPKIConformance(status = status, issuer = issuer) try: self.verify(issuer) except rpki.POW.ValidationError as e: logger.debug("%r rejected: %s", self, e) status.add(codes.OBJECT_REJECTED) codes.normalize(status) if self.getVersion() != 1: status.add(codes.WRONG_OBJECT_VERSION) now = rpki.sundial.now() if self.thisUpdate > now: status.add(codes.CRL_NOT_YET_VALID) if self.nextUpdate < now: status.add(codes.STALE_CRL_OR_MANIFEST) if self.number is None: status.add(codes.CRL_NUMBER_EXTENSION_MISSING) if self.number < 0: status.add(codes.CRL_NUMBER_IS_NEGATIVE) if self.number > 0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF: status.add(codes.CRL_NUMBER_OUT_OF_RANGE) if self.getIssuer() != issuer.getSubject(): status.add(codes.CRL_ISSUER_NAME_MISMATCH) if not self.aki: status.add(codes.AKI_EXTENSION_MISSING) elif self.aki != issuer.ski: status.add(codes.AKI_EXTENSION_ISSUER_MISMATCH) return not any(s.kind == "bad" for s in status) class Ghostbuster(rpki.POW.CMS, POW_Mixin): def __repr__(self): try: return "".format(self.uri, id(self)) except: return "".format(id(self)) @classmethod def load(cls, obj): self = cls.derRead(obj.der) self.obj = obj self.ee = X509.load(obj, self) self.vcard = None return self def check(self, trusted, crl): status = Status.update(self.uri) self.ee.check(trusted = trusted, crl = crl) try: self.vcard = self.verify() except rpki.POW.ValidationError as e: logger.debug("%r rejected: %s", self, e) status.add(codes.OBJECT_REJECTED) self.checkRPKIConformance(status) codes.normalize(status) return not any(s.kind == "bad" for s in status) class Manifest(rpki.POW.Manifest, POW_Mixin): def __repr__(self): try: return "".format(self.uri, id(self)) except: return "".format(id(self)) @classmethod def load(cls, obj): self = cls.derRead(obj.der) self.obj = obj self.ee = X509.load(obj, self) self.fah = None self.thisUpdate = None self.nextUpdate = None self.number = None return self def check(self, trusted, crl): status = Status.update(self.uri) self.ee.check(trusted = trusted, crl = crl) try: self.verify() except rpki.POW.ValidationError as e: logger.debug("%r rejected: %s", self, e) status.add(codes.OBJECT_REJECTED) self.checkRPKIConformance(status) self.thisUpdate = self.getThisUpdate() self.nextUpdate = self.getNextUpdate() self.number = self.getManifestNumber() self.fah = self.getFiles() self.notBefore = self.ee.getNotBefore() self.notAfter = self.ee.getNotAfter() if self.thisUpdate < self.notBefore or self.nextUpdate > self.notAfter: status.add(codes.MANIFEST_INTERVAL_OVERRUNS_CERT) now = rpki.sundial.now() if self.thisUpdate > now: status.add(codes.MANIFEST_NOT_YET_VALID) if self.nextUpdate < now: status.add(codes.STALE_CRL_OR_MANIFEST) codes.normalize(status) return not any(s.kind == "bad" for s in status) def find_crl_candidate_hashes(self): for fn, digest in self.fah: if fn.endswith(".crl"): yield digest.encode("hex") class ROA(rpki.POW.ROA, POW_Mixin): def __repr__(self): try: return "".format(self.uri, id(self)) except: return "".format(id(self)) @classmethod def load(cls, obj): self = cls.derRead(obj.der) self.obj = obj self.ee = X509.load(obj, self) self.asn = None self.prefixes = None return self def check(self, trusted, crl): status = Status.update(self.uri) self.ee.check(trusted = trusted, crl = crl) try: vcard = self.verify() except rpki.POW.ValidationError: status.add(codes.OBJECT_REJECTED) self.checkRPKIConformance(status) self.asn = self.getASID() self.prefixes = self.getPrefixes() codes.normalize(status) return not any(s.kind == "bad" for s in status) class_dispatch = dict(cer = X509, crl = CRL, gbr = Ghostbuster, mft = Manifest, roa = ROA) def uri_to_class(uri): cls = class_dispatch.get(uri[-3:]) if len(uri) > 4 and uri[-4] == "." else None if cls is None: Status.add(uri, codes.UNKNOWN_OBJECT_TYPE_SKIPPED) return cls # If we find ourselves using this same ordering for every retrieval from the RPKIObjects model, we # can add it as a Meta option for the model and omit it in the query expressions, like this: # # class RPKIObjects(models.Model): # ... # class Meta: # ordering = ["-retrieved__started"] # # https://docs.djangoproject.com/en/1.8/ref/models/querysets/#order-by # https://docs.djangoproject.com/en/1.8/ref/models/options/#django.db.models.Options.ordering def fetch_objects(**kwargs): for obj in RPKIObject.objects.filter(**kwargs).order_by("-retrieved__started"): cls = uri_to_class(obj.uri) if cls is not None: yield cls.load(obj) class WalkFrame(object): """ Certificate tree walk stack frame. This is basically just a preamble and a loop, broken out into several separate methods so that we can fork new tasks in the middle then resume processing of the current state machine (ie, this frame) when appropriate (eg, after an rsync or RRDP fetch completes). """ def __init__(self, cer): self.cer = cer self.state = self.initial def __repr__(self): try: return "".format(self.cer.uri, id(self)) except: return "".format(id(self)) @tornado.gen.coroutine def __call__(self, wsk): yield self.state(wsk) @tornado.gen.coroutine def initial(self, wsk): rsync_uri = first_rsync_uri(self.cer.caDirectory) rrdp_uri = first_https_uri(self.cer.rpkiNotify) if args.prefer_rsync: uri = rsync_uri or rrdp_uri else: uri = rrdp_uri or rsync_uri self.fetcher = Fetcher(uri) if not self.fetcher.needed(): self.state = self.ready elif not args.spawn_on_fetch: self.state = self.fetch else: self.state = self.fetch yield task_queue.put(wsk.clone()) wsk.pop() @tornado.gen.coroutine def fetch(self, wsk): yield self.fetcher.fetch() self.state = self.ready @tornado.gen.coroutine def ready(self, wsk): self.trusted = wsk.trusted() logger.debug("%r scanning products", self) # NB: CRL checks on manifest EE certificates deferred until we've picked a CRL. mft_candidates = [] crl_candidates = [] crl_candidate_hashes = set() for mft in fetch_objects(aki = self.cer.ski, uri__endswith = ".mft"): if mft.check(trusted = self.trusted, crl = None): mft_candidates.append(mft) crl_candidate_hashes.update(mft.find_crl_candidate_hashes()) if not mft_candidates: wsk.pop() return for crl in fetch_objects(aki = self.cer.ski, uri__endswith = ".crl", sha256__in = crl_candidate_hashes): if crl.check(self.trusted[0]): crl_candidates.append(crl) mft_candidates.sort(reverse = True, key = lambda x: (x.number, x.thisUpdate, x.obj.retrieved.started)) crl_candidates.sort(reverse = True, key = lambda x: (x.number, x.thisUpdate, x.obj.retrieved.started)) if not crl_candidates: wsk.pop() return self.crl = crl_candidates[0] install_object(self.crl) Status.add(self.crl.uri, codes.OBJECT_ACCEPTED) #logger.debug("Picked CRL %s", self.crl.uri) for mft in mft_candidates: if self.crl.isRevoked(mft.ee): Status.add(mft.obj.uri, codes.MANIFEST_EE_REVOKED) continue self.mft = mft break else: wsk.pop() return install_object(self.mft) Status.add(self.mft.obj.uri, codes.OBJECT_ACCEPTED) self.stale_crl = Status.test(self.crl.uri, codes.STALE_CRL_OR_MANIFEST) self.stale_mft = Status.test(self.mft.uri, codes.STALE_CRL_OR_MANIFEST) # Issue warnings on mft and crl URI mismatches? # Use an explicit iterator so we can resume it; run loop in separate method, same reason. self.mft_iterator = iter(self.mft.getFiles()) self.state = self.loop @tornado.gen.coroutine def loop(self, wsk): #logger.debug("Processing %s", self.mft.uri) for fn, digest in self.mft_iterator: yield tornado.gen.moment uri = self.mft.uri[:self.mft.uri.rindex("/") + 1] + fn # Need general URI validator here? if uri == self.crl.uri: continue cls = uri_to_class(uri) if cls is None: continue if cls in (Manifest, CRL): Status.add(uri, codes.INAPPROPRIATE_OBJECT_TYPE_SKIPPED) continue for obj in fetch_objects(sha256 = digest.encode("hex")): if self.stale_crl: Status.add(uri, codes.TAINTED_BY_STALE_CRL) if self.stale_mft: Status.add(uri, codes.TAINTED_BY_STALE_MANIFEST) if not obj.check(trusted = self.trusted, crl = self.crl): Status.add(uri, codes.OBJECT_REJECTED) continue install_object(obj) Status.add(uri, codes.OBJECT_ACCEPTED) if cls is not X509 or not obj.is_ca: break wsk.push(obj) return wsk.pop() class WalkTask(object): """ Task corresponding to one walk stack, roughly analgous to STACK_OF(walk_ctx_t) in rcynic:tos. """ def __init__(self, wsk = None, cer = None): self.wsk = [] if wsk is None else wsk if cer is not None: self.push(cer) def __repr__(self): try: return "".format(self.wsk[-1].cer.uri, id(self)) except: return "".format(id(self)) @tornado.gen.coroutine def __call__(self): while self.wsk: yield self.wsk[-1](wsk = self) def push(self, cer): self.wsk.append(WalkFrame(cer)) def pop(self): return self.wsk.pop() def clone(self): return WalkTask(wsk = list(self.wsk)) def trusted(self): stack = [w.cer for w in self.wsk] stack.reverse() return stack def read_tals(): for head, dirs, files in os.walk(args.trust_anchor_locators): for fn in files: if fn.endswith(".tal"): furi = "file://" + os.path.abspath(os.path.join(head, fn)) try: with open(os.path.join(head, fn), "r") as f: lines = [line.strip() for line in f] blank = lines.index("") uris = lines[:blank] key = rpki.POW.Asymmetric.derReadPublic("".join(lines[blank:]).decode("base64")) if not uris or not all(uri.endswith(".cer") for uri in uris): Status.add(furi, codes.MALFORMED_TAL_URI) yield uris, key except: Status.add(furi, codes.UNREADABLE_TRUST_ANCHOR_LOCATOR) def uri_to_filename(uri, base = None): fn = uri[uri.index("://")+3:] if base is not None: fn = os.path.join(base, fn) return fn def first_uri(uris, scheme): if uris is not None: for uri in uris: if uri.startswith(scheme): return uri return None def first_rsync_uri(uris): return first_uri(uris, "rsync://") def first_https_uri(uris): return first_uri(uris, "https://") def sha256hex(bytes): d = rpki.POW.Digest(rpki.POW.SHA256_DIGEST) d.update(bytes) return d.digest().encode("hex") class RRDP_ParseFailure(Exception): "Failure parsing RRDP message." class DeadHost(Exception): "Host recently tried and known to be unavailable." class Fetcher(object): """ Network transfer methods and history database. At the moment this is rsync-only; eventually it will include support for HTTPS and RRDP. """ # Internal protocol: # # - Instances which have just gotten to the query stage are not registered # # - Instances which are in progress are listed in .history and # have a Condition object in .pending; instances which depend on # this should wait for the condition, then return. # # - Instances which have completed are listed in .history and have # .pending set to None. _rsync_deadhosts = set() _rsync_history = dict() _https_deadhosts = set() _https_history = dict() _https_invalid = set() def __init__(self, uri, ta = False): self.uri = uri self.ta = ta self.pending = None self.status = None def _rsync_split_uri(self): return tuple(self.uri.rstrip("/").split("/")[2:]) def _rsync_find(self, path): for i in xrange(1, len(path)): target = path[:i+1] try: return self._rsync_history[target] except KeyError: continue return None def needed(self): if not args.fetch: return False if self.uri.startswith("rsync://"): return self._rsync_needed() if self.uri.startswith("https://"): return self._https_needed() raise ValueError def _rsync_needed(self): path = self._rsync_split_uri() if path[0] in self._rsync_deadhosts: return False entry = self._rsync_find(path) return entry is None or entry.pending is not None def _https_needed(self): netloc = urlparse.urlparse(self.uri).netloc if netloc in self._https_deadhosts: return False entry = self._https_history.get(self.uri) return entry is None or entry.pending is not None def fetch(self): if self.uri.startswith("rsync://"): return self._rsync_fetch() if self.uri.startswith("https://"): return self._https_fetch_ta() if self.ta else self._rrdp_fetch() raise ValueError @tornado.gen.coroutine def _rsync_fetch(self): assert self.uri.startswith("rsync://") and (self.uri.endswith(".cer") if self.ta else self.uri.endswith("/")) if not args.fetch: return path = self._rsync_split_uri() dead = path[0] in self._rsync_deadhosts other = self._rsync_find(path) if not dead and other is not None and other.pending is not None: yield other.pending.wait() if dead or other is not None: return self.pending = tornado.locks.Condition() self._rsync_history[path] = self try: path = uri_to_filename(self.uri, args.unauthenticated) cmd = ["rsync", "--update", "--times", "--copy-links", "--itemize-changes"] if self.uri.endswith("/"): cmd.append("--recursive") cmd.append("--delete") cmd.append(self.uri) cmd.append(path) dn = os.path.dirname(path) if not os.path.exists(dn): os.makedirs(dn) # We use the stdout close from rsync to detect when the subprocess has finished. # There's a lovely tornado.process.Subprocess.wait_for_exit() method which does # exactly what one would think we'd want -- but Unix signal handling still hasn't # caught up to the software interrupt architecture ITS had forty years ago, so # signals still cause random "system call interrupted" failures in other libraries. # Nothing Tornado can do about this, so we avoid signals entirely and collect the # process exit status directly from the operating system. In theory, the WNOHANG # isn't necessary here, we use it anyway to be safe in case theory is wrong. # If we need to add a timeout here to guard against rsync processes taking too long # (which has happened in the past with, eg, LACNIC), see tornado.gen.with_timeout() # (documented in the utility functions section of the tornado.gen page), which wraps # any future in a timeout. t0 = time.time() rsync = tornado.process.Subprocess(cmd, stdout = tornado.process.Subprocess.STREAM, stderr = subprocess.STDOUT) logger.debug("rsync[%s] started \"%s\"", rsync.pid, " ".join(cmd)) output = yield rsync.stdout.read_until_close() pid, self.status = os.waitpid(rsync.pid, os.WNOHANG) t1 = time.time() if (pid, self.status) == (0, 0): logger.warn("rsync[%s] Couldn't get real exit status without blocking, sorry", rsync.pid) for line in output.splitlines(): logger.debug("rsync[%s] %s", rsync.pid, line) logger.debug("rsync[%s] finished after %s seconds with status 0x%x", rsync.pid, t1 - t0, self.status) # Should do something with rsync result and validation status database here. retrieval = Retrieval.objects.create( uri = self.uri, started = rpki.sundial.datetime.fromtimestamp(t0), finished = rpki.sundial.datetime.fromtimestamp(t1), successful = self.status == 0) for fn in self._rsync_walk(path): yield tornado.gen.moment uri = "rsync://" + fn[len(args.unauthenticated):].lstrip("/") cls = uri_to_class(uri) if cls is not None: try: with open(fn, "rb") as f: cls.store_if_new(f.read(), uri, retrieval) except: Status.add(uri, codes.UNREADABLE_OBJECT) logger.exception("Couldn't read %s from rsync tree", uri) finally: pending = self.pending self.pending = None pending.notify_all() def _rsync_walk(self, path): if self.uri.endswith("/"): for head, dirs, files in os.walk(path): for fn in files: yield os.path.join(head, fn) elif os.path.exists(path): yield path @tornado.gen.coroutine def _https_fetch_url(self, url, streaming_callback = None): netloc = urlparse.urlparse(url).netloc if netloc in self._https_deadhosts: raise DeadHost # Should do something with deadhost processing below. Looks # like errors such as HTTP timeout show up as # tornado.httpclient.HTTPError exceptions (which we could # suppress if we wanted to do so, but we probably don't). # HTTP timeout shows up in the logs as "HTTP 599". See doc for: # # tornado.httpclient.AsyncHTTPClient.fetch() # tornado.httpclient.HTTPError # Might need to do something with If-Modified-Since support # See if_modified_since argument to # http://www.tornadoweb.org/en/stable/httpclient.html#request-objects # (which we can pass to client.fetch(), below). Not sure how # "you don't need to retrieve this" result comes back, # probably a benign exception we need to catch. Supporting # this means adding another null-able timestamp field to the # RRDPSnapshot model (which probably should be named the # RRDPZone model instead), and storing a datetime there. # Would also need to pull timestamp from the Last-Modified # header in the response object. try: ok = False t0 = time.time() client = tornado.httpclient.AsyncHTTPClient(max_body_size = args.max_https_body_size) validate = args.validate_https and netloc not in self._https_invalid try: response = yield client.fetch(url, streaming_callback = streaming_callback, validate_cert = validate, connect_timeout = args.https_timeout, request_timeout = args.https_timeout) except ssl.SSLError as e: if not validate or e.reason != "CERTIFICATE_VERIFY_FAILED": raise logger.info("HTTPS validation failure for %s, retrying with validation disabled", url) response = yield client.fetch(url, streaming_callback = streaming_callback, validate_cert = False, connect_timeout = args.https_timeout, request_timeout = args.https_timeout) self._https_invalid.add(netloc) # Might want to check response Content-Type here ok = True except tornado.httpclient.HTTPError as e: # Might want to check e.response here to figure out whether to add to _https_deadhosts. logger.info("HTTP error for %s: %s", url, e) raise except (socket.error, IOError, ssl.SSLError) as e: # Might want to check e.errno here to figure out whether to add to _https_deadhosts. logger.info("Network I/O error for %s: %s", url, e) raise except Exception as e: logger.exception("Error (%r) for %s", type(e), url) raise finally: t1 = time.time() logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0) retrieval = Retrieval.objects.create( uri = url, started = rpki.sundial.datetime.fromtimestamp(t0), finished = rpki.sundial.datetime.fromtimestamp(t1), successful = ok) if ok: raise tornado.gen.Return((retrieval, response)) @tornado.gen.coroutine def _https_fetch_ta(self): if not args.fetch: return other = self._https_history.get(self.uri) if other is not None and other.pending is not None: yield other.pending.wait() return self.pending = tornado.locks.Condition() self._rsync_history[self.uri] = self try: retrieval, response = yield self._https_fetch_url(self.uri) X509.store_if_new(response.body, self.uri, retrieval) except: logger.exception("Couldn't load %s", self.uri) finally: pending = self.pending self.pending = None pending.notify_all() @tornado.gen.coroutine def _rrdp_fetch_notification(self, url): retrieval, response = yield self._https_fetch_url(url) notification = ElementTree(file = response.buffer).getroot() rpki.relaxng.rrdp.schema.assertValid(notification) if notification.tag != tag_notification: raise RRDP_ParseFailure("Expected RRDP notification for {}, got {}".format(url, notification.tag)) raise tornado.gen.Return((retrieval, notification)) @tornado.gen.coroutine def _rrdp_fetch_data_file(self, url, expected_hash): sha256 = rpki.POW.Digest(rpki.POW.SHA256_DIGEST) xml_file = tempfile.SpooledTemporaryFile() retrieval, response = yield self._https_fetch_url(url, lambda data: (sha256.update(data), xml_file.write(data))) received_hash = sha256.digest().encode("hex") xml_file.seek(0) if received_hash != expected_hash.lower(): raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(expected_hash.lower(), url, received_hash)) raise tornado.gen.Return((retrieval, response, xml_file)) @tornado.gen.coroutine def _rrdp_bulk_create(self, new_objs, existing_objs): from django.db import IntegrityError #logger.debug("Bulk creation of new RPKIObjects") try: RPKIObject.objects.bulk_create(new_objs) except IntegrityError: #logger.debug("Some objects already existed, weeding and retrying") i = 0 while i < len(new_objs): yield tornado.gen.moment try: existing_objs.append(RPKIObject.objects.values_list("pk", flat = True).get(der = new_objs[i].der)) logger.debug("Object existed in SQL but, apparently, not in prior copy of snapshot: uri %s sha256 %s", new_objs[i].uri, new_objs[i].sha256) except RPKIObject.DoesNotExist: i += 1 else: del new_objs[i] RPKIObject.objects.bulk_create(new_objs) del new_objs[:] @tornado.gen.coroutine def _rrdp_fetch(self): from django.db import transaction if not args.fetch: return other = self._https_history.get(self.uri) if other is not None and other.pending is not None: yield other.pending.wait() return self.pending = tornado.locks.Condition() self._https_history[self.uri] = self try: retrieval, notification = yield self._rrdp_fetch_notification(url = self.uri) session_id = notification.get("session_id") serial = long(notification.get("serial")) snapshot = RRDPSnapshot.objects.filter( session_id = session_id).order_by("-retrieved__started").first() logger.debug("RRDP notification for %s session_id %s serial %s current snapshot %r", self.uri, session_id, serial, snapshot) if snapshot is not None and snapshot.serial == serial: logger.debug("RRDP data for %s is up-to-date, nothing to do", self.uri) return deltas = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash"))) for delta in notification.iterchildren(tag_delta)) if snapshot is None or snapshot.serial + 1 not in deltas: existing_rpkiobject_map = dict() if snapshot is not None: logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial) existing_rpkiobject_map.update(snapshot.rpkiobject_set.values_list("sha256", "pk")) x = notification.find(tag_snapshot) url, hash = x.get("uri"), x.get("hash") logger.debug("RRDP %s loading from snapshot %s serial %s", self.uri, url, serial) retrieval, response, xml_file = yield self._rrdp_fetch_data_file(url, hash) snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial) # Value of "chunk" here may need to be configurable. Larger numbers batch more objects in # a single bulk addition, which is faster ... unless one or more of them isn't really new, in # which case we have to check everything in that batch when we get the IntegrityError, so # the smaller the batch, the faster that check. No single good answer. root = None existing_rpkiobjects = [] new_rpkiobjects = [] chunk = 2000 for event, node in iterparse(xml_file): if node is root: continue if root is None: root = node.getparent() if root is None or root.tag != tag_snapshot \ or root.get("version") != "1" \ or any(a not in ("version", "session_id", "serial") for a in root.attrib): raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url)) if root.get("session_id") != session_id: raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format( session_id, url, root.get("session_id"))) if long(root.get("serial")) != long(serial): raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format( serial, url, root.get("serial"))) if node.tag != tag_publish or node.getparent() is not root \ or any(a != "uri" for a in node.attrib): raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url)) uri = node.get("uri") cls = uri_to_class(uri) if cls is None: raise RRDP_ParseFailure("Unexpected URI {}".format(uri)) der = node.text.decode("base64") sha256 = sha256hex(der) try: existing_rpkiobjects.append(existing_rpkiobject_map[sha256]) except KeyError: ski, aki = cls.derRead(der).get_hex_SKI_AKI() new_rpkiobjects.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki, retrieved = retrieval, sha256 = sha256)) node.clear() while node.getprevious() is not None: del root[0] if len(new_rpkiobjects) > chunk: yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects) yield tornado.gen.moment if len(new_rpkiobjects) > 0: yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects) RPKIObject.snapshot.through.objects.bulk_create([ RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i) for i in retrieval.rpkiobject_set.values_list("pk", flat = True)]) RPKIObject.snapshot.through.objects.bulk_create([ RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i) for i in existing_rpkiobjects]) snapshot.retrieved = retrieval snapshot.save() xml_file.close() else: logger.debug("RRDP %s %s deltas (%s--%s)", self.uri, (serial - snapshot.serial), snapshot.serial, serial) deltas = [(serial, deltas[serial][0], deltas[serial][1]) for serial in xrange(snapshot.serial + 1, serial + 1)] futures = [] while deltas or futures: while deltas and len(futures) < args.fetch_ahead_goal: serial, url, hash = deltas.pop(0) logger.debug("RRDP %s serial %s fetching %s", self.uri, serial, url) futures.append(self._rrdp_fetch_data_file(url, hash)) retrieval, response, xml_file = yield futures.pop(0) root = None with transaction.atomic(): snapshot.serial += 1 snapshot.save() logger.debug("RRDP %s serial %s loading", self.uri, snapshot.serial) for event, node in iterparse(xml_file): if node is root: continue if root is None: root = node.getparent() if root is None or root.tag != tag_delta \ or root.get("version") != "1" \ or any(a not in ("version", "session_id", "serial") for a in root.attrib): raise RRDP_ParseFailure("{} doesn't look like an RRDP delta file".format(url)) if root.get("session_id") != session_id: raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format( session_id, url, root.get("session_id"))) if long(root.get("serial")) != snapshot.serial: raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format( snapshot.serial, url, root.get("serial"))) hash = node.get("hash") if node.getparent() is not root or node.tag not in (tag_publish, tag_withdraw) \ or (node.tag == tag_withdraw and hash is None) \ or any(a not in ("uri", "hash") for a in node.attrib): raise RRDP_ParseFailure("{} doesn't look like an RRDP delta file".format(url)) if node.tag == tag_withdraw or node.get("hash") is not None: snapshot.rpkiobject_set.remove(snapshot.rpkiobject_set.get(sha256 = node.get("hash").lower())) if node.tag == tag_publish: uri = node.get("uri") cls = uri_to_class(uri) if cls is None: raise RRDP_ParseFailure("Unexpected URI %s" % uri) obj, created = cls.store_if_new(node.text.decode("base64"), uri, retrieval) obj.snapshot.add(snapshot) node.clear() while node.getprevious() is not None: del root[0] #yield tornado.gen.moment xml_file.close() logger.debug("RRDP %s done processing deltas", self.uri) except (tornado.httpclient.HTTPError, socket.error, IOError, ssl.SSLError): pass # Already logged except RRDP_ParseFailure as e: logger.info("RRDP parse failure: %s", e) except: logger.exception("Couldn't load %s", self.uri) finally: pending = self.pending self.pending = None pending.notify_all() class CheckTALTask(object): def __init__(self, uris, key): rsync_uri = first_rsync_uri(uris) https_uri = first_https_uri(uris) if args.prefer_rsync: self.uri = rsync_uri or https_uri else: self.uri = https_uri or rsync_uri self.key = key def __repr__(self): return "".format(self.uri) @tornado.gen.coroutine def __call__(self): yield Fetcher(self.uri, ta = True).fetch() for cer in fetch_objects(uri = self.uri): if self.check(cer): yield task_queue.put(WalkTask(cer = cer)) break else: Status.add(self.uri, codes.TRUST_ANCHOR_SKIPPED) def check(self, cer): if self.key.derWritePublic() != cer.getPublicKey().derWritePublic(): Status.add(self.uri, codes.TRUST_ANCHOR_KEY_MISMATCH) ok = False else: ok = cer.check(trusted = None, crl = None) if ok: install_object(cer) Status.add(self.uri, codes.OBJECT_ACCEPTED) else: Status.add(self.uri, codes.OBJECT_REJECTED) return ok @tornado.gen.coroutine def worker(meself): # # NB: This particular style of control loop REQUIRES an except # clause, even if that except clause is just a pass statement. # while True: task = yield task_queue.get() name = repr(task) try: logger.debug("Worker %s starting %s, queue length %s", meself, name, task_queue.qsize()) yield task() except: logger.exception("Worker %s caught unhandled exception from %s", meself, name) finally: task_queue.task_done() logger.debug("Worker %s finished %s, queue length %s", meself, name, task_queue.qsize()) def final_report(): # Clean up a bit to avoid confusing the user unnecessarily. for s in Status.db.itervalues(): if codes.OBJECT_ACCEPTED in s.status: s.status.discard(codes.OBJECT_REJECTED) doc = Element("rcynic-summary", date = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())) doc.set("reporting-hostname", socket.getfqdn()) doc.set("rcynic-version", "rcynicng") doc.set("summary-version", "1") labels = SubElement(doc, "labels") for code in codes.all(): SubElement(labels, code.name, kind = code.kind).text = code.text for uri in Status.db: for sym in sorted(Status.db[uri].status): SubElement(doc, "validation_status", timestamp = str(Status.db[uri].timestamp), status = str(sym), generation = "None" # Historical relic, remove eventually ).text = uri # # Should generate elements here too, later # ElementTree(doc).write(file = argparse.FileType("w")(args.xml_file), pretty_print = True) def final_cleanup(): from django.db import transaction, models def report(when): logger.debug("Database %s cleanup: %s Authenticated %s RRDPSnapshot %s RPKIObject %s Retrieval", when, Authenticated.objects.all().count(), RRDPSnapshot.objects.all().count(), RPKIObject.objects.all().count(), Retrieval.objects.all().count()) report("before") with transaction.atomic(): #logger.debug("Flushing incomplete RRDP snapshots") q = RRDPSnapshot.objects q = q.filter(retrieved__isnull = True) q.delete() #logger.debug("Flushing old authenticated sets") q = Authenticated.objects q = q.exclude(id = authenticated.id) q.delete() #logger.debug("Flushing RRDP snapshots which don't contain anything in the (remaining) authenticated set") q = RPKIObject.objects q = q.filter(authenticated = authenticated.id) q = q.exclude(snapshot = None) q = q.order_by("snapshot__id") q = q.values_list("snapshot__id", flat = True) q = q.distinct() q = RRDPSnapshot.objects.exclude(id__in = q) q.delete() #logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot") q = RPKIObject.objects q = q.filter(authenticated = None) # was: q = q.exclude(authenticated = authenticated.id) q = q.filter(snapshot = None) q.delete() #logger.debug("Flushing retrieval objects which are no longer related to any RPKI objects or RRDP snapshot") q = RPKIObject.objects q = q.order_by("retrieved__id") q = q.values_list("retrieved__id", flat = True) q = q.distinct() q = Retrieval.objects.exclude(id__in = q) q = q.filter(rrdpsnapshot = None) q.delete() report("after") @tornado.gen.coroutine def launcher(): for i in xrange(args.workers): tornado.ioloop.IOLoop.current().spawn_callback(worker, i) yield [task_queue.put(CheckTALTask(uris, key)) for uris, key in read_tals()] yield task_queue.join() class posint(int): def __init__(self, value): if self <= 0: raise ValueError def main(): global rpki os.environ.update(TZ = "UTC", DJANGO_SETTINGS_MODULE = "rpki.django_settings.rcynic") time.tzset() cfg = rpki.config.argparser(section = "rcynic", doc = __doc__, cfg_optional = True) cfg.add_logging_arguments() cfg.add_argument("-u", "--unauthenticated", help = "where to store unauthenticated data retrieved via rsycnc", default = os.path.join(rpki.autoconf.RCYNIC_DIR, "data", "unauthenticated")) cfg.add_argument("-x", "--xml-file", help = "where to write XML log of validation results", default = os.path.join(rpki.autoconf.RCYNIC_DIR, "data", "rcynic.xml")) cfg.add_argument("-t", "--trust-anchor-locators", "--tals", help = "where to find trust anchor locators", default = os.path.join(rpki.autoconf.sysconfdir, "rpki", "trust-anchors")) cfg.add_argument("-w", "--workers", type = posint, help = "number of worker pseudo-threads to allow", default = 10) cfg.add_argument("--fetch-ahead-goal", type = posint, help = "how many deltas we want in the fetch-ahead pipe", default = 2) cfg.add_argument("--https-timeout", type = posint, help = "HTTPS connection timeout, in seconds", default = 300) cfg.add_argument("--max-https-body-size", type = posint, help = "upper limit on byte length of HTTPS message body", default = 512 * 1024 * 1024) cfg.add_boolean_argument("--fetch", default = True, help = "whether to fetch data at all") cfg.add_boolean_argument("--spawn-on-fetch", default = True, help = "whether to spawn new pseudo-threads on fetch") cfg.add_boolean_argument("--migrate", default = True, help = "whether to migrate the ORM database on startup") cfg.add_boolean_argument("--prefer-rsync", default = False, help = "whether to prefer rsync over RRDP") cfg.add_boolean_argument("--validate-https", default = False, help = "whether to validate HTTPS server certificates") global args args = cfg.argparser.parse_args() cfg.configure_logging(args = args, ident = "rcynic") import django django.setup() if args.migrate: # Not sure we should be doing this on every run, but sure simplifies things. import django.core.management django.core.management.call_command("migrate", verbosity = 0, interactive = False) import rpki.rcynicdb global Retrieval global Authenticated global RRDPSnapshot global RPKIObject Retrieval = rpki.rcynicdb.models.Retrieval Authenticated = rpki.rcynicdb.models.Authenticated RRDPSnapshot = rpki.rcynicdb.models.RRDPSnapshot RPKIObject = rpki.rcynicdb.models.RPKIObject global authenticated authenticated = Authenticated.objects.create(started = rpki.sundial.datetime.now()) global task_queue task_queue = tornado.queues.Queue() tornado.ioloop.IOLoop.current().run_sync(launcher) authenticated.finished = rpki.sundial.datetime.now() authenticated.save() final_report() final_cleanup() if __name__ == "__main__": main()