Diffstat (limited to 'rp/rcynic/rcynicng')
-rwxr-xr-x  rp/rcynic/rcynicng | 1478
1 file changed, 1478 insertions, 0 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
new file mode 100755
index 00000000..eccd247f
--- /dev/null
+++ b/rp/rcynic/rcynicng
@@ -0,0 +1,1478 @@
+#!/usr/bin/env python
+
+# $Id$
+
+"""
+Reimplementation of rcynic in Python. Work in progress.
+"""
+
+import os
+import sys
+import ssl
+import time
+import copy
+import errno
+import shutil
+import socket
+import logging
+import argparse
+import tempfile
+import urlparse
+import subprocess
+
+import tornado.gen
+import tornado.locks
+import tornado.ioloop
+import tornado.queues
+import tornado.process
+import tornado.httpclient
+
+import rpki.POW
+import rpki.log
+import rpki.config
+import rpki.sundial
+import rpki.relaxng
+import rpki.autoconf
+
+from rpki.oids import id_kp_bgpsec_router
+
+from lxml.etree import (ElementTree, Element, SubElement, Comment,
+ XML, DocumentInvalid, XMLSyntaxError, iterparse)
+
+logger = logging.getLogger("rcynicng")
+
+xmlns = rpki.relaxng.rrdp.xmlns
+
+tag_delta = xmlns + "delta"
+tag_notification = xmlns + "notification"
+tag_publish = xmlns + "publish"
+tag_snapshot = xmlns + "snapshot"
+tag_withdraw = xmlns + "withdraw"
+
+codes = rpki.POW.validation_status
+
+
+class Status(object):
+ """
+ Validation status database, like validation_status_t in rcynic:tos.
+
+ The rcynic:tos version of this data structure is stored as an AVL
+ tree, because the OpenSSL STACK_OF() sort-and-bsearch turned out
+ to be a very poor choice for the input data. It remains to be seen
+ whether we need to do something like that here too.
+ """
+
+ db = dict()
+
+ def __init__(self, uri):
+ self.uri = uri
+ self._timestamp = None
+ self.status = set()
+
+ def __str__(self):
+ return "{my.timestamp} {my.uri} {status}".format(
+ my = self, status = ",".join(str(s) for s in sorted(self.status)))
+
+ @property
+ def timestamp(self):
+ return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(self._timestamp))
+
+ @classmethod
+ def get(cls, uri):
+ try:
+ return cls.db[uri].status
+ except KeyError:
+ return None
+
+ @classmethod
+ def update(cls, uri):
+ try:
+ self = cls.db[uri]
+ except KeyError:
+ self = cls.db[uri] = cls(uri)
+ self._timestamp = time.time()
+ return self.status
+
+ @classmethod
+ def add(cls, uri, *codes):
+ status = cls.update(uri)
+ for code in codes:
+ status.add(code)
+
+ @classmethod
+ def remove(cls, uri, *codes):
+ if uri in cls.db:
+ for code in codes:
+ cls.db[uri].status.discard(code)
+
+ @classmethod
+ def test(cls, uri, code):
+ return uri in cls.db and code in cls.db[uri].status
+
+
+def install_object(obj):
+ obj.obj.authenticated.add(authenticated)
+ obj.obj.save()
+
+
+class X509StoreCTX(rpki.POW.X509StoreCTX):
+
+ @classmethod
+ def subclass(cls, **kwargs):
+ return type(cls.__name__, (cls,), kwargs)
+
+ status = None
+
+ def verify_callback(self, ok):
+ err = self.getError()
+ if err in (codes.X509_V_OK.code, codes.X509_V_ERR_SUBJECT_ISSUER_MISMATCH.code):
+ return ok
+ elif err == codes.X509_V_ERR_CRL_HAS_EXPIRED.code:
+ return True
+ elif err == codes.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT.code:
+ self.status.add(codes.TRUST_ANCHOR_NOT_SELF_SIGNED)
+ return ok
+ else:
+ self.status.add(codes.find(err))
+ return ok
+
+
+class POW_Mixin(object):
+
+ @classmethod
+ def store_if_new(cls, der, uri, retrieval):
+ self = cls.derRead(der)
+ ski, aki = self.get_hex_SKI_AKI()
+ return RPKIObject.objects.get_or_create(
+ der = der,
+ defaults = dict(uri = uri,
+ aki = aki,
+ ski = ski,
+ sha256 = sha256hex(der),
+ retrieved = retrieval))
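+
+ # (Like get_or_create(), store_if_new() returns an (object, created)
+ # tuple; the RRDP delta handler below uses the object half.)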
+
+ def get_hex_SKI_AKI(self):
+ cer = self.certs()[0]
+ ski = cer.getSKI()
+ aki = cer.getAKI()
+ return ski.encode("hex") if ski else "", aki.encode("hex") if aki else ""
+
+ @property
+ def uri(self):
+ return self.obj.uri
+
+ @property
+ def aki(self):
+ return self.obj.aki
+
+ @property
+ def ski(self):
+ return self.obj.ski
+
+
+class X509(rpki.POW.X509, POW_Mixin):
+
+ def __repr__(self):
+ try:
+ return "<X509 \"{}\" at 0x{:x}>".format(self.uri, id(self))
+ except:
+ return "<X509 at 0x{:x}>".format(id(self))
+
+ def get_hex_SKI_AKI(self):
+ ski = self.getSKI()
+ aki = self.getAKI()
+ return ski.encode("hex") if ski else "", aki.encode("hex") if aki else ""
+
+ @classmethod
+ def load(cls, obj, cms = None):
+ if cms is not None:
+ # XXX Kludge to work around lack of subclass support in rpki.POW.CMS.certs().
+ der = cms.certs()[0].derWrite()
+ else:
+ der = obj.der
+ self = cls.derRead(der)
+ self.obj = obj
+ self.bc = self.getBasicConstraints()
+ self.eku = self.getEKU()
+ self.aia = self.getAIA()
+ self.sia = self.getSIA()
+ self.crldp = self.getCRLDP()
+ self.is_ca = self.bc is not None and self.bc[0]
+ self.caDirectory, self.rpkiManifest, self.signedObjectRepository, self.rpkiNotify \
+ = self.sia or (None, None, None, None)
+ return self
+
+ @staticmethod
+ def count_uris(uris, scheme = "rsync://"):
+ count = 0
+ if uris is not None:
+ for uri in uris:
+ if uri.startswith(scheme):
+ count += 1
+ return count
+
+ def check(self, trusted, crl):
+ #logger.debug("Starting checks for %r", self)
+ status = Status.update(self.uri)
+ is_ta = trusted is None
+ is_routercert = (self.eku is not None and id_kp_bgpsec_router in self.eku and
+ not self.is_ca and self.uri.endswith(".cer"))
+ if self.eku is not None and (self.is_ca or not self.uri.endswith(".cer")):
+ status.add(codes.INAPPROPRIATE_EKU_EXTENSION)
+ if is_ta and not self.is_ca:
+ status.add(codes.MALFORMED_TRUST_ANCHOR)
+ if is_ta and self.aia is not None:
+ status.add(codes.AIA_EXTENSION_FORBIDDEN)
+ if not is_ta and self.aia is None:
+ status.add(codes.AIA_EXTENSION_MISSING)
+ if is_routercert and self.sia is not None:
+ status.add(codes.SIA_EXTENSION_FORBIDDEN)
+ if not is_routercert and self.sia is None:
+ status.add(codes.SIA_EXTENSION_MISSING)
+ if is_ta and self.crldp is not None:
+ status.add(codes.CRLDP_EXTENSION_FORBIDDEN)
+ if not is_ta and self.crldp is None:
+ status.add(codes.CRLDP_EXTENSION_MISSING)
+ if not is_ta and not self.aki:
+ status.add(codes.AKI_EXTENSION_MISSING)
+ elif not is_ta and self.aki != trusted[0].ski:
+ status.add(codes.AKI_EXTENSION_ISSUER_MISMATCH)
+ serial = self.getSerial()
+ if serial <= 0 or serial > 0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF:
+ status.add(codes.BAD_CERTIFICATE_SERIAL_NUMBER)
+ if self.getVersion() != 2:
+ status.add(codes.WRONG_OBJECT_VERSION)
+ n_rsync_caIssuers = self.count_uris(self.aia)
+ n_rsync_caDirectory = self.count_uris(self.caDirectory)
+ n_rsync_rpkiManifest = self.count_uris(self.rpkiManifest)
+ n_rsync_signedObjectRepository = self.count_uris(self.signedObjectRepository)
+ if n_rsync_caIssuers > 1 or n_rsync_caDirectory > 1 or n_rsync_rpkiManifest > 1 or n_rsync_signedObjectRepository > 1:
+ status.add(codes.MULTIPLE_RSYNC_URIS_IN_EXTENSION)
+ if self.aia is not None and n_rsync_caIssuers == 0:
+ status.add(codes.MALFORMED_AIA_EXTENSION)
+ if self.is_ca:
+ ok = n_rsync_caDirectory != 0 and n_rsync_rpkiManifest != 0 and n_rsync_signedObjectRepository == 0
+ elif not is_routercert:
+ ok = n_rsync_caDirectory == 0 and n_rsync_rpkiManifest == 0 and n_rsync_signedObjectRepository != 0
+ else:
+ ok = self.sia is None
+ if not ok:
+ status.add(codes.MALFORMED_SIA_EXTENSION)
+ if not is_ta and self.count_uris(self.crldp) == 0:
+ status.add(codes.MALFORMED_CRLDP_EXTENSION)
+ self.checkRPKIConformance(status = status, eku = id_kp_bgpsec_router if is_routercert else None)
+ try:
+ self.verify(trusted = [self] if trusted is None else trusted, crl = crl, policy = "1.3.6.1.5.5.7.14.2",
+ context_class = X509StoreCTX.subclass(status = status))
+ except rpki.POW.ValidationError as e:
+ logger.debug("%r rejected: %s", self, e)
+ status.add(codes.OBJECT_REJECTED)
+ codes.normalize(status)
+ #logger.debug("Finished checks for %r", self)
+ return not any(s.kind == "bad" for s in status)
+
+
+class CRL(rpki.POW.CRL, POW_Mixin):
+
+ def __repr__(self):
+ try:
+ return "<CRL \"{}\" at 0x{:x}>".format(self.uri, id(self))
+ except:
+ return "<CRL at 0x{:x}>".format(id(self))
+
+ def get_hex_SKI_AKI(self):
+ aki = self.getAKI()
+ return "", aki.encode("hex") if aki else ""
+
+ @classmethod
+ def load(cls, obj):
+ self = cls.derRead(obj.der)
+ self.obj = obj
+ self.thisUpdate = self.getThisUpdate()
+ self.nextUpdate = self.getNextUpdate()
+ self.number = self.getCRLNumber()
+ return self
+
+ def check(self, issuer):
+ status = Status.update(self.uri)
+ self.checkRPKIConformance(status = status, issuer = issuer)
+ try:
+ self.verify(issuer)
+ except rpki.POW.ValidationError as e:
+ logger.debug("%r rejected: %s", self, e)
+ status.add(codes.OBJECT_REJECTED)
+ codes.normalize(status)
+ if self.getVersion() != 1:
+ status.add(codes.WRONG_OBJECT_VERSION)
+ now = rpki.sundial.now()
+ if self.thisUpdate > now:
+ status.add(codes.CRL_NOT_YET_VALID)
+ if self.nextUpdate < now:
+ status.add(codes.STALE_CRL_OR_MANIFEST)
+ # Note: elif, because in Python 2 None < 0 is true, so a missing
+ # CRL number would otherwise also be flagged as negative.
+ if self.number is None:
+ status.add(codes.CRL_NUMBER_EXTENSION_MISSING)
+ elif self.number < 0:
+ status.add(codes.CRL_NUMBER_IS_NEGATIVE)
+ elif self.number > 0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF:
+ status.add(codes.CRL_NUMBER_OUT_OF_RANGE)
+ if self.getIssuer() != issuer.getSubject():
+ status.add(codes.CRL_ISSUER_NAME_MISMATCH)
+ if not self.aki:
+ status.add(codes.AKI_EXTENSION_MISSING)
+ elif self.aki != issuer.ski:
+ status.add(codes.AKI_EXTENSION_ISSUER_MISMATCH)
+
+ return not any(s.kind == "bad" for s in status)
+
+
+class Ghostbuster(rpki.POW.CMS, POW_Mixin):
+
+ def __repr__(self):
+ try:
+ return "<Ghostbuster \"{}\" at 0x{:x}>".format(self.uri, id(self))
+ except:
+ return "<Ghostbuster at 0x{:x}>".format(id(self))
+
+ @classmethod
+ def load(cls, obj):
+ self = cls.derRead(obj.der)
+ self.obj = obj
+ self.ee = X509.load(obj, self)
+ self.vcard = None
+ return self
+
+ def check(self, trusted, crl):
+ status = Status.update(self.uri)
+ self.ee.check(trusted = trusted, crl = crl)
+ try:
+ self.vcard = self.verify()
+ except rpki.POW.ValidationError as e:
+ logger.debug("%r rejected: %s", self, e)
+ status.add(codes.OBJECT_REJECTED)
+ self.checkRPKIConformance(status)
+ codes.normalize(status)
+ return not any(s.kind == "bad" for s in status)
+
+
+class Manifest(rpki.POW.Manifest, POW_Mixin):
+
+ def __repr__(self):
+ try:
+ return "<Manifest \"{}\" at 0x{:x}>".format(self.uri, id(self))
+ except:
+ return "<Manifest at 0x{:x}>".format(id(self))
+
+ @classmethod
+ def load(cls, obj):
+ self = cls.derRead(obj.der)
+ self.obj = obj
+ self.ee = X509.load(obj, self)
+ self.fah = None
+ self.thisUpdate = None
+ self.nextUpdate = None
+ self.number = None
+ return self
+
+ def check(self, trusted, crl):
+ status = Status.update(self.uri)
+ self.ee.check(trusted = trusted, crl = crl)
+ try:
+ self.verify()
+ except rpki.POW.ValidationError as e:
+ logger.debug("%r rejected: %s", self, e)
+ status.add(codes.OBJECT_REJECTED)
+ self.checkRPKIConformance(status)
+ self.thisUpdate = self.getThisUpdate()
+ self.nextUpdate = self.getNextUpdate()
+ self.number = self.getManifestNumber()
+ self.fah = self.getFiles()
+ self.notBefore = self.ee.getNotBefore()
+ self.notAfter = self.ee.getNotAfter()
+ if self.thisUpdate < self.notBefore or self.nextUpdate > self.notAfter:
+ status.add(codes.MANIFEST_INTERVAL_OVERRUNS_CERT)
+ now = rpki.sundial.now()
+ if self.thisUpdate > now:
+ status.add(codes.MANIFEST_NOT_YET_VALID)
+ if self.nextUpdate < now:
+ status.add(codes.STALE_CRL_OR_MANIFEST)
+ codes.normalize(status)
+ return not any(s.kind == "bad" for s in status)
+
+ def find_crl_candidate_hashes(self):
+ for fn, digest in self.fah:
+ if fn.endswith(".crl"):
+ yield digest.encode("hex")
+
+
+class ROA(rpki.POW.ROA, POW_Mixin):
+
+ def __repr__(self):
+ try:
+ return "<ROA \"{}\" at 0x{:x}>".format(self.uri, id(self))
+ except:
+ return "<ROA at 0x{:x}>".format(id(self))
+
+ @classmethod
+ def load(cls, obj):
+ self = cls.derRead(obj.der)
+ self.obj = obj
+ self.ee = X509.load(obj, self)
+ self.asn = None
+ self.prefixes = None
+ return self
+
+ def check(self, trusted, crl):
+ status = Status.update(self.uri)
+ self.ee.check(trusted = trusted, crl = crl)
+ try:
+ self.verify()
+ except rpki.POW.ValidationError as e:
+ logger.debug("%r rejected: %s", self, e)
+ status.add(codes.OBJECT_REJECTED)
+ self.checkRPKIConformance(status)
+ self.asn = self.getASID()
+ self.prefixes = self.getPrefixes()
+ codes.normalize(status)
+ return not any(s.kind == "bad" for s in status)
+
+
+class_dispatch = dict(cer = X509,
+ crl = CRL,
+ gbr = Ghostbuster,
+ mft = Manifest,
+ roa = ROA)
+
+def uri_to_class(uri):
+ cls = class_dispatch.get(uri[-3:]) if len(uri) > 4 and uri[-4] == "." else None
+ if cls is None:
+ Status.add(uri, codes.UNKNOWN_OBJECT_TYPE_SKIPPED)
+ return cls
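+
+# For example (illustrative): uri_to_class("rsync://example.org/x.roa") is ROA,
+# while an unrecognized suffix records UNKNOWN_OBJECT_TYPE_SKIPPED and
+# returns None.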
+
+
+# If we find ourselves using this same ordering for every retrieval from the RPKIObject model, we
+# can add it as a Meta option for the model and omit it in the query expressions, like this:
+#
+# class RPKIObject(models.Model):
+# ...
+# class Meta:
+# ordering = ["-retrieved__started"]
+#
+# https://docs.djangoproject.com/en/1.8/ref/models/querysets/#order-by
+# https://docs.djangoproject.com/en/1.8/ref/models/options/#django.db.models.Options.ordering
+
+def fetch_objects(**kwargs):
+ for obj in RPKIObject.objects.filter(**kwargs).order_by("-retrieved__started"):
+ cls = uri_to_class(obj.uri)
+ if cls is not None:
+ yield cls.load(obj)
+
+
+class WalkFrame(object):
+ """
+ Certificate tree walk stack frame. This is basically just a
+ preamble and a loop, broken out into several separate methods so
+ that we can fork new tasks in the middle, then resume processing of
+ the current state machine (ie, this frame) when appropriate (eg,
+ after an rsync or RRDP fetch completes).
+ """
+
+ def __init__(self, cer):
+ self.cer = cer
+ self.state = self.initial
+
+ def __repr__(self):
+ try:
+ return "<WalkFrame \"{}\" at 0x{:x}>".format(self.cer.uri, id(self))
+ except:
+ return "<WalkFrame at 0x{:x}>".format(id(self))
+
+ @tornado.gen.coroutine
+ def __call__(self, wsk):
+ yield self.state(wsk)
+
+ @tornado.gen.coroutine
+ def initial(self, wsk):
+
+ rsync_uri = first_rsync_uri(self.cer.caDirectory)
+ rrdp_uri = first_https_uri(self.cer.rpkiNotify)
+
+ if args.prefer_rsync:
+ uri = rsync_uri or rrdp_uri
+ else:
+ uri = rrdp_uri or rsync_uri
+
+ self.fetcher = Fetcher(uri)
+
+ if not self.fetcher.needed():
+ self.state = self.ready
+ elif not args.spawn_on_fetch:
+ self.state = self.fetch
+ else:
+ self.state = self.fetch
+ yield task_queue.put(wsk.clone())
+ wsk.pop()
+
+ @tornado.gen.coroutine
+ def fetch(self, wsk):
+ yield self.fetcher.fetch()
+ self.state = self.ready
+
+ @tornado.gen.coroutine
+ def ready(self, wsk):
+ self.trusted = wsk.trusted()
+
+ logger.debug("%r scanning products", self)
+
+ # NB: CRL checks on manifest EE certificates deferred until we've picked a CRL.
+
+ mft_candidates = []
+ crl_candidates = []
+ crl_candidate_hashes = set()
+
+ for mft in fetch_objects(aki = self.cer.ski, uri__endswith = ".mft"):
+ if mft.check(trusted = self.trusted, crl = None):
+ mft_candidates.append(mft)
+ crl_candidate_hashes.update(mft.find_crl_candidate_hashes())
+
+ if not mft_candidates:
+ wsk.pop()
+ return
+
+ for crl in fetch_objects(aki = self.cer.ski, uri__endswith = ".crl", sha256__in = crl_candidate_hashes):
+ if crl.check(self.trusted[0]):
+ crl_candidates.append(crl)
+
+ mft_candidates.sort(reverse = True, key = lambda x: (x.number, x.thisUpdate, x.obj.retrieved.started))
+ crl_candidates.sort(reverse = True, key = lambda x: (x.number, x.thisUpdate, x.obj.retrieved.started))
+
+ if not crl_candidates:
+ wsk.pop()
+ return
+
+ self.crl = crl_candidates[0]
+
+ install_object(self.crl)
+ Status.add(self.crl.uri, codes.OBJECT_ACCEPTED)
+
+ #logger.debug("Picked CRL %s", self.crl.uri)
+
+ for mft in mft_candidates:
+ if self.crl.isRevoked(mft.ee):
+ Status.add(mft.obj.uri, codes.MANIFEST_EE_REVOKED)
+ continue
+ self.mft = mft
+ break
+ else:
+ wsk.pop()
+ return
+
+ install_object(self.mft)
+ Status.add(self.mft.obj.uri, codes.OBJECT_ACCEPTED)
+
+ self.stale_crl = Status.test(self.crl.uri, codes.STALE_CRL_OR_MANIFEST)
+ self.stale_mft = Status.test(self.mft.uri, codes.STALE_CRL_OR_MANIFEST)
+
+ # Issue warnings on mft and crl URI mismatches?
+
+ # Use an explicit iterator so we can resume it; run loop in separate method, same reason.
+
+ self.mft_iterator = iter(self.mft.getFiles())
+ self.state = self.loop
+
+ @tornado.gen.coroutine
+ def loop(self, wsk):
+
+ #logger.debug("Processing %s", self.mft.uri)
+
+ for fn, digest in self.mft_iterator:
+
+ yield tornado.gen.moment
+
+ uri = self.mft.uri[:self.mft.uri.rindex("/") + 1] + fn
+
+ # Need general URI validator here?
+
+ if uri == self.crl.uri:
+ continue
+
+ cls = uri_to_class(uri)
+
+ if cls is None:
+ continue
+
+ if cls in (Manifest, CRL):
+ Status.add(uri, codes.INAPPROPRIATE_OBJECT_TYPE_SKIPPED)
+ continue
+
+ for obj in fetch_objects(sha256 = digest.encode("hex")):
+
+ if self.stale_crl:
+ Status.add(uri, codes.TAINTED_BY_STALE_CRL)
+ if self.stale_mft:
+ Status.add(uri, codes.TAINTED_BY_STALE_MANIFEST)
+
+ if not obj.check(trusted = self.trusted, crl = self.crl):
+ Status.add(uri, codes.OBJECT_REJECTED)
+ continue
+
+ install_object(obj)
+ Status.add(uri, codes.OBJECT_ACCEPTED)
+
+ if cls is not X509 or not obj.is_ca:
+ break
+
+ wsk.push(obj)
+ return
+
+ wsk.pop()
+
+
+class WalkTask(object):
+ """
+ Task corresponding to one walk stack, roughly analogous to
+ STACK_OF(walk_ctx_t) in rcynic:tos.
+ """
+
+ def __init__(self, wsk = None, cer = None):
+ self.wsk = [] if wsk is None else wsk
+ if cer is not None:
+ self.push(cer)
+
+ def __repr__(self):
+ try:
+ return "<WalkTask \"{}\" at 0x{:x}>".format(self.wsk[-1].cer.uri, id(self))
+ except:
+ return "<WalkTask at 0x{:x}>".format(id(self))
+
+ @tornado.gen.coroutine
+ def __call__(self):
+ while self.wsk:
+ yield self.wsk[-1](wsk = self)
+
+ def push(self, cer):
+ self.wsk.append(WalkFrame(cer))
+
+ def pop(self):
+ return self.wsk.pop()
+
+ def clone(self):
+ return WalkTask(wsk = list(self.wsk))
+
+ def trusted(self):
+ stack = [w.cer for w in self.wsk]
+ stack.reverse()
+ return stack
+
+
+def read_tals():
+ for head, dirs, files in os.walk(args.trust_anchor_locators):
+ for fn in files:
+ if fn.endswith(".tal"):
+ furi = "file://" + os.path.abspath(os.path.join(head, fn))
+ try:
+ with open(os.path.join(head, fn), "r") as f:
+ lines = [line.strip() for line in f]
+ blank = lines.index("")
+ uris = lines[:blank]
+ key = rpki.POW.Asymmetric.derReadPublic("".join(lines[blank:]).decode("base64"))
+ if not uris or not all(uri.endswith(".cer") for uri in uris):
+ Status.add(furi, codes.MALFORMED_TAL_URI)
+ yield uris, key
+ except:
+ Status.add(furi, codes.UNREADABLE_TRUST_ANCHOR_LOCATOR)
+
+
+def uri_to_filename(uri, base = None):
+ fn = uri[uri.index("://")+3:]
+ if base is not None:
+ fn = os.path.join(base, fn)
+ return fn
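+
+# E.g. (illustrative): uri_to_filename("rsync://example.org/a/b.cer", "/some/base")
+# returns "/some/base/example.org/a/b.cer".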
+
+def first_uri(uris, scheme):
+ if uris is not None:
+ for uri in uris:
+ if uri.startswith(scheme):
+ return uri
+ return None
+
+def first_rsync_uri(uris):
+ return first_uri(uris, "rsync://")
+
+def first_https_uri(uris):
+ return first_uri(uris, "https://")
+
+def sha256hex(data):
+ d = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
+ d.update(data)
+ return d.digest().encode("hex")
+
+
+class RRDP_ParseFailure(Exception):
+ "Failure parsing RRDP message."
+
+class DeadHost(Exception):
+ "Host recently tried and known to be unavailable."
+
+
+class Fetcher(object):
+ """
+ Network transfer methods and history database.
+
+ Covers rsync, plain HTTPS retrieval of trust anchors, and RRDP
+ (notification, snapshot, and delta files).
+ """
+
+ # Internal protocol:
+ #
+ # - Instances which have just gotten to the query stage are not registered
+ #
+ # - Instances which are in progress are listed in .history and
+ # have a Condition object in .pending; instances which depend on
+ # this should wait for the condition, then return.
+ #
+ # - Instances which have completed are listed in .history and have
+ # .pending set to None.
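+ #
+ # So a caller that might duplicate an in-progress fetch does, roughly
+ # (a sketch of the pattern used by the fetch methods below):
+ #
+ # other = self._rsync_history.get(key)
+ # if other is not None and other.pending is not None:
+ # yield other.pending.wait()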
+
+ _rsync_deadhosts = set()
+ _rsync_history = dict()
+
+ _https_deadhosts = set()
+ _https_history = dict()
+
+ def __init__(self, uri, ta = False):
+ self.uri = uri
+ self.ta = ta
+ self.pending = None
+ self.status = None
+
+ def _rsync_split_uri(self):
+ return tuple(self.uri.rstrip("/").split("/")[2:])
+
+ def _rsync_find(self, path):
+ for i in xrange(1, len(path)):
+ target = path[:i+1]
+ try:
+ return self._rsync_history[target]
+ except KeyError:
+ continue
+ return None
+
+ def needed(self):
+ if not args.fetch:
+ return False
+ if self.uri.startswith("rsync://"):
+ return self._rsync_needed()
+ if self.uri.startswith("https://"):
+ return self._https_needed()
+ raise ValueError
+
+ def _rsync_needed(self):
+ path = self._rsync_split_uri()
+ if path[0] in self._rsync_deadhosts:
+ return False
+ entry = self._rsync_find(path)
+ return entry is None or entry.pending is not None
+
+ def _https_needed(self):
+ netloc = urlparse.urlparse(self.uri).netloc
+ if netloc in self._https_deadhosts:
+ return False
+ entry = self._https_history.get(self.uri)
+ return entry is None or entry.pending is not None
+
+ def fetch(self):
+ if self.uri.startswith("rsync://"):
+ return self._rsync_fetch()
+ if self.uri.startswith("https://"):
+ return self._https_fetch_ta() if self.ta else self._rrdp_fetch()
+ raise ValueError
+
+ @tornado.gen.coroutine
+ def _rsync_fetch(self):
+ assert self.uri.startswith("rsync://") and (self.uri.endswith(".cer") if self.ta else self.uri.endswith("/"))
+
+ if not args.fetch:
+ return
+ path = self._rsync_split_uri()
+ dead = path[0] in self._rsync_deadhosts
+ other = self._rsync_find(path)
+ if not dead and other is not None and other.pending is not None:
+ yield other.pending.wait()
+ if dead or other is not None:
+ return
+
+ self.pending = tornado.locks.Condition()
+ self._rsync_history[path] = self
+
+ try:
+ path = uri_to_filename(self.uri, args.unauthenticated)
+ cmd = ["rsync", "--update", "--times", "--copy-links", "--itemize-changes"]
+ if self.uri.endswith("/"):
+ cmd.append("--recursive")
+ cmd.append("--delete")
+ cmd.append(self.uri)
+ cmd.append(path)
+
+ dn = os.path.dirname(path)
+ if not os.path.exists(dn):
+ os.makedirs(dn)
+
+ # We use the stdout close from rsync to detect when the subprocess has finished.
+ # There's a lovely tornado.process.Subprocess.wait_for_exit() method which does
+ # exactly what one would think we'd want -- but Unix signal handling still hasn't
+ # caught up to the software interrupt architecture ITS had forty years ago, so
+ # signals still cause random "system call interrupted" failures in other libraries.
+ # Nothing Tornado can do about this, so we avoid signals entirely and collect the
+ # process exit status directly from the operating system. In theory, the WNOHANG
+ # isn't necessary here, but we use it anyway to be safe in case theory is wrong.
+
+ # If we need to add a timeout here to guard against rsync processes taking too long
+ # (which has happened in the past with, eg, LACNIC), see tornado.gen.with_timeout()
+ # (documented in the utility functions section of the tornado.gen page), which wraps
+ # any future in a timeout.
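+ #
+ # A sketch of what that might look like (untested; assumes a
+ # hypothetical args.rsync_timeout option):
+ #
+ # output = yield tornado.gen.with_timeout(
+ # datetime.timedelta(seconds = args.rsync_timeout),
+ # rsync.stdout.read_until_close())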
+
+ t0 = time.time()
+ rsync = tornado.process.Subprocess(cmd, stdout = tornado.process.Subprocess.STREAM, stderr = subprocess.STDOUT)
+ logger.debug("rsync[%s] started \"%s\"", rsync.pid, " ".join(cmd))
+ output = yield rsync.stdout.read_until_close()
+ pid, self.status = os.waitpid(rsync.pid, os.WNOHANG)
+ t1 = time.time()
+ if (pid, self.status) == (0, 0):
+ logger.warn("rsync[%s] Couldn't get real exit status without blocking, sorry", rsync.pid)
+ for line in output.splitlines():
+ logger.debug("rsync[%s] %s", rsync.pid, line)
+ logger.debug("rsync[%s] finished after %s seconds with status 0x%x", rsync.pid, t1 - t0, self.status)
+
+ # Should do something with rsync result and validation status database here.
+
+ retrieval = Retrieval.objects.create(
+ uri = self.uri,
+ started = rpki.sundial.datetime.fromtimestamp(t0),
+ finished = rpki.sundial.datetime.fromtimestamp(t1),
+ successful = self.status == 0)
+
+ for fn in self._rsync_walk(path):
+ yield tornado.gen.moment
+ uri = "rsync://" + fn[len(args.unauthenticated):].lstrip("/")
+ cls = uri_to_class(uri)
+ if cls is not None:
+ try:
+ with open(fn, "rb") as f:
+ cls.store_if_new(f.read(), uri, retrieval)
+ except:
+ Status.add(uri, codes.UNREADABLE_OBJECT)
+ logger.exception("Couldn't read %s from rsync tree", uri)
+
+ finally:
+ pending = self.pending
+ self.pending = None
+ pending.notify_all()
+
+ def _rsync_walk(self, path):
+ if self.uri.endswith("/"):
+ for head, dirs, files in os.walk(path):
+ for fn in files:
+ yield os.path.join(head, fn)
+ elif os.path.exists(path):
+ yield path
+
+ @tornado.gen.coroutine
+ def _https_fetch_url(self, url, streaming_callback = None):
+
+ if urlparse.urlparse(url).netloc in self._https_deadhosts:
+ raise DeadHost
+
+ # Should do something with deadhost processing below. Looks
+ # like errors such as HTTP timeout show up as
+ # tornado.httpclient.HTTPError exceptions (which we could
+ # suppress if we wanted to do so, but we probably don't).
+ # HTTP timeout shows up in the logs as "HTTP 599". See doc for:
+ #
+ # tornado.httpclient.AsyncHTTPClient.fetch()
+ # tornado.httpclient.HTTPError
+
+ # Might need to do something with If-Modified-Since support
+ # See if_modified_since argument to
+ # http://www.tornadoweb.org/en/stable/httpclient.html#request-objects
+ # (which we can pass to client.fetch(), below). Not sure how
+ # "you don't need to retrieve this" result comes back,
+ # probably a benign exception we need to catch. Supporting
+ # this means adding another null-able timestamp field to the
+ # RRDPSnapshot model (which probably should be named the
+ # RRDPZone model instead), and storing a datetime there.
+ # Would also need to pull timestamp from the Last-Modified
+ # header in the response object.
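+ #
+ # A sketch (if_modified_since is a real HTTPRequest argument; the
+ # zone.last_modified field here is hypothetical):
+ #
+ # response = yield client.fetch(url, if_modified_since = zone.last_modified)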
+
+ try:
+ ok = False
+ t0 = time.time()
+ client = tornado.httpclient.AsyncHTTPClient(max_body_size = args.max_https_body_size)
+ response = yield client.fetch(url,
+ streaming_callback = streaming_callback,
+ validate_cert = args.validate_https,
+ connect_timeout = args.https_timeout,
+ request_timeout = args.https_timeout)
+ # Might want to check response Content-Type here
+ ok = True
+
+ except tornado.httpclient.HTTPError as e:
+ # Might want to check e.response here to figure out whether to add to _https_deadhosts.
+ logger.info("HTTP error for %s: %s", url, e)
+ raise
+
+ except (socket.error, IOError, ssl.SSLError) as e:
+ # Might want to check e.errno here to figure out whether to add to _https_deadhosts.
+ logger.info("Network I/O error for %s: %s", url, e)
+ raise
+
+ except Exception as e:
+ logger.exception("Error (%r) for %s", type(e), url)
+ raise
+
+ finally:
+ t1 = time.time()
+ logger.debug("Fetch of %s finished after %s seconds", url, t1 - t0)
+ retrieval = Retrieval.objects.create(
+ uri = url,
+ started = rpki.sundial.datetime.fromtimestamp(t0),
+ finished = rpki.sundial.datetime.fromtimestamp(t1),
+ successful = ok)
+ if ok:
+ raise tornado.gen.Return((retrieval, response))
+
+ @tornado.gen.coroutine
+ def _https_fetch_ta(self):
+
+ if not args.fetch:
+ return
+
+ other = self._https_history.get(self.uri)
+ if other is not None and other.pending is not None:
+ yield other.pending.wait()
+ return
+
+ self.pending = tornado.locks.Condition()
+ self._https_history[self.uri] = self
+
+ try:
+ retrieval, response = yield self._https_fetch_url(self.uri)
+ X509.store_if_new(response.body, self.uri, retrieval)
+ except:
+ logger.exception("Couldn't load %s", self.uri)
+
+ finally:
+ pending = self.pending
+ self.pending = None
+ pending.notify_all()
+
+ @tornado.gen.coroutine
+ def _rrdp_fetch_notification(self, url):
+
+ retrieval, response = yield self._https_fetch_url(url)
+
+ notification = ElementTree(file = response.buffer).getroot()
+
+ rpki.relaxng.rrdp.schema.assertValid(notification)
+
+ if notification.tag != tag_notification:
+ raise RRDP_ParseFailure("Expected RRDP notification for {}, got {}".format(url, notification.tag))
+
+ raise tornado.gen.Return((retrieval, notification))
+
+ @tornado.gen.coroutine
+ def _rrdp_fetch_data_file(self, url, expected_hash):
+
+ sha256 = rpki.POW.Digest(rpki.POW.SHA256_DIGEST)
+ xml_file = tempfile.SpooledTemporaryFile()
+
+ retrieval, response = yield self._https_fetch_url(url, lambda data: (sha256.update(data), xml_file.write(data)))
+
+ received_hash = sha256.digest().encode("hex")
+ xml_file.seek(0)
+
+ if received_hash != expected_hash.lower():
+ raise RRDP_ParseFailure("Expected RRDP hash {} for {}, got {}".format(expected_hash.lower(), url, received_hash))
+
+ raise tornado.gen.Return((retrieval, response, xml_file))
+
+ @tornado.gen.coroutine
+ def _rrdp_bulk_create(self, new_objs, existing_objs):
+ from django.db import IntegrityError
+
+ #logger.debug("Bulk creation of new RPKIObjects")
+
+ try:
+ RPKIObject.objects.bulk_create(new_objs)
+
+ except IntegrityError:
+ #logger.debug("Some objects already existed, weeding and retrying")
+ i = 0
+ while i < len(new_objs):
+ yield tornado.gen.moment
+ try:
+ existing_objs.append(RPKIObject.objects.values_list("pk", flat = True).get(der = new_objs[i].der))
+ logger.debug("Object existed in SQL but, apparently, not in prior copy of snapshot: uri %s sha256 %s",
+ new_objs[i].uri, new_objs[i].sha256)
+ except RPKIObject.DoesNotExist:
+ i += 1
+ else:
+ del new_objs[i]
+ RPKIObject.objects.bulk_create(new_objs)
+
+ del new_objs[:]
+
+ @tornado.gen.coroutine
+ def _rrdp_fetch(self):
+ from django.db import transaction
+
+ if not args.fetch:
+ return
+
+ other = self._https_history.get(self.uri)
+ if other is not None and other.pending is not None:
+ yield other.pending.wait()
+ return
+
+ self.pending = tornado.locks.Condition()
+ self._https_history[self.uri] = self
+
+ try:
+ retrieval, notification = yield self._rrdp_fetch_notification(url = self.uri)
+
+ session_id = notification.get("session_id")
+ serial = long(notification.get("serial"))
+
+ snapshot = RRDPSnapshot.objects.filter(
+ session_id = session_id).order_by("-retrieved__started").first()
+
+ logger.debug("RRDP notification for %s session_id %s serial %s current snapshot %r",
+ self.uri, session_id, serial, snapshot)
+
+ if snapshot is not None and snapshot.serial == serial:
+ logger.debug("RRDP data for %s is up-to-date, nothing to do", self.uri)
+ return
+
+ deltas = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash")))
+ for delta in notification.iterchildren(tag_delta))
+
+ if snapshot is None or snapshot.serial + 1 not in deltas:
+
+ existing_rpkiobject_map = dict()
+
+ if snapshot is not None:
+ logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial)
+ existing_rpkiobject_map.update(snapshot.rpkiobject_set.values_list("sha256", "pk"))
+
+ x = notification.find(tag_snapshot)
+
+ url, hash = x.get("uri"), x.get("hash")
+
+ logger.debug("RRDP %s loading from snapshot %s serial %s", self.uri, url, serial)
+
+ retrieval, response, xml_file = yield self._rrdp_fetch_data_file(url, hash)
+
+ snapshot = RRDPSnapshot.objects.create(session_id = session_id, serial = serial)
+
+ # Value of "chunk" here may need to be configurable. Larger numbers batch more objects in
+ # a single bulk addition, which is faster ... unless one or more of them isn't really new, in
+ # which case we have to check everything in that batch when we get the IntegrityError, so
+ # the smaller the batch, the faster that check. No single good answer.
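+ #
+ # (If we do make it configurable, presumably it becomes another posint
+ # option in main(), e.g. a hypothetical --rrdp-chunk-size argument.)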
+
+ root = None
+ existing_rpkiobjects = []
+ new_rpkiobjects = []
+ chunk = 2000
+
+ for event, node in iterparse(xml_file):
+ if node is root:
+ continue
+
+ if root is None:
+ root = node.getparent()
+ if root is None or root.tag != tag_snapshot \
+ or root.get("version") != "1" \
+ or any(a not in ("version", "session_id", "serial") for a in root.attrib):
+ raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url))
+ if root.get("session_id") != session_id:
+ raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(
+ session_id, url, root.get("session_id")))
+ if long(root.get("serial")) != long(serial):
+ raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(
+ serial, url, root.get("serial")))
+
+ if node.tag != tag_publish or node.getparent() is not root \
+ or any(a != "uri" for a in node.attrib):
+ raise RRDP_ParseFailure("{} doesn't look like an RRDP snapshot file".format(url))
+
+ uri = node.get("uri")
+ cls = uri_to_class(uri)
+ if cls is None:
+ raise RRDP_ParseFailure("Unexpected URI {}".format(uri))
+
+ der = node.text.decode("base64")
+ sha256 = sha256hex(der)
+ try:
+ existing_rpkiobjects.append(existing_rpkiobject_map[sha256])
+ except KeyError:
+ ski, aki = cls.derRead(der).get_hex_SKI_AKI()
+ new_rpkiobjects.append(RPKIObject(der = der, uri = uri, ski = ski, aki = aki,
+ retrieved = retrieval, sha256 = sha256))
+
+ node.clear()
+ while node.getprevious() is not None:
+ del root[0]
+
+ if len(new_rpkiobjects) > chunk:
+ yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects)
+
+ yield tornado.gen.moment
+
+ if len(new_rpkiobjects) > 0:
+ yield self._rrdp_bulk_create(new_rpkiobjects, existing_rpkiobjects)
+
+ RPKIObject.snapshot.through.objects.bulk_create([
+ RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i)
+ for i in retrieval.rpkiobject_set.values_list("pk", flat = True)])
+
+ RPKIObject.snapshot.through.objects.bulk_create([
+ RPKIObject.snapshot.through(rrdpsnapshot_id = snapshot.id, rpkiobject_id = i)
+ for i in existing_rpkiobjects])
+
+ snapshot.retrieved = retrieval
+ snapshot.save()
+
+ xml_file.close()
+
+ else:
+ logger.debug("RRDP %s %s deltas (%s--%s)", self.uri,
+ (serial - snapshot.serial), snapshot.serial, serial)
+
+ deltas = [(serial, deltas[serial][0], deltas[serial][1])
+ for serial in xrange(snapshot.serial + 1, serial + 1)]
+ futures = []
+
+ while deltas or futures:
+
+ while deltas and len(futures) < args.fetch_ahead_goal:
+ serial, url, hash = deltas.pop(0)
+ logger.debug("RRDP %s serial %s fetching %s", self.uri, serial, url)
+ futures.append(self._rrdp_fetch_data_file(url, hash))
+
+ retrieval, response, xml_file = yield futures.pop(0)
+
+ root = None
+
+ with transaction.atomic():
+ snapshot.serial += 1
+ snapshot.save()
+ logger.debug("RRDP %s serial %s loading", self.uri, snapshot.serial)
+
+ for event, node in iterparse(xml_file):
+ if node is root:
+ continue
+
+ if root is None:
+ root = node.getparent()
+ if root is None or root.tag != tag_delta \
+ or root.get("version") != "1" \
+ or any(a not in ("version", "session_id", "serial") for a in root.attrib):
+ raise RRDP_ParseFailure("{} doesn't look like an RRDP delta file".format(url))
+ if root.get("session_id") != session_id:
+ raise RRDP_ParseFailure("Expected RRDP session_id {} for {}, got {}".format(
+ session_id, url, root.get("session_id")))
+ if long(root.get("serial")) != snapshot.serial:
+ raise RRDP_ParseFailure("Expected RRDP serial {} for {}, got {}".format(
+ snapshot.serial, url, root.get("serial")))
+
+ hash = node.get("hash")
+
+ if node.getparent() is not root or node.tag not in (tag_publish, tag_withdraw) \
+ or (node.tag == tag_withdraw and hash is None) \
+ or any(a not in ("uri", "hash") for a in node.attrib):
+ raise RRDP_ParseFailure("{} doesn't look like an RRDP delta file".format(url))
+
+ if node.tag == tag_withdraw or node.get("hash") is not None:
+ snapshot.rpkiobject_set.remove(snapshot.rpkiobject_set.get(sha256 = node.get("hash").lower()))
+
+ if node.tag == tag_publish:
+ uri = node.get("uri")
+ cls = uri_to_class(uri)
+ if cls is None:
+ raise RRDP_ParseFailure("Unexpected URI %s" % uri)
+ obj, created = cls.store_if_new(node.text.decode("base64"), uri, retrieval)
+ obj.snapshot.add(snapshot)
+
+ node.clear()
+ while node.getprevious() is not None:
+ del root[0]
+
+ #yield tornado.gen.moment
+
+ xml_file.close()
+
+ logger.debug("RRDP %s done processing deltas", self.uri)
+
+ except (tornado.httpclient.HTTPError, socket.error, IOError, ssl.SSLError):
+ pass # Already logged
+
+ except RRDP_ParseFailure as e:
+ logger.info("RRDP parse failure: %s", e)
+
+ except:
+ logger.exception("Couldn't load %s", self.uri)
+
+ finally:
+ pending = self.pending
+ self.pending = None
+ pending.notify_all()
+
+
+class CheckTALTask(object):
+
+ def __init__(self, uris, key):
+ rsync_uri = first_rsync_uri(uris)
+ https_uri = first_https_uri(uris)
+
+ if args.prefer_rsync:
+ self.uri = rsync_uri or https_uri
+ else:
+ self.uri = https_uri or rsync_uri
+
+ self.key = key
+
+ def __repr__(self):
+ return "<CheckTALTask: \"{}\">".format(self.uri)
+
+ @tornado.gen.coroutine
+ def __call__(self):
+ yield Fetcher(self.uri, ta = True).fetch()
+ for cer in fetch_objects(uri = self.uri):
+ if self.check(cer):
+ yield task_queue.put(WalkTask(cer = cer))
+ break
+ else:
+ Status.add(self.uri, codes.TRUST_ANCHOR_SKIPPED)
+
+ def check(self, cer):
+ if self.key.derWritePublic() != cer.getPublicKey().derWritePublic():
+ Status.add(self.uri, codes.TRUST_ANCHOR_KEY_MISMATCH)
+ ok = False
+ else:
+ ok = cer.check(trusted = None, crl = None)
+ if ok:
+ install_object(cer)
+ Status.add(self.uri, codes.OBJECT_ACCEPTED)
+ else:
+ Status.add(self.uri, codes.OBJECT_REJECTED)
+ return ok
+
+
+@tornado.gen.coroutine
+def worker(meself):
+ #
+ # NB: This particular style of control loop REQUIRES an except
+ # clause, even if that except clause is just a pass statement.
+ #
+ while True:
+ task = yield task_queue.get()
+ name = repr(task)
+ try:
+ logger.debug("Worker %s starting %s, queue length %s", meself, name, task_queue.qsize())
+ yield task()
+ except:
+ logger.exception("Worker %s caught unhandled exception from %s", meself, name)
+ finally:
+ task_queue.task_done()
+ logger.debug("Worker %s finished %s, queue length %s", meself, name, task_queue.qsize())
+
+
+def final_report():
+ # Clean up a bit to avoid confusing the user unnecessarily.
+ for s in Status.db.itervalues():
+ if codes.OBJECT_ACCEPTED in s.status:
+ s.status.discard(codes.OBJECT_REJECTED)
+ doc = Element("rcynic-summary", date = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()))
+ doc.set("reporting-hostname", socket.getfqdn())
+ doc.set("rcynic-version", "rcynicng")
+ doc.set("summary-version", "1")
+ labels = SubElement(doc, "labels")
+ for code in codes.all():
+ SubElement(labels, code.name, kind = code.kind).text = code.text
+ for uri in Status.db:
+ for sym in sorted(Status.db[uri].status):
+ SubElement(doc, "validation_status",
+ timestamp = str(Status.db[uri].timestamp),
+ status = str(sym),
+ generation = "None" # Historical relic, remove eventually
+ ).text = uri
+ #
+ # Should generate <rsync_history/> elements here too, later
+ #
+ ElementTree(doc).write(file = argparse.FileType("w")(args.xml_file),
+ pretty_print = True)
+
+
+def final_cleanup():
+ from django.db import transaction, models
+
+ def report(when):
+ logger.debug("Database %s cleanup: %s Authenticated %s RRDPSnapshot %s RPKIObject %s Retrieval", when,
+ Authenticated.objects.all().count(), RRDPSnapshot.objects.all().count(),
+ RPKIObject.objects.all().count(), Retrieval.objects.all().count())
+
+ report("before")
+
+ with transaction.atomic():
+
+ #logger.debug("Flushing incomplete RRDP snapshots")
+
+ q = RRDPSnapshot.objects
+ q = q.filter(retrieved__isnull = True)
+ q.delete()
+
+ #logger.debug("Flushing old authenticated sets")
+
+ q = Authenticated.objects
+ q = q.exclude(id = authenticated.id)
+ q.delete()
+
+ #logger.debug("Flushing RRDP snapshots which don't contain anything in the (remaining) authenticated set")
+
+ q = RPKIObject.objects
+ q = q.filter(authenticated = authenticated.id)
+ q = q.exclude(snapshot = None)
+ q = q.order_by("snapshot__id")
+ q = q.values_list("snapshot__id", flat = True)
+ q = q.distinct()
+ q = RRDPSnapshot.objects.exclude(id__in = q)
+ q.delete()
+
+ #logger.debug("Flushing RPKI objects which are in neither current authenticated set nor current RRDP snapshot")
+
+ q = RPKIObject.objects
+ q = q.filter(authenticated = None) # was: q = q.exclude(authenticated = authenticated.id)
+ q = q.filter(snapshot = None)
+ q.delete()
+
+ #logger.debug("Flushing retrieval objects which are no longer related to any RPKI objects or RRDP snapshot")
+
+ q = RPKIObject.objects
+ q = q.order_by("retrieved__id")
+ q = q.values_list("retrieved__id", flat = True)
+ q = q.distinct()
+ q = Retrieval.objects.exclude(id__in = q)
+ q = q.filter(rrdpsnapshot = None)
+ q.delete()
+
+ report("after")
+
+
+@tornado.gen.coroutine
+def launcher():
+ for i in xrange(args.workers):
+ tornado.ioloop.IOLoop.current().spawn_callback(worker, i)
+
+ yield [task_queue.put(CheckTALTask(uris, key)) for uris, key in read_tals()]
+ yield task_queue.join()
+
+
+class posint(int):
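+ """
+ Positive-integer argparse type. int.__new__ has already parsed the
+ string by the time __init__ runs, so only the range check is needed.
+ """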
+ def __init__(self, value):
+ if self <= 0:
+ raise ValueError
+
+
+def main():
+ global rpki
+
+ os.environ.update(TZ = "UTC",
+ DJANGO_SETTINGS_MODULE = "rpki.django_settings.rcynic")
+ time.tzset()
+
+ cfg = rpki.config.argparser(section = "rcynic", doc = __doc__, cfg_optional = True)
+
+ cfg.add_logging_arguments()
+
+ cfg.add_argument("-u", "--unauthenticated",
+ help = "where to store unauthenticated data retrieved via rsycnc",
+ default = os.path.join(rpki.autoconf.RCYNIC_DIR, "data", "unauthenticated"))
+
+ cfg.add_argument("-x", "--xml-file",
+ help = "where to write XML log of validation results",
+ default = os.path.join(rpki.autoconf.RCYNIC_DIR, "data", "rcynic.xml"))
+
+ cfg.add_argument("-t", "--trust-anchor-locators", "--tals",
+ help = "where to find trust anchor locators",
+ default = os.path.join(rpki.autoconf.sysconfdir, "rpki", "trust-anchors"))
+
+ cfg.add_argument("-w", "--workers", type = posint,
+ help = "number of worker pseudo-threads to allow",
+ default = 10)
+
+ cfg.add_argument("--fetch-ahead-goal", type = posint,
+ help = "how many deltas we want in the fetch-ahead pipe",
+ default = 2)
+
+ cfg.add_argument("--https-timeout", type = posint,
+ help = "HTTPS connection timeout, in seconds",
+ default = 300)
+
+ cfg.add_argument("--max-https-body-size", type = posint,
+ help = "upper limit on byte length of HTTPS message body",
+ default = 512 * 1024 * 1024)
+
+ cfg.add_boolean_argument("--fetch", default = True,
+ help = "whether to fetch data at all")
+
+ cfg.add_boolean_argument("--spawn-on-fetch", default = True,
+ help = "whether to spawn new pseudo-threads on fetch")
+
+ cfg.add_boolean_argument("--migrate", default = True,
+ help = "whether to migrate the ORM database on startup")
+
+ cfg.add_boolean_argument("--prefer-rsync", default = False,
+ help = "whether to prefer rsync over RRDP")
+
+ cfg.add_boolean_argument("--validate-https", default = False,
+ help = "whether to validate HTTPS server certificates")
+
+ global args
+ args = cfg.argparser.parse_args()
+
+ cfg.configure_logging(args = args, ident = "rcynic")
+
+ import django
+ django.setup()
+
+ if args.migrate:
+ # Not sure we should be doing this on every run, but it sure simplifies things.
+ import django.core.management
+ django.core.management.call_command("migrate", verbosity = 0, interactive = False)
+
+ import rpki.rcynicdb
+ global Retrieval
+ global Authenticated
+ global RRDPSnapshot
+ global RPKIObject
+ Retrieval = rpki.rcynicdb.models.Retrieval
+ Authenticated = rpki.rcynicdb.models.Authenticated
+ RRDPSnapshot = rpki.rcynicdb.models.RRDPSnapshot
+ RPKIObject = rpki.rcynicdb.models.RPKIObject
+
+
+ global authenticated
+ authenticated = Authenticated.objects.create(started = rpki.sundial.datetime.now())
+
+ global task_queue
+ task_queue = tornado.queues.Queue()
+ tornado.ioloop.IOLoop.current().run_sync(launcher)
+
+ authenticated.finished = rpki.sundial.datetime.now()
+ authenticated.save()
+
+ final_report()
+
+ final_cleanup()
+
+
+if __name__ == "__main__":
+ main()