Diffstat (limited to 'rp/rcynic/rcynicng')
-rwxr-xr-x | rp/rcynic/rcynicng | 1478 |
1 file changed, 1478 insertions, 0 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
new file mode 100755
index 00000000..eccd247f
--- /dev/null
+++ b/rp/rcynic/rcynicng
@@ -0,0 +1,1478 @@
+#!/usr/bin/env python
+
+# $Id$
+
+"""
+Reimplementation of rcynic in Python.  Work in progress.
+"""
+
+import os
+import sys
+import ssl
+import time
+import copy
+import errno
+import shutil
+import socket
+import logging
+import argparse
+import tempfile
+import urlparse
+import subprocess
+
+import tornado.gen
+import tornado.locks
+import tornado.ioloop
+import tornado.queues
+import tornado.process
+import tornado.httpclient
+
+import rpki.POW
+import rpki.log
+import rpki.config
+import rpki.sundial
+import rpki.relaxng
+import rpki.autoconf
+
+from rpki.oids import id_kp_bgpsec_router
+
+from lxml.etree import (ElementTree, Element, SubElement, Comment,
+                        XML, DocumentInvalid, XMLSyntaxError, iterparse)
+
+logger = logging.getLogger("rcynicng")
+
+xmlns = rpki.relaxng.rrdp.xmlns
+
+tag_delta        = xmlns + "delta"
+tag_notification = xmlns + "notification"
+tag_publish      = xmlns + "publish"
+tag_snapshot     = xmlns + "snapshot"
+tag_withdraw     = xmlns + "withdraw"
+
+codes = rpki.POW.validation_status
+
+
+class Status(object):
+    """
+    Validation status database, like validation_status_t in rcynic:tos.
+
+    rcynic:tos version of this data structure is stored as an AVL
+    tree, because the OpenSSL STACK_OF() sort-and-bsearch turned out
+    to be a very poor choice for the input data.  Remains to be seen
+    whether we need to do something like that here too.
+    """
+
+    db = dict()
+
+    def __init__(self, uri):
+        self.uri = uri
+        self._timestamp = None
+        self.status = set()
+
+    def __str__(self):
+        return "{my.timestamp} {my.uri} {status}".format(
+            my = self, status = ",".join(str(s) for s in sorted(self.status)))
+
+    @property
+    def timestamp(self):
+        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(self._timestamp))
+
+    @classmethod
+    def get(cls, uri):
+        try:
+            return cls.db[uri].status
+        except KeyError:
+            return None
+
+    @classmethod
+    def update(cls, uri):
+        try:
+            self = cls.db[uri]
+        except KeyError:
+            self = cls.db[uri] = cls(uri)
+        self._timestamp = time.time()
+        return self.status
+
+    @classmethod
+    def add(cls, uri, *codes):
+        status = cls.update(uri)
+        for code in codes:
+            status.add(code)
+
+    @classmethod
+    def remove(cls, uri, *codes):
+        if uri in cls.db:
+            for code in codes:
+                cls.db[uri].status.discard(code)
+
+    @classmethod
+    def test(cls, uri, code):
+        return uri in cls.db and code in cls.db[uri].status
+
+
+def install_object(obj):
+    obj.obj.authenticated.add(authenticated)
+    obj.obj.save()
+
+
+class X509StoreCTX(rpki.POW.X509StoreCTX):
+
+    @classmethod
+    def subclass(cls, **kwargs):
+        return type(cls.__name__, (cls,), kwargs)
+
+    status = None
+
+    def verify_callback(self, ok):
+        err = self.getError()
+        if err in (codes.X509_V_OK.code, codes.X509_V_ERR_SUBJECT_ISSUER_MISMATCH.code):
+            return ok
+        elif err == codes.X509_V_ERR_CRL_HAS_EXPIRED.code:
+            return True
+        elif err == codes.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT.code:
+            self.status.add(codes.TRUST_ANCHOR_NOT_SELF_SIGNED)
+            return ok
+        else:
+            self.status.add(codes.find(err))
+            return ok
+
+
+class POW_Mixin(object):
+
+    @classmethod
+    def store_if_new(cls, der, uri, retrieval):
+        self = cls.derRead(der)
+        ski, aki = self.get_hex_SKI_AKI()
+        return RPKIObject.objects.get_or_create(
+            der = der,
+            defaults = dict(uri = uri,
+                            aki = aki,
+                            ski = ski,
+                            sha256 = sha256hex(der),
+                            retrieved = retrieval))
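+
+    # (Editor's note: get_or_create() here is keyed on the DER, so seeing a
+    # byte-identical object again creates no new row; the uri, ski, aki,
+    # sha256 and retrieval values in "defaults" are recorded only on first
+    # sighting of a given DER blob.)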
+    def get_hex_SKI_AKI(self):
+        cer = self.certs()[0]
+        ski = cer.getSKI()
+        aki = cer.getAKI()
+        return ski.encode("hex") if ski else "", aki.encode("hex") if aki else ""
+
+    @property
+    def uri(self):
+        return self.obj.uri
+
+    @property
+    def aki(self):
+        return self.obj.aki
+
+    @property
+    def ski(self):
+        return self.obj.ski
+
+
+class X509(rpki.POW.X509, POW_Mixin):
+
+    def __repr__(self):
+        try:
+            return "<X509 \"{}\" at 0x{:x}>".format(self.uri, id(self))
+        except:
+            return "<X509 at 0x{:x}>".format(id(self))
+
+    def get_hex_SKI_AKI(self):
+        ski = self.getSKI()
+        aki = self.getAKI()
+        return ski.encode("hex") if ski else "", aki.encode("hex") if aki else ""
+
+    @classmethod
+    def load(cls, obj, cms = None):
+        if cms is not None:
+            # XXX Kludge to work around lack of subclass support in rpki.POW.CMS.certs().
+            der = cms.certs()[0].derWrite()
+        else:
+            der = obj.der
+        self = cls.derRead(der)
+        self.obj = obj
+        self.bc = self.getBasicConstraints()
+        self.eku = self.getEKU()
+        self.aia = self.getAIA()
+        self.sia = self.getSIA()
+        self.crldp = self.getCRLDP()
+        self.is_ca = self.bc is not None and self.bc[0]
+        self.caDirectory, self.rpkiManifest, self.signedObjectRepository, self.rpkiNotify \
+            = self.sia or (None, None, None, None)
+        return self
+
+    @staticmethod
+    def count_uris(uris, scheme = "rsync://"):
+        count = 0
+        if uris is not None:
+            for uri in uris:
+                if uri.startswith(scheme):
+                    count += 1
+        return count
+
+    def check(self, trusted, crl):
+        #logger.debug("Starting checks for %r", self)
+        status = Status.update(self.uri)
+        is_ta = trusted is None
+        is_routercert = (self.eku is not None and id_kp_bgpsec_router in self.eku and
+                         not self.is_ca and self.uri.endswith(".cer"))
+        if self.eku is not None and (self.is_ca or not self.uri.endswith(".cer")):
+            status.add(codes.INAPPROPRIATE_EKU_EXTENSION)
+        if is_ta and not self.is_ca:
+            status.add(codes.MALFORMED_TRUST_ANCHOR)
+        if is_ta and self.aia is not None:
+            status.add(codes.AIA_EXTENSION_FORBIDDEN)
+        if not is_ta and self.aia is None:
+            status.add(codes.AIA_EXTENSION_MISSING)
+        if is_routercert and self.sia is not None:
+            status.add(codes.SIA_EXTENSION_FORBIDDEN)
+        if not is_routercert and self.sia is None:
+            status.add(codes.SIA_EXTENSION_MISSING)
+        if is_ta and self.crldp is not None:
+            status.add(codes.CRLDP_EXTENSION_FORBIDDEN)
+        if not is_ta and self.crldp is None:
+            status.add(codes.CRLDP_EXTENSION_MISSING)
+        if not is_ta and not self.aki:
+            status.add(codes.AKI_EXTENSION_MISSING)
+        elif not is_ta and self.aki != trusted[0].ski:
+            status.add(codes.AKI_EXTENSION_ISSUER_MISMATCH)
+        serial = self.getSerial()
+        if serial <= 0 or serial > 0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF:
+            status.add(codes.BAD_CERTIFICATE_SERIAL_NUMBER)
+        if self.getVersion() != 2:
+            status.add(codes.WRONG_OBJECT_VERSION)
+        n_rsync_caIssuers = self.count_uris(self.aia)
+        n_rsync_caDirectory = self.count_uris(self.caDirectory)
+        n_rsync_rpkiManifest = self.count_uris(self.rpkiManifest)
+        n_rsync_signedObjectRepository = self.count_uris(self.signedObjectRepository)
+        if n_rsync_caIssuers > 1 or n_rsync_caDirectory > 1 or n_rsync_rpkiManifest > 1 or n_rsync_signedObjectRepository > 1:
+            status.add(codes.MULTIPLE_RSYNC_URIS_IN_EXTENSION)
+        if self.aia is not None and n_rsync_caIssuers == 0:
+            status.add(codes.MALFORMED_AIA_EXTENSION)
+        if self.is_ca:
+            ok = n_rsync_caDirectory != 0 and n_rsync_rpkiManifest != 0 and n_rsync_signedObjectRepository == 0
+        elif not is_routercert:
+            ok = n_rsync_caDirectory == 0 and n_rsync_rpkiManifest == 0 and n_rsync_signedObjectRepository != 0
+        else:
+            ok = self.sia is None
+        if not ok:
+            status.add(codes.MALFORMED_SIA_EXTENSION)
+        if not is_ta and self.count_uris(self.crldp) == 0:
+            status.add(codes.MALFORMED_CRLDP_EXTENSION)
+        self.checkRPKIConformance(status = status, eku = id_kp_bgpsec_router if is_routercert else None)
+        try:
+            self.verify(trusted = [self] if trusted is None else trusted, crl = crl, policy = "1.3.6.1.5.5.7.14.2",
+                        context_class = X509StoreCTX.subclass(status = status))
+        except rpki.POW.ValidationError as e:
+            logger.debug("%r rejected: %s", self, e)
+            status.add(codes.OBJECT_REJECTED)
+        codes.normalize(status)
+        #logger.debug("Finished checks for %r", self)
+        return not any(s.kind == "bad" for s in status)
+
+
+class CRL(rpki.POW.CRL, POW_Mixin):
+
+    def __repr__(self):
+        try:
+            return "<CRL \"{}\" at 0x{:x}>".format(self.uri, id(self))
+        except:
+            return "<CRL at 0x{:x}>".format(id(self))
+
+    def get_hex_SKI_AKI(self):
+        aki = self.getAKI()
+        return "", aki.encode("hex") if aki else ""
+
+    @classmethod
+    def load(cls, obj):
+        self = cls.derRead(obj.der)
+        self.obj = obj
+        self.thisUpdate = self.getThisUpdate()
+        self.nextUpdate = self.getNextUpdate()
+        self.number = self.getCRLNumber()
+        return self
+
+    def check(self, issuer):
+        status = Status.update(self.uri)
+        self.checkRPKIConformance(status = status, issuer = issuer)
+        try:
+            self.verify(issuer)
+        except rpki.POW.ValidationError as e:
+            logger.debug("%r rejected: %s", self, e)
+            status.add(codes.OBJECT_REJECTED)
+        codes.normalize(status)
+        if self.getVersion() != 1:
+            status.add(codes.WRONG_OBJECT_VERSION)
+        now = rpki.sundial.now()
+        if self.thisUpdate > now:
+            status.add(codes.CRL_NOT_YET_VALID)
+        if self.nextUpdate < now:
+            status.add(codes.STALE_CRL_OR_MANIFEST)
+        # elif chain so a missing CRL number doesn't also compare as "negative".
+        if self.number is None:
+            status.add(codes.CRL_NUMBER_EXTENSION_MISSING)
+        elif self.number < 0:
+            status.add(codes.CRL_NUMBER_IS_NEGATIVE)
+        elif self.number > 0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF:
+            status.add(codes.CRL_NUMBER_OUT_OF_RANGE)
+        if self.getIssuer() != issuer.getSubject():
+            status.add(codes.CRL_ISSUER_NAME_MISMATCH)
+        if not self.aki:
+            status.add(codes.AKI_EXTENSION_MISSING)
+        elif self.aki != issuer.ski:
+            status.add(codes.AKI_EXTENSION_ISSUER_MISMATCH)
+
+        return not any(s.kind == "bad" for s in status)
+
+
+class Ghostbuster(rpki.POW.CMS, POW_Mixin):
+
+    def __repr__(self):
+        try:
+            return "<Ghostbuster \"{}\" at 0x{:x}>".format(self.uri, id(self))
+        except:
+            return "<Ghostbuster at 0x{:x}>".format(id(self))
+
+    @classmethod
+    def load(cls, obj):
+        self = cls.derRead(obj.der)
+        self.obj = obj
+        self.ee = X509.load(obj, self)
+        self.vcard = None
+        return self
+
+    def check(self, trusted, crl):
+        status = Status.update(self.uri)
+        self.ee.check(trusted = trusted, crl = crl)
+        try:
+            self.vcard = self.verify()
+        except rpki.POW.ValidationError as e:
+            logger.debug("%r rejected: %s", self, e)
+            status.add(codes.OBJECT_REJECTED)
+        self.checkRPKIConformance(status)
+        codes.normalize(status)
+        return not any(s.kind == "bad" for s in status)
+
+
+class Manifest(rpki.POW.Manifest, POW_Mixin):
+
+    def __repr__(self):
+        try:
+            return "<Manifest \"{}\" at 0x{:x}>".format(self.uri, id(self))
+        except:
+            return "<Manifest at 0x{:x}>".format(id(self))
+
+    @classmethod
+    def load(cls, obj):
+        self = cls.derRead(obj.der)
+        self.obj = obj
+        self.ee = X509.load(obj, self)
+        self.fah = None
+        self.thisUpdate = None
+        self.nextUpdate = None
+        self.number = None
+        return self
+
+    def check(self, trusted, crl):
+        status = Status.update(self.uri)
+        self.ee.check(trusted = trusted, crl = crl)
+        try:
+            self.verify()
+        except rpki.POW.ValidationError as e:
+            logger.debug("%r rejected: %s", self, e)
+            status.add(codes.OBJECT_REJECTED)
+        self.checkRPKIConformance(status)
+        self.thisUpdate = self.getThisUpdate()
+        self.nextUpdate = self.getNextUpdate()
+        self.number = self.getManifestNumber()
+        self.fah = self.getFiles()
+        self.notBefore = self.ee.getNotBefore()
+        self.notAfter = self.ee.getNotAfter()
+        if self.thisUpdate < self.notBefore or self.nextUpdate > self.notAfter:
+            status.add(codes.MANIFEST_INTERVAL_OVERRUNS_CERT)
+        now = rpki.sundial.now()
+        if self.thisUpdate > now:
+            status.add(codes.MANIFEST_NOT_YET_VALID)
+        if self.nextUpdate < now:
+            status.add(codes.STALE_CRL_OR_MANIFEST)
+        codes.normalize(status)
+        return not any(s.kind == "bad" for s in status)
+
+    def find_crl_candidate_hashes(self):
+        for fn, digest in self.fah:
+            if fn.endswith(".crl"):
+                yield digest.encode("hex")
+
+
+class ROA(rpki.POW.ROA, POW_Mixin):
+
+    def __repr__(self):
+        try:
+            return "<ROA \"{}\" at 0x{:x}>".format(self.uri, id(self))
+        except:
+            return "<ROA at 0x{:x}>".format(id(self))
+
+    @classmethod
+    def load(cls, obj):
+        self = cls.derRead(obj.der)
+        self.obj = obj
+        self.ee = X509.load(obj, self)
+        self.asn = None
+        self.prefixes = None
+        return self
+
+    def check(self, trusted, crl):
+        status = Status.update(self.uri)
+        self.ee.check(trusted = trusted, crl = crl)
+        try:
+            self.verify()
+        except rpki.POW.ValidationError:
+            status.add(codes.OBJECT_REJECTED)
+        self.checkRPKIConformance(status)
+        self.asn = self.getASID()
+        self.prefixes = self.getPrefixes()
+        codes.normalize(status)
+        return not any(s.kind == "bad" for s in status)
+
+
+class_dispatch = dict(cer = X509,
+                      crl = CRL,
+                      gbr = Ghostbuster,
+                      mft = Manifest,
+                      roa = ROA)
+
+def uri_to_class(uri):
+    cls = class_dispatch.get(uri[-3:]) if len(uri) > 4 and uri[-4] == "." else None
+    if cls is None:
+        Status.add(uri, codes.UNKNOWN_OBJECT_TYPE_SKIPPED)
+    return cls
+
+
+# If we find ourselves using this same ordering for every retrieval from the RPKIObjects model, we
+# can add it as a Meta option for the model and omit it in the query expressions, like this:
+#
+#     class RPKIObjects(models.Model):
+#         ...
+#         class Meta:
+#             ordering = ["-retrieved__started"]
+#
+# https://docs.djangoproject.com/en/1.8/ref/models/querysets/#order-by
+# https://docs.djangoproject.com/en/1.8/ref/models/options/#django.db.models.Options.ordering
+
+def fetch_objects(**kwargs):
+    for obj in RPKIObject.objects.filter(**kwargs).order_by("-retrieved__started"):
+        cls = uri_to_class(obj.uri)
+        if cls is not None:
+            yield cls.load(obj)
+
+
+class WalkFrame(object):
+    """
+    Certificate tree walk stack frame.  This is basically just a
+    preamble and a loop, broken out into several separate methods so
+    that we can fork new tasks in the middle then resume processing of
+    the current state machine (ie, this frame) when appropriate (eg,
+    after an rsync or RRDP fetch completes).
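+
+    An informal sketch of the state flow (editor's summary, inferred
+    from the methods below): initial picks rsync vs RRDP and decides
+    whether a fetch is needed; fetch performs it; ready picks and
+    installs a manifest and CRL; loop walks the manifest entries,
+    pushing a new frame for each child CA and popping this frame when
+    everything here has been processed.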
+    """
+
+    def __init__(self, cer):
+        self.cer = cer
+        self.state = self.initial
+
+    def __repr__(self):
+        try:
+            return "<WalkFrame \"{}\" at 0x{:x}>".format(self.cer.uri, id(self))
+        except:
+            return "<WalkFrame at 0x{:x}>".format(id(self))
+
+    @tornado.gen.coroutine
+    def __call__(self, wsk):
+        yield self.state(wsk)
+
+    @tornado.gen.coroutine
+    def initial(self, wsk):
+
+        rsync_uri = first_rsync_uri(self.cer.caDirectory)
+        rrdp_uri = first_https_uri(self.cer.rpkiNotify)
+
+        if args.prefer_rsync:
+            uri = rsync_uri or rrdp_uri
+        else:
+            uri = rrdp_uri or rsync_uri
+
+        self.fetcher = Fetcher(uri)
+
+        if not self.fetcher.needed():
+            self.state = self.ready
+        elif not args.spawn_on_fetch:
+            self.state = self.fetch
+        else:
+            self.state = self.fetch
+            yield task_queue.put(wsk.clone())
+            wsk.pop()
+
+    @tornado.gen.coroutine
+    def fetch(self, wsk):
+        yield self.fetcher.fetch()
+        self.state = self.ready
+
+    @tornado.gen.coroutine
+    def ready(self, wsk):
+        self.trusted = wsk.trusted()
+
+        logger.debug("%r scanning products", self)
+
+        # NB: CRL checks on manifest EE certificates deferred until we've picked a CRL.
+
+        mft_candidates = []
+        crl_candidates = []
+        crl_candidate_hashes = set()
+
+        for mft in fetch_objects(aki = self.cer.ski, uri__endswith = ".mft"):
+            if mft.check(trusted = self.trusted, crl = None):
+                mft_candidates.append(mft)
+                crl_candidate_hashes.update(mft.find_crl_candidate_hashes())
+
+        if not mft_candidates:
+            wsk.pop()
+            return
+
+        for crl in fetch_objects(aki = self.cer.ski, uri__endswith = ".crl", sha256__in = crl_candidate_hashes):
+            if crl.check(self.trusted[0]):
+                crl_candidates.append(crl)
+
+        mft_candidates.sort(reverse = True, key = lambda x: (x.number, x.thisUpdate, x.obj.retrieved.started))
+        crl_candidates.sort(reverse = True, key = lambda x: (x.number, x.thisUpdate, x.obj.retrieved.started))
+
+        if not crl_candidates:
+            wsk.pop()
+            return
+
+        self.crl = crl_candidates[0]
+
+        install_object(self.crl)
+        Status.add(self.crl.uri, codes.OBJECT_ACCEPTED)
+
+        #logger.debug("Picked CRL %s", self.crl.uri)
+
+        for mft in mft_candidates:
+            if self.crl.isRevoked(mft.ee):
+                Status.add(mft.obj.uri, codes.MANIFEST_EE_REVOKED)
+                continue
+            self.mft = mft
+            break
+        else:
+            wsk.pop()
+            return
+
+        install_object(self.mft)
+        Status.add(self.mft.obj.uri, codes.OBJECT_ACCEPTED)
+
+        self.stale_crl = Status.test(self.crl.uri, codes.STALE_CRL_OR_MANIFEST)
+        self.stale_mft = Status.test(self.mft.uri, codes.STALE_CRL_OR_MANIFEST)
+
+        # Issue warnings on mft and crl URI mismatches?
+
+        # Use an explicit iterator so we can resume it; run loop in separate method, same reason.
+
+        self.mft_iterator = iter(self.mft.getFiles())
+        self.state = self.loop
+
+    @tornado.gen.coroutine
+    def loop(self, wsk):
+
+        #logger.debug("Processing %s", self.mft.uri)
+
+        for fn, digest in self.mft_iterator:
+
+            yield tornado.gen.moment
+
+            uri = self.mft.uri[:self.mft.uri.rindex("/") + 1] + fn
+
+            # Need general URI validator here?
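+            # (Editor's note, hypothetical: such a validator might at
+            # least reject entries where fn contains "/" or "..", so a
+            # manifest entry can't name an object outside its own
+            # publication point.)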
+
+            if uri == self.crl.uri:
+                continue
+
+            cls = uri_to_class(uri)
+
+            if cls is None:
+                continue
+
+            if cls in (Manifest, CRL):
+                Status.add(uri, codes.INAPPROPRIATE_OBJECT_TYPE_SKIPPED)
+                continue
+
+            for obj in fetch_objects(sha256 = digest.encode("hex")):
+
+                if self.stale_crl:
+                    Status.add(uri, codes.TAINTED_BY_STALE_CRL)
+                if self.stale_mft:
+                    Status.add(uri, codes.TAINTED_BY_STALE_MANIFEST)
+
+                if not obj.check(trusted = self.trusted, crl = self.crl):
+                    Status.add(uri, codes.OBJECT_REJECTED)
+                    continue
+
+                install_object(obj)
+                Status.add(uri, codes.OBJECT_ACCEPTED)
+
+                if cls is not X509 or not obj.is_ca:
+                    break
+
+                wsk.push(obj)
+                return
+
+        wsk.pop()
+
+
+class WalkTask(object):
+    """
+    Task corresponding to one walk stack, roughly analogous to
+    STACK_OF(walk_ctx_t) in rcynic:tos.
+    """
+
+    def __init__(self, wsk = None, cer = None):
+        self.wsk = [] if wsk is None else wsk
+        if cer is not None:
+            self.push(cer)
+
+    def __repr__(self):
+        try:
+            return "<WalkTask \"{}\" at 0x{:x}>".format(self.wsk[-1].cer.uri, id(self))
+        except:
+            return "<WalkTask at 0x{:x}>".format(id(self))
+
+    @tornado.gen.coroutine
+    def __call__(self):
+        while self.wsk:
+            yield self.wsk[-1](wsk = self)
+
+    def push(self, cer):
+        self.wsk.append(WalkFrame(cer))
+
+    def pop(self):
+        return self.wsk.pop()
+
+    def clone(self):
+        return WalkTask(wsk = list(self.wsk))
+
+    def trusted(self):
+        stack = [w.cer for w in self.wsk]
+        stack.reverse()
+        return stack
+
+
+def read_tals():
+    for head, dirs, files in os.walk(args.trust_anchor_locators):
+        for fn in files:
+            if fn.endswith(".tal"):
+                furi = "file://" + os.path.abspath(os.path.join(head, fn))
+                try:
+                    with open(os.path.join(head, fn), "r") as f:
+                        lines = [line.strip() for line in f]
+                    blank = lines.index("")
+                    uris = lines[:blank]
+                    key = rpki.POW.Asymmetric.derReadPublic("".join(lines[blank:]).decode("base64"))
+                    if not uris or not all(uri.endswith(".cer") for uri in uris):
+                        Status.add(furi, codes.MALFORMED_TAL_URI)
+                    yield uris, key
+                except:
+                    Status.add(furi, codes.UNREADABLE_TRUST_ANCHOR_LOCATOR)
+
+
+def uri_to_filename(uri, base = None):
+    fn = uri[uri.index("://")+3:]
+    if base is not None:
+        fn = os.path.join(base, fn)
+    return fn
+
+def first_uri(uris, scheme):
+    if uris is not None:
+        for uri in uris:
+            if uri.startswith(scheme):
+                return uri
+    return None
+
+def first_rsync_uri(uris):
+    return first_uri(uris, "rsync://")
+
+def first_https_uri(uris):
+    return first_uri(uris, "https://")
+
+def sha256hex(bytes):
+    d = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
+    d.update(bytes)
+    return d.digest().encode("hex")
+
+
+class RRDP_ParseFailure(Exception):
+    "Failure parsing RRDP message."
+
+class DeadHost(Exception):
+    "Host recently tried and known to be unavailable."
+
+
+class Fetcher(object):
+    """
+    Network transfer methods and history database.
+
+    Handles rsync, plain HTTPS fetch of trust anchors, and RRDP.
+    """
+
+    # Internal protocol:
+    #
+    # - Instances which have just gotten to the query stage are not registered
+    #
+    # - Instances which are in progress are listed in .history and
+    #   have a Condition object in .pending; instances which depend on
+    #   this should wait for the condition, then return.
+    #
+    # - Instances which have completed are listed in .history and have
+    #   .pending set to None.
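+    #
+    # An illustrative timeline (editor's sketch of the protocol above,
+    # not part of the original):
+    #
+    #   f1 = Fetcher(uri); f1.needed()   # True, nothing registered yet
+    #   f1 registers itself in .history with .pending = Condition()
+    #   f2 = Fetcher(uri); f2 finds f1 via .history, waits on f1.pending
+    #   f1 finishes, sets .pending = None, notifies all waiters
+    #   later instances find the completed entry and return immediately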
+
+    _rsync_deadhosts = set()
+    _rsync_history = dict()
+
+    _https_deadhosts = set()
+    _https_history = dict()
+
+    def __init__(self, uri, ta = False):
+        self.uri = uri
+        self.ta = ta
+        self.pending = None
+        self.status = None
+
+    def _rsync_split_uri(self):
+        return tuple(self.uri.rstrip("/").split("/")[2:])
+
+    def _rsync_find(self, path):
+        for i in xrange(1, len(path)):
+            target = path[:i+1]
+            try:
+                return self._rsync_history[target]
+            except KeyError:
+                continue
+        return None
+
+    def needed(self):
+        if not args.fetch:
+            return False
+        if self.uri.startswith("rsync://"):
+            return self._rsync_needed()
+        if self.uri.startswith("https://"):
+            return self._https_needed()
+        raise ValueError
+
+    def _rsync_needed(self):
+        path = self._rsync_split_uri()
+        if path[0] in self._rsync_deadhosts:
+            return False
+        entry = self._rsync_find(path)
+        return entry is None or entry.pending is not None
+
+    def _https_needed(self):
+        netloc = urlparse.urlparse(self.uri).netloc
+        if netloc in self._https_deadhosts:
+            return False
+        entry = self._https_history.get(self.uri)
+        return entry is None or entry.pending is not None
+
+    def fetch(self):
+        if self.uri.startswith("rsync://"):
+            return self._rsync_fetch()
+        if self.uri.startswith("https://"):
+            return self._https_fetch_ta() if self.ta else self._rrdp_fetch()
+        raise ValueError
+
+    @tornado.gen.coroutine
+    def _rsync_fetch(self):
+        assert self.uri.startswith("rsync://") and (self.uri.endswith(".cer") if self.ta else self.uri.endswith("/"))
+
+        if not args.fetch:
+            return
+        path = self._rsync_split_uri()
+        dead = path[0] in self._rsync_deadhosts
+        other = self._rsync_find(path)
+        if not dead and other is not None and other.pending is not None:
+            yield other.pending.wait()
+        if dead or other is not None:
+            return
+
+        self.pending = tornado.locks.Condition()
+        self._rsync_history[path] = self
+
+        try:
+            path = uri_to_filename(self.uri, args.unauthenticated)
+            cmd = ["rsync", "--update", "--times", "--copy-links", "--itemize-changes"]
+            if self.uri.endswith("/"):
+                cmd.append("--recursive")
+                cmd.append("--delete")
+            cmd.append(self.uri)
+            cmd.append(path)
+
+            dn = os.path.dirname(path)
+            if not os.path.exists(dn):
+                os.makedirs(dn)
+
+            # We use the stdout close from rsync to detect when the subprocess has finished.
+            # There's a lovely tornado.process.Subprocess.wait_for_exit() method which does
+            # exactly what one would think we'd want -- but Unix signal handling still hasn't
+            # caught up to the software interrupt architecture ITS had forty years ago, so
+            # signals still cause random "system call interrupted" failures in other libraries.
+            # Nothing Tornado can do about this, so we avoid signals entirely and collect the
+            # process exit status directly from the operating system.  In theory, the WNOHANG
+            # isn't necessary here, we use it anyway to be safe in case theory is wrong.
+
+            # If we need to add a timeout here to guard against rsync processes taking too long
+            # (which has happened in the past with, eg, LACNIC), see tornado.gen.with_timeout()
+            # (documented in the utility functions section of the tornado.gen page), which wraps
+            # any future in a timeout.
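+
+            # (Editor's sketch of that timeout, untested; assumes a
+            # hypothetical "--rsync-timeout" option supplying the bound
+            # in seconds:
+            #
+            #   import datetime
+            #   output = yield tornado.gen.with_timeout(
+            #       datetime.timedelta(seconds = args.rsync_timeout),
+            #       rsync.stdout.read_until_close())
+            #
+            # catching tornado.gen.TimeoutError and killing the rsync
+            # process when it fires.)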
+
+            t0 = time.time()
+            rsync = tornado.process.Subprocess(cmd, stdout = tornado.process.Subprocess.STREAM, stderr = subprocess.STDOUT)
+            logger.debug("rsync[%s] started \"%s\"", rsync.pid, " ".join(cmd))
+            output = yield rsync.stdout.read_until_close()
+            pid, self.status = os.waitpid(rsync.pid, os.WNOHANG)
+            t1 = time.time()
+            if (pid, self.status) == (0, 0):
+                logger.warn("rsync[%s] Couldn't get real exit status without blocking, sorry", rsync.pid)
+            for line in output.splitlines():
+                logger.debug("rsync[%s] %s", rsync.pid, line)
+            logger.debug("rsync[%s] finished after %s seconds with status 0x%x", rsync.pid, t1 - t0, self.status)
+
+            # Should do something with rsync result and validation status database here.
+
+            retrieval = Retrieval.objects.create(
+                uri = self.uri,
+                started = rpki.sundial.datetime.fromtimestamp(t0),
+                finished = rpki.sundial.datetime.fromtimestamp(t1),
+                successful = self.status == 0)
+
+            for fn in self._rsync_walk(path):
+                yield tornado.gen.moment
+                uri = "rsync://" + fn[len(args.unauthenticated):].lstrip("/")
+                cls = uri_to_class(uri)
+                if cls is not None:
+                    try:
+                        with open(fn, "rb") as f:
+                            cls.store_if_new(f.read(), uri, retrieval)
+                    except:
+                        Status.add(uri, codes.UNREADABLE_OBJECT)
+                        logger.exception("Couldn't read %s from rsync tree", uri)
+
+        finally:
+            pending = self.pending
+            self.pending = None
+            pending.notify_all()
+
+    def _rsync_walk(self, path):
+        if self.uri.endswith("/"):
+            for head, dirs, files in os.walk(path):
+                for fn in files:
+                    yield os.path.join(head, fn)
+        elif os.path.exists(path):
+            yield path
+
+    @tornado.gen.coroutine
+    def _https_fetch_url(self, url, streaming_callback = None):
+
+        if urlparse.urlparse(url).netloc in self._https_deadhosts:
+            raise DeadHost
+
+        # Should do something with deadhost processing below.  Looks
+        # like errors such as HTTP timeout show up as
+        # tornado.httpclient.HTTPError exceptions (which we could
+        # suppress if we wanted to do so, but we probably don't).
+        # HTTP timeout shows up in the logs as "HTTP 599".  See doc for:
+        #
+        #   tornado.httpclient.AsyncHTTPClient.fetch()
+        #   tornado.httpclient.HTTPError
+
+        # Might need to do something with If-Modified-Since support.
+        # See if_modified_since argument to
+        # http://www.tornadoweb.org/en/stable/httpclient.html#request-objects
+        # (which we can pass to client.fetch(), below).  Not sure how
+        # "you don't need to retrieve this" result comes back,
+        # probably a benign exception we need to catch.  Supporting
+        # this means adding another null-able timestamp field to the
+        # RRDPSnapshot model (which probably should be named the
+        # RRDPZone model instead), and storing a datetime there.
+        # Would also need to pull timestamp from the Last-Modified
+        # header in the response object.
+
+        try:
+            ok = False
+            t0 = time.time()
+            client = tornado.httpclient.AsyncHTTPClient(max_body_size = args.max_https_body_size)
+            response = yield client.fetch(url,
+                                          streaming_callback = streaming_callback,
+                                          validate_cert = args.validate_https,
+                                          connect_timeout = args.https_timeout,
+                                          request_timeout = args.https_timeout)
+            # Might want to check response Content-Type here
+            ok = True
+
+        except tornado.httpclient.HTTPError as e:
+            # Might want to check e.response here to figure out whether to add to _https_deadhosts.
+            logger.info("HTTP error for %s: %s", url, e)
+            raise
+
+        except (socket.error, IOError, ssl.SSLError) as e:
+            # Might want to check e.errno here to figure out whether to add to _https_deadhosts.
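+            # (Editor's note, illustrative only: errno.ECONNREFUSED or
+            # errno.EHOSTUNREACH would be natural candidates for adding
+            # the host to _https_deadhosts, while a transient timeout
+            # probably would not be.)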
+            logger.info("Network I/O error for %s: %s", url, e)
+            raise
+
+        except Exception as e:
+            logger.exception("Error (%r) for %s", type(e), url)
+            raise
+
+        finally:
+            t1 = time.time()
+            logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0)
+            retrieval = Retrieval.objects.create(
+                uri = url,
+                started = rpki.sundial.datetime.fromtimestamp(t0),
+                finished = rpki.sundial.datetime.fromtimestamp(t1),
+                successful = ok)
+            if ok:
+                raise tornado.gen.Return((retrieval, response))
+
+    @tornado.gen.coroutine
+    def _https_fetch_ta(self):
+
+        if not args.fetch:
+            return
+
+        other = self._https_history.get(self.uri)
+        if other is not None and other.pending is not None:
+            yield other.pending.wait()
+            return
+
+        self.pending = tornado.locks.Condition()
+        self._https_history[self.uri] = self
+
+        try:
+            retrieval, response = yield self._https_fetch_url(self.uri)
+            X509.store_if_new(response.body, self.uri, retrieval)
+        except:
+            logger.exception("Couldn't load %s", self.uri)
+
+        finally:
+            pending = self.pending
+            self.pending = None
+            pending.notify_all()
+
+    @tornado.gen.coroutine
+    def _rrdp_fetch_notification(self, url):
+
+        retrieval, response = yield self._https_fetch_url(url)
+
+        notification = ElementTree(file = response.buffer).getroot()
+
+        rpki.relaxng.rrdp.schema.assertValid(notification)
+
+        if notification.tag != tag_notification:
+            raise RRDP_ParseFailure("Expected RRDP notification for {}, got {}".format(url, notification.tag))
+
+        raise tornado.gen.Return((retrieval, notification))
+
+    @tornado.gen.coroutine
+    def _rrdp_fetch_data_file(self, url, expected_hash):
+
+        sha256 = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
+        xml_file = tempfile.SpooledTemporaryFile()
+
+        retrieval, response = yield self._https_fetch_url(url, lambda data: (sha256.update(data), xml_file.write(data)))
+
+        received_hash = sha256.digest().encode("hex")
+        xml_file.seek(0)
+
+        if received_hash != expected_hash.lower():
+            raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(expected_hash.lower(), url, received_hash))
+
+        raise tornado.gen.Return((retrieval, response, xml_file))
+
+    @tornado.gen.coroutine
+    def _rrdp_bulk_create(self, new_objs, existing_objs):
+        from django.db import IntegrityError
+
+        #logger.debug("Bulk creation of new RPKIObjects")
+
+        try:
+            RPKIObject.objects.bulk_create(new_objs)
+
+        except IntegrityError:
+            #logger.debug("Some objects already existed, weeding and retrying")
+            i = 0
+            while i < len(new_objs):
+                yield tornado.gen.moment
+                try:
+                    existing_objs.append(RPKIObject.objects.values_list("pk", flat = True).get(der = new_objs[i].der))
+                    logger.debug("Object existed in SQL but, apparently, not in prior copy of snapshot: uri %s sha256 %s",
+                                 new_objs[i].uri, new_objs[i].sha256)
+                except RPKIObject.DoesNotExist:
+                    i += 1
+                else:
+                    del new_objs[i]
+            RPKIObject.objects.bulk_create(new_objs)
+
+        del new_objs[:]
+
+    @tornado.gen.coroutine
+    def _rrdp_fetch(self):
+        from django.db import transaction
+
+        if not args.fetch:
+            return
+
+        other = self._https_history.get(self.uri)
+        if other is not None and other.pending is not None:
+            yield other.pending.wait()
+            return
+
+        self.pending = tornado.locks.Condition()
+        self._https_history[self.uri] = self
+
+        try:
+            retrieval, notification = yield self._rrdp_fetch_notification(url = self.uri)
+
+            session_id = notification.get("session_id")
+            serial = long(notification.get("serial"))
+
+            snapshot = RRDPSnapshot.objects.filter(
+                session_id = session_id).order_by("-retrieved__started").first()
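+
+            # (Editor's note: in RRDP (draft-ietf-sidr-delta-protocol,
+            # later RFC 8182) a notification advertises one session_id,
+            # a current serial, one snapshot URI, and a contiguous window
+            # of deltas.  Catching up via deltas is only possible when our
+            # local serial is inside that window; otherwise we reload from
+            # the snapshot, which is the first branch below.)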
+
+            logger.debug("RRDP notification for %s session_id %s serial %s current snapshot %r",
+                         self.uri, session_id, serial, snapshot)
+
+            if snapshot is not None and snapshot.serial == serial:
+                logger.debug("RRDP data for %s is up-to-date, nothing to do", self.uri)
+                return
+
+            deltas = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash")))
+                          for delta in notification.iterchildren(tag_delta))
+
+            if snapshot is None or snapshot.serial + 1 not in deltas:
+
+                existing_rpkiobject_map = dict()
+
+                if snapshot is not None:
+                    logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial)
+                    existing_rpkiobject_map.update(snapshot.rpkiobject_set.values_list("sha256", "pk"))
+
+                x = notification.find(tag_snapshot)
+
+                url, hash = x.get("uri"), x.get("hash")
+
+                logger.debug("RRDP %s loading from snapshot %s serial %s", self.uri, url, serial)
+
+                retrieval, response, xml_file = yield self._rrdp_fetch_data_file(url, hash)
+
+                snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial)
+
+                # Value of "chunk" here may need to be configurable.  Larger numbers batch more objects in
+                # a single bulk addition, which is faster ... unless one or more of them isn't really new, in
+                # which case we have to check everything in that batch when we get the IntegrityError, so
+                # the smaller the batch, the faster that check.  No single good answer.
+
+                root = None
+                existing_rpkiobjects = []
+                new_rpkiobjects = []
+                chunk = 2000
+
+                for event, node in iterparse(xml_file):
+                    if node is root:
+                        continue
+
+                    if root is None:
+                        root = node.getparent()
+                        if root is None or root.tag != tag_snapshot \
+                           or root.get("version") != "1" \
+                           or any(a not in ("version", "session_id", "serial") for a in root.attrib):
+                            raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url))
+                        if root.get("session_id") != session_id:
+                            raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(
+                                session_id, url, root.get("session_id")))
+                        if long(root.get("serial")) != long(serial):
+                            raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(
+                                serial, url, root.get("serial")))
+
+                    if node.tag != tag_publish or node.getparent() is not root \
+                       or any(a != "uri" for a in node.attrib):
+                        raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url))
+
+                    uri = node.get("uri")
+                    cls = uri_to_class(uri)
+                    if cls is None:
+                        raise RRDP_ParseFailure("Unexpected URI {}".format(uri))
+
+                    der = node.text.decode("base64")
+                    sha256 = sha256hex(der)
+                    try:
+                        existing_rpkiobjects.append(existing_rpkiobject_map[sha256])
+                    except KeyError:
+                        ski, aki = cls.derRead(der).get_hex_SKI_AKI()
+                        new_rpkiobjects.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki,
+                                                          retrieved = retrieval, sha256 = sha256))
+
+                    node.clear()
+                    while node.getprevious() is not None:
+                        del root[0]
+
+                    if len(new_rpkiobjects) > chunk:
+                        yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects)
+
+                    yield tornado.gen.moment
+
+                if len(new_rpkiobjects) > 0:
+                    yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects)
+
+                RPKIObject.snapshot.through.objects.bulk_create([
+                    RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i)
+                    for i in retrieval.rpkiobject_set.values_list("pk", flat = True)])
+
+                RPKIObject.snapshot.through.objects.bulk_create([
+                    RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i)
+                    for i in existing_rpkiobjects])
+
+                snapshot.retrieved = retrieval
+                snapshot.save()
+
+                xml_file.close()
+
+            else:
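+                # (Editor's note: the loop below keeps up to
+                # args.fetch_ahead_goal delta fetches in flight while
+                # applying completed deltas strictly in serial order, each
+                # within its own database transaction.)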
+                logger.debug("RRDP %s %s deltas (%s--%s)", self.uri,
+                             (serial - snapshot.serial), snapshot.serial, serial)
+
+                deltas = [(serial, deltas[serial][0], deltas[serial][1])
+                          for serial in xrange(snapshot.serial + 1, serial + 1)]
+                futures = []
+
+                while deltas or futures:
+
+                    while deltas and len(futures) < args.fetch_ahead_goal:
+                        serial, url, hash = deltas.pop(0)
+                        logger.debug("RRDP %s serial %s fetching %s", self.uri, serial, url)
+                        futures.append(self._rrdp_fetch_data_file(url, hash))
+
+                    retrieval, response, xml_file = yield futures.pop(0)
+
+                    root = None
+
+                    with transaction.atomic():
+                        snapshot.serial += 1
+                        snapshot.save()
+                        logger.debug("RRDP %s serial %s loading", self.uri, snapshot.serial)
+
+                        for event, node in iterparse(xml_file):
+                            if node is root:
+                                continue
+
+                            if root is None:
+                                root = node.getparent()
+                                if root is None or root.tag != tag_delta \
+                                   or root.get("version") != "1" \
+                                   or any(a not in ("version", "session_id", "serial") for a in root.attrib):
+                                    raise RRDP_ParseFailure("{} doesn't look like an RRDP delta file".format(url))
+                                if root.get("session_id") != session_id:
+                                    raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(
+                                        session_id, url, root.get("session_id")))
+                                if long(root.get("serial")) != snapshot.serial:
+                                    raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(
+                                        snapshot.serial, url, root.get("serial")))
+
+                            hash = node.get("hash")
+
+                            if node.getparent() is not root or node.tag not in (tag_publish, tag_withdraw) \
+                               or (node.tag == tag_withdraw and hash is None) \
+                               or any(a not in ("uri", "hash") for a in node.attrib):
+                                raise RRDP_ParseFailure("{} doesn't look like an RRDP delta file".format(url))
+
+                            if node.tag == tag_withdraw or node.get("hash") is not None:
+                                snapshot.rpkiobject_set.remove(snapshot.rpkiobject_set.get(sha256 = node.get("hash").lower()))
+
+                            if node.tag == tag_publish:
+                                uri = node.get("uri")
+                                cls = uri_to_class(uri)
+                                if cls is None:
+                                    raise RRDP_ParseFailure("Unexpected URI {}".format(uri))
+                                obj, created = cls.store_if_new(node.text.decode("base64"), uri, retrieval)
+                                obj.snapshot.add(snapshot)
+
+                            node.clear()
+                            while node.getprevious() is not None:
+                                del root[0]
+
+                            #yield tornado.gen.moment
+
+                    xml_file.close()
+
+                logger.debug("RRDP %s done processing deltas", self.uri)
+
+        except (tornado.httpclient.HTTPError, socket.error, IOError, ssl.SSLError):
+            pass                    # Already logged
+
+        except RRDP_ParseFailure as e:
+            logger.info("RRDP parse failure: %s", e)
+
+        except:
+            logger.exception("Couldn't load %s", self.uri)
+
+        finally:
+            pending = self.pending
+            self.pending = None
+            pending.notify_all()
+
+
+class CheckTALTask(object):
+
+    def __init__(self, uris, key):
+        rsync_uri = first_rsync_uri(uris)
+        https_uri = first_https_uri(uris)
+
+        if args.prefer_rsync:
+            self.uri = rsync_uri or https_uri
+        else:
+            self.uri = https_uri or rsync_uri
+
+        self.key = key
+
+    def __repr__(self):
+        return "<CheckTALTask: \"{}\">".format(self.uri)
+
+    @tornado.gen.coroutine
+    def __call__(self):
+        yield Fetcher(self.uri, ta = True).fetch()
+        for cer in fetch_objects(uri = self.uri):
+            if self.check(cer):
+                yield task_queue.put(WalkTask(cer = cer))
+                break
+        else:
+            Status.add(self.uri, codes.TRUST_ANCHOR_SKIPPED)
+
+    def check(self, cer):
+        if self.key.derWritePublic() != cer.getPublicKey().derWritePublic():
+            Status.add(self.uri, codes.TRUST_ANCHOR_KEY_MISMATCH)
+            ok = False
+        else:
+            ok = cer.check(trusted = None, crl = None)
+        if ok:
+            install_object(cer)
+            Status.add(self.uri, codes.OBJECT_ACCEPTED)
+        else:
+            Status.add(self.uri, codes.OBJECT_REJECTED)
+        return ok
+
+
+@tornado.gen.coroutine
+def worker(meself):
+    #
+    # NB: This particular style of control loop REQUIRES an except
+    # clause, even if that except clause is just a pass statement.
+    #
+    while True:
+        task = yield task_queue.get()
+        name = repr(task)
+        try:
+            logger.debug("Worker %s starting %s, queue length %s", meself, name, task_queue.qsize())
+            yield task()
+        except:
+            logger.exception("Worker %s caught unhandled exception from %s", meself, name)
+        finally:
+            task_queue.task_done()
+            logger.debug("Worker %s finished %s, queue length %s", meself, name, task_queue.qsize())
+
+
+def final_report():
+    # Clean up a bit to avoid confusing the user unnecessarily.
+    for s in Status.db.itervalues():
+        if codes.OBJECT_ACCEPTED in s.status:
+            s.status.discard(codes.OBJECT_REJECTED)
+    doc = Element("rcynic-summary", date = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()))
+    doc.set("reporting-hostname", socket.getfqdn())
+    doc.set("rcynic-version", "rcynicng")
+    doc.set("summary-version", "1")
+    labels = SubElement(doc, "labels")
+    for code in codes.all():
+        SubElement(labels, code.name, kind = code.kind).text = code.text
+    for uri in Status.db:
+        for sym in sorted(Status.db[uri].status):
+            SubElement(doc, "validation_status",
+                       timestamp = str(Status.db[uri].timestamp),
+                       status = str(sym),
+                       generation = "None"      # Historical relic, remove eventually
+                       ).text = uri
+    #
+    # Should generate <rsync_history/> elements here too, later.
+    #
+    ElementTree(doc).write(file = argparse.FileType("w")(args.xml_file),
+                           pretty_print = True)
+
+
+def final_cleanup():
+    from django.db import transaction, models
+
+    def report(when):
+        logger.debug("Database %s cleanup: %s Authenticated %s RRDPSnapshot %s RPKIObject %s Retrieval", when,
+                     Authenticated.objects.all().count(), RRDPSnapshot.objects.all().count(),
+                     RPKIObject.objects.all().count(), Retrieval.objects.all().count())
+
+    report("before")
+
+    with transaction.atomic():
+
+        #logger.debug("Flushing incomplete RRDP snapshots")
+
+        q = RRDPSnapshot.objects
+        q = q.filter(retrieved__isnull = True)
+        q.delete()
+
+        #logger.debug("Flushing old authenticated sets")
+
+        q = Authenticated.objects
+        q = q.exclude(id = authenticated.id)
+        q.delete()
+
+        #logger.debug("Flushing RRDP snapshots which don't contain anything in the (remaining) authenticated set")
+
+        q = RPKIObject.objects
+        q = q.filter(authenticated = authenticated.id)
+        q = q.exclude(snapshot = None)
+        q = q.order_by("snapshot__id")
+        q = q.values_list("snapshot__id", flat = True)
+        q = q.distinct()
+        q = RRDPSnapshot.objects.exclude(id__in = q)
+        q.delete()
+
+        #logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot")
+
+        q = RPKIObject.objects
+        q = q.filter(authenticated = None) # was: q = q.exclude(authenticated = authenticated.id)
+        q = q.filter(snapshot = None)
+        q.delete()
+
+        #logger.debug("Flushing retrieval objects which are no longer related to any RPKI objects or RRDP snapshot")
+
+        q = RPKIObject.objects
+        q = q.order_by("retrieved__id")
+        q = q.values_list("retrieved__id", flat = True)
+        q = q.distinct()
+        q = Retrieval.objects.exclude(id__in = q)
+        q = q.filter(rrdpsnapshot = None)
+        q.delete()
+
+    report("after")
+
+
+@tornado.gen.coroutine
+def launcher():
+    for i in xrange(args.workers):
+        tornado.ioloop.IOLoop.current().spawn_callback(worker, i)
+
+    yield [task_queue.put(CheckTALTask(uris, key)) for uris, key in read_tals()]
+    yield task_queue.join()
+
+
+class posint(int):
+    def __init__(self, value):
+        if self <= 0:
+            raise ValueError
+
+
+def main():
+    global rpki
+
+    os.environ.update(TZ = "UTC",
+                      DJANGO_SETTINGS_MODULE = "rpki.django_settings.rcynic")
+    time.tzset()
+
+    cfg = rpki.config.argparser(section = "rcynic", doc = __doc__, cfg_optional = True)
+
+    cfg.add_logging_arguments()
+
+    cfg.add_argument("-u", "--unauthenticated",
+                     help = "where to store unauthenticated data retrieved via rsync",
+                     default = os.path.join(rpki.autoconf.RCYNIC_DIR, "data", "unauthenticated"))
+
+    cfg.add_argument("-x", "--xml-file",
+                     help = "where to write XML log of validation results",
+                     default = os.path.join(rpki.autoconf.RCYNIC_DIR, "data", "rcynic.xml"))
+
+    cfg.add_argument("-t", "--trust-anchor-locators", "--tals",
+                     help = "where to find trust anchor locators",
+                     default = os.path.join(rpki.autoconf.sysconfdir, "rpki", "trust-anchors"))
+
+    cfg.add_argument("-w", "--workers", type = posint,
+                     help = "number of worker pseudo-threads to allow",
+                     default = 10)
+
+    cfg.add_argument("--fetch-ahead-goal", type = posint,
+                     help = "how many deltas we want in the fetch-ahead pipe",
+                     default = 2)
+
+    cfg.add_argument("--https-timeout", type = posint,
+                     help = "HTTPS connection timeout, in seconds",
+                     default = 300)
+
+    cfg.add_argument("--max-https-body-size", type = posint,
+                     help = "upper limit on byte length of HTTPS message body",
+                     default = 512 * 1024 * 1024)
+
+    cfg.add_boolean_argument("--fetch", default = True,
+                             help = "whether to fetch data at all")
+
+    cfg.add_boolean_argument("--spawn-on-fetch", default = True,
+                             help = "whether to spawn new pseudo-threads on fetch")
+
+    cfg.add_boolean_argument("--migrate", default = True,
+                             help = "whether to migrate the ORM database on startup")
+
+    cfg.add_boolean_argument("--prefer-rsync", default = False,
+                             help = "whether to prefer rsync over RRDP")
+
+    cfg.add_boolean_argument("--validate-https", default = False,
+                             help = "whether to validate HTTPS server certificates")
+
+    global args
+    args = cfg.argparser.parse_args()
+
+    cfg.configure_logging(args = args, ident = "rcynic")
+
+    import django
+    django.setup()
+
+    if args.migrate:
+        # Not sure we should be doing this on every run, but sure simplifies things.
+        import django.core.management
+        django.core.management.call_command("migrate", verbosity = 0, interactive = False)
+
+    import rpki.rcynicdb
+    global Retrieval
+    global Authenticated
+    global RRDPSnapshot
+    global RPKIObject
+    Retrieval = rpki.rcynicdb.models.Retrieval
+    Authenticated = rpki.rcynicdb.models.Authenticated
+    RRDPSnapshot = rpki.rcynicdb.models.RRDPSnapshot
+    RPKIObject = rpki.rcynicdb.models.RPKIObject
+
+    global authenticated
+    authenticated = Authenticated.objects.create(started = rpki.sundial.datetime.now())
+
+    global task_queue
+    task_queue = tornado.queues.Queue()
+    tornado.ioloop.IOLoop.current().run_sync(launcher)
+
+    authenticated.finished = rpki.sundial.datetime.now()
+    authenticated.save()
+
+    final_report()
+
+    final_cleanup()
+
+
+if __name__ == "__main__":
+    main()