aboutsummaryrefslogtreecommitdiff
path: root/rp/rcynic
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2015-11-22 08:55:25 +0000
committerRob Austein <sra@hactrn.net>2015-11-22 08:55:25 +0000
commit72eb9c012c6d4cdf5f470607e5eacf6e36114457 (patch)
tree457c1f922021fb84426b2f71255b9c9fb90778b7 /rp/rcynic
parent6ec036bfdceadc4ca72f27d4369dfded0c194a19 (diff)
Convert certificate tree walking code to use new tasking model.
Get full rsync code working, history cache and all. svn path=/branches/tk705/; revision=6184
Diffstat (limited to 'rp/rcynic')
-rwxr-xr-xrp/rcynic/rcynicng694
1 files changed, 480 insertions, 214 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index 1c7d6de8..7e136ca8 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -20,16 +20,20 @@ import argparse
import subprocess
import tornado.gen
+import tornado.locks
import tornado.ioloop
import tornado.queues
import tornado.process
import rpki.POW
+from rpki.oids import id_kp_bgpsec_router
+
from lxml.etree import ElementTree, Element, SubElement, Comment
logger = logging.getLogger("rcynicng")
+codes = rpki.POW.validation_status
class Generation(object):
@@ -72,8 +76,8 @@ class Status(object):
self.status = set()
def __str__(self):
- return "{self.timestamp} {self.uri} {status} {self.generation}".format(
- self = self, status = ",".join(str(s) for s in sorted(self.status)))
+ return "{my.timestamp} {my.uri} {status} {my.generation}".format(
+ my = self, status = ",".join(str(s) for s in sorted(self.status)))
@property
def timestamp(self):
@@ -94,75 +98,341 @@ class Status(object):
return self.status
-class CertInfo(object):
- """
- Certificate info, like rcynic:tos certinfo_t. Not sure we really
- need this, but follow rcynic:tos model for now, clean up later.
- """
+class X509(rpki.POW.X509):
- def __init__(self, **kwargs):
- bools = ("ca", "ta")
- nones = ("generation", "uri", "caIssuers", "crldp",
- "caDirectory", "rpkiManifest", "signedObjectRepository", "rpkiNotify")
- for name in bools:
- setattr(self, name, False)
- for name in nones:
- setattr(self, name, None)
- for name in kwargs:
- if name in bools + nones:
- setattr(self, name, kwargs[name])
- else:
- raise ValueError
+ @classmethod
+ def derReadURI(cls, uri, generation, cms = None):
+ fn = uri_to_filename(uri, generation.tree)
+ if not os.path.exists(fn):
+ Status.update(uri, generation).add(codes.OBJECT_NOT_FOUND)
+ return None
+ if cms is None:
+ with open(fn, "rb") as f:
+ der = f.read()
+ else:
+ # XXX awful kludge to work around current lack of subclass
+ # support in rpki.POW.CMS.certs().
+ der = cms.certs()[0].derWrite()
+ self = cls.derRead(der)
+ self.uri = uri
+ self.fn = fn
+ self.generation = generation
+ self.sha256 = sha256(der) if cms is None else None
+ self.bc = self.getBasicConstraints()
+ self.aki = self.getAKI()
+ self.ski = self.getSKI()
+ self.eku = self.getEKU()
+ self.aia = self.getAIA()
+ self.sia = self.getSIA()
+ self.crldp = self.getCRLDP()
+ self.is_ca = self.bc is not None and self.bc[0]
+ self.caDirectory, self.rpkiManifest, self.signedObjectRepository, self.rpkiNotify \
+ = self.sia or (None, None, None, None)
+ return self
+
+ @staticmethod
+ def count_uris(uris, scheme = "rsync://"):
+ count = 0
+ if uris is not None:
+ for uri in uris:
+ if uri.startswith(scheme):
+ count += 1
+ return count
+
+ def check(self, trusted = None, crl = None):
+ status = Status.update(self.uri, self.generation)
+ is_ta = trusted is None and crl is None
+ is_routercert = (self.eku is not None and id_kp_bgpsec_router in self.eku and
+ not self.is_ca and self.uri.endswith(".cer"))
+ if self.eku is not None and (self.is_ca or not self.uri.endswith(".cer")):
+ status.add(codes.INAPPROPRIATE_EKU_EXTENSION)
+ if is_ta and not self.is_ca:
+ status.add(codes.MALFORMED_TRUST_ANCHOR)
+ if is_ta and self.aia is not None:
+ status.add(codes.AIA_EXTENSION_FORBIDDEN)
+ if not is_ta and self.aia is None:
+ status.add(codes.AIA_EXTENSION_MISSING)
+ if is_routercert and self.sia is not None:
+ status.add(codes.SIA_EXTENSION_FORBIDDEN)
+ if not is_routercert and self.sia is None:
+ status.add(codes.SIA_EXTENSION_MISSING)
+ if is_ta and self.crldp is not None:
+ status.add(codes.CRLDP_EXTENSION_FORBIDDEN)
+ if not is_ta and self.crldp is None:
+ status.add(codes.CRLDP_EXTENSION_MISSING)
+ n_rsync_caIssuers = self.count_uris(self.aia)
+ n_rsync_caDirectory = self.count_uris(self.caDirectory)
+ n_rsync_rpkiManifest = self.count_uris(self.rpkiManifest)
+ n_rsync_signedObjectRepository = self.count_uris(self.signedObjectRepository)
+ if n_rsync_caIssuers > 1 or n_rsync_caDirectory > 1 or n_rsync_rpkiManifest > 1 or n_rsync_signedObjectRepository > 1:
+ status.add(codes.MULTIPLE_RSYNC_URIS_IN_EXTENSION)
+ if self.aia is not None and n_rsync_caIssuers == 0:
+ status.add(codes.MALFORMED_AIA_EXTENSION)
+ if self.is_ca:
+ ok = n_rsync_caDirectory != 0 and n_rsync_rpkiManifest != 0 and n_rsync_signedObjectRepository == 0
+ elif not is_routercert:
+ ok = n_rsync_caDirectory == 0 and n_rsync_rpkiManifest == 0 and n_rsync_signedObjectRepository != 0
+ else:
+ ok = self.sia is None
+ if not ok:
+ status.add(codes.MALFORMED_SIA_EXTENSION)
+ if not is_ta and self.count_uris(self.crldp) == 0:
+ status.add(codes.MALFORMED_CRLDP_EXTENSION)
+ try:
+ self.verify(trusted = [self] if trusted is None else trusted, crl = crl, status = status)
+ except rpki.POW.ValidationError:
+ status.add(codes.OBJECT_REJECTED)
+ codes.normalize(status)
+ return not any(s.kind == "bad" for s in status)
-class WalkFrame(object):
- """
- Certificate tree walk stack frame, like rcynic:tos walk_ctx_t.
+class CRL(rpki.POW.CRL):
+
+ @classmethod
+ def derReadURI(cls, uri, generation = None):
+ fn = uri_to_filename(uri, generation.tree)
+ if not os.path.exists(fn):
+ Status.update(uri, generation).add(codes.OBJECT_NOT_FOUND)
+ return None
+ with open(fn, "rb") as f:
+ der = f.read()
+ self = cls.derRead(der)
+ self.uri = uri
+ self.fn = fn
+ self.generation = generation
+ self.sha256 = sha256(der)
+ self.aki = self.getAKI()
+ self.thisUpdate = self.getThisUpdate()
+ self.nextUpdate = self.getNextUpdate()
+ self.number = self.getCRLNumber()
+ return self
+
+ def check(self, issuer):
+ status = Status.update(self.uri, self.generation)
+ try:
+ self.verify(issuer, status)
+ except rpki.POW.ValidationError:
+ status.add(codes.OBJECT_REJECTED)
+ codes.normalize(status)
+ return not any(s.kind == "bad" for s in status)
- Try using bound method in place of explicit state enum, see how
- that works.
- Not sure how much of this is really needed, follow rcynic:tos for
- now and prune later.
+class Ghostbuster(rpki.POW.CMS):
+
+ @classmethod
+ def derReadURI(cls, uri, generation):
+ fn = uri_to_filename(uri, generation.tree)
+ if not os.path.exists(fn):
+ Status.update(uri, generation).add(codes.OBJECT_NOT_FOUND)
+ return None
+ with open(fn, "rb") as f:
+ der = f.read()
+ self = cls.derRead(der)
+ self.uri = uri
+ self.fn = fn
+ self.generation = generation
+ self.sha256 = sha256(der)
+ self.ee = X509.derReadURI(uri, generation, self)
+ self.vcard = None
+ return self
+
+ def check(self, trusted = None, crl = None):
+ status = Status.update(self.uri, self.generation)
+ self.ee.check(trusted, crl)
+ try:
+ self.vcard = self.verify()
+ except rpki.POW.ValidationError:
+ status.add(codes.OBJECT_REJECTED)
+ codes.normalize(status)
+ return not any(s.kind == "bad" for s in status)
+
+
+class Manifest(rpki.POW.Manifest):
+
+ @classmethod
+ def derReadURI(cls, uri, generation):
+ fn = uri_to_filename(uri, generation.tree)
+ if not os.path.exists(fn):
+ Status.update(uri, generation).add(codes.OBJECT_NOT_FOUND)
+ return None
+ with open(fn, "rb") as f:
+ der = f.read()
+ self = cls.derRead(der)
+ self.uri = uri
+ self.fn = fn
+ self.ee = X509.derReadURI(uri, generation, self)
+ self.fah = None
+ self.generation = generation
+ self.thisUpdate = None
+ self.nextUpdate = None
+ self.number = None
+ return self
+
+ def check(self, trusted = None, crl = None):
+ status = Status.update(self.uri, self.generation)
+ self.ee.check(trusted, crl)
+ try:
+ self.verify()
+ except rpki.POW.ValidationError:
+ status.add(codes.OBJECT_REJECTED)
+ self.thisUpdate = self.getThisUpdate()
+ self.nextUpdate = self.getNextUpdate()
+ self.number = self.getManifestNumber()
+ self.fah = self.getFiles()
+ codes.normalize(status)
+ return not any(s.kind == "bad" for s in status)
+
+
+class ROA(rpki.POW.ROA):
+
+ @classmethod
+ def derReadURI(cls, uri, generation):
+ fn = uri_to_filename(uri, generation.tree)
+ if not os.path.exists(fn):
+ Status.update(uri, generation).add(codes.OBJECT_NOT_FOUND)
+ return None
+ with open(fn, "rb") as f:
+ der = f.read()
+ self = cls.derRead(der)
+ self.uri = uri
+ self.fn = fn
+ self.generation = generation
+ self.sha256 = sha256(der)
+ self.ee = X509.derReadURI(uri, generation, self)
+ self.asn = None
+ self.prefixes = None
+ return self
+
+ def check(self, trusted = None, crl = None):
+ status = Status.update(self.uri, self.generation)
+ self.ee.check(trusted, crl)
+ try:
+ vcard = self.verify()
+ except rpki.POW.ValidationError:
+ status.add(codes.OBJECT_REJECTED)
+ self.asn = self.getASID()
+ self.prefixes = self.getPrefixes()
+ codes.normalize(status)
+ return not any(s.kind == "bad" for s in status)
+
+
+class WalkFrame(object):
+ """
+ Certificate tree walk stack frame. This is basically just a
+ preamble and a loop, broken out into several separate methods so
+ that we can fork new tasks in the middle then resume processing of
+ the current state machine (ie, this frame) when appropriate (eg,
+ after an rsync or RRDP fetch completes).
"""
- def __init__(self):
- self.certinfo = CertInfo()
- self.cert = None
- self.manifest = None
- self.manifest_generation = None
- self.filenames = []
- self.manifest_iteration = None
- self.filename_iteration = None
- self.stale_manifest = False
- self.crldp = None
- self.certs = []
- self.crls = []
+ def __init__(self, cer):
+ self.cer = cer
self.state = self.initial
@tornado.gen.coroutine
- def initial(self):
- raise NotImplementedError
+ def __call__(self, wsk):
+ yield self.state(wsk)
@tornado.gen.coroutine
- def fetch(self):
- raise NotImplementedError
+ def initial(self, wsk):
+ self.diruri = first_rsync_uri(self.cer.caDirectory)
- @tornado.gen.coroutine
- def ready(self):
- raise NotImplementedError
+ fetch = Fetcher(self.diruri)
- @tornado.gen.coroutine
- def current(self):
- raise NotImplementedError
+ if fetch.needed():
+
+ # This is where we'd fork another wsk task so we can keep
+ # busy while rsync is running. Defer that until linear
+ # version is working, so for now just wait for rsync.
+
+ yield fetch.fetch()
+
+ self.state = self.ready
@tornado.gen.coroutine
- def backup(self):
- raise NotImplementedError
+ def ready(self, wsk):
+ self.trusted = wsk.trusted()
+ self.generation = Generation.current # XXX Wrong, but it's what walk_tree() was doing, fix later
+
+ self.mfturi = first_rsync_uri(self.cer.rpkiManifest)
+ self.mft = Manifest.derReadURI(self.mfturi, self.generation)
+
+ if self.mft is None:
+ wsk.pop()
+ return
+
+ self.crluri = first_rsync_uri(self.mft.ee.crldp)
+ self.crl = CRL.derReadURI(self.crluri, self.generation)
+
+ if self.crl is None or not self.crl.check(self.cer) or self.mft is None or not self.mft.check(self.trusted, self.crl):
+ wsk.pop()
+ return
+
+ # CRL.check() probably ought to check issuer's caDirectory
+ # against CRL's URI in any case. If we do that, then we know
+ # that crluri.startswith(diruri) and can just simplify the
+ # following loop by setting crlfn = crluri[len(diruri):].
+
+ for name, digest in self.mft.fah:
+ if self.crluri == self.diruri + name:
+ if digest != self.crl.sha256:
+ status.add(codes.DIGEST_MISMATCH)
+ break
+ else:
+ Status.update(self.crluri, self.generation).add(codes.CRL_NOT_IN_MANIFEST)
+
+ # Use an explicit iterator so we can resume it later.
+ # Run the loop in a separate method for the same reason.
+
+ self.mft_iterator = iter(self.mft.getFiles())
+ self.state = self.loop
@tornado.gen.coroutine
- def done(self):
- raise NotImplementedError
+ def loop(self, wsk):
+
+ counter = 0
+ counter_max_before_yield = 50
+
+ for fn, digest in self.mft_iterator:
+
+ counter += 1
+ if counter > counter_max_before_yield:
+ yield tornado.gen.moment
+ counter = 0
+
+ uri = self.diruri + fn
+
+ if uri == self.crluri:
+ continue
+
+ if fn.endswith(".roa"):
+ roa = ROA.derReadURI(uri, self.generation)
+ roa.check() # XXX Do something with result
+ continue
+
+ if fn.endswith(".gbr"):
+ gbr = Ghostbuster.derReadURI(uri, self.generation)
+ gbr.check() # XXX Do something with result
+ continue
+
+ if fn.endswith(".cer"):
+ cer = X509.derReadURI(uri, self.generation)
+ cer.check() # XXX Do something with result
+ if cer.is_ca:
+ wsk.push(cer)
+
+ # XXX Temporary: Need to integrate with FSM
+ # looping, rsync fetching, etc -- this is just a
+ # hack to preserve old walk_tree() behavior
+ # temporarily for testing.
+
+ return
+
+ continue
+
+ Status.update(uri, self.generation).add(codes.UNKNOWN_OBJECT_TYPE_SKIPPED)
+
+ wsk.pop()
class WalkTask(object):
@@ -171,24 +441,34 @@ class WalkTask(object):
STACK_OF(walk_ctx_t) in rcynic:tos.
"""
- def __init__(self):
- self.stack = []
+ def __init__(self, wsk = None):
+ self.wsk = [] if wsk is None else wsk
@tornado.gen.coroutine
def __call__(self):
- yield self.stack[-1].state()
+ while self.wsk:
+ yield self.wsk[-1](wsk = self)
+
+ def push(self, cer):
+ self.wsk.append(WalkFrame(cer))
+
+ def pop(self):
+ return self.wsk.pop()
+
+ def clone(self):
+ return WalkTask(wsk = list(self.wsk))
+
+ def trusted(self):
+ stack = [w.cer for w in self.wsk]
+ stack.reverse()
+ return stack
-# Not sure we need all the rsync_status_t, rsync_state_t, and
-# rsync_ctx_t stuff from rcynic:tos, add later if we do.
# Probably do need some analogue of the rsync_history database, for
# tracking down hosts, URIs already fetched, and (perhaps) limiting
# simultaneous connection attempts to a single host.
-# Analogue to rcynic:tos STACK_OF(task_t): queue of tasks to be
-# invoked by the next available worker. Various fun options here, go
-# with simplest (unbounded simple queue) initially.
task_queue = tornado.queues.Queue()
@@ -206,7 +486,6 @@ def parse_arguments():
raise argparse.ArgumentTypeError("%r is not a positive integer " % s)
return i
-
parser = argparse.ArgumentParser(description = __doc__)
parser.add_argument("--unauthenticated", type = check_dir, default = "rcynic-data/unauthenticated")
parser.add_argument("--old-authenticated", type = check_dir, default = "rcynic-data/authenticated.old")
@@ -228,10 +507,10 @@ def read_tals():
b64 = "".join(lines[lines.index("\n"):])
key = rpki.POW.Asymmetric.derReadPublic(b64.decode("base64"))
if not uri.endswith(".cer"):
- Status.update(furi, None).add(rpki.POW.validation_status.MALFORMED_TAL_URI)
+ Status.update(furi, None).add(codes.MALFORMED_TAL_URI)
yield uri, key
except:
- Status.update(furi, None).add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR_LOCATOR)
+ Status.update(furi, None).add(codes.UNREADABLE_TRUST_ANCHOR_LOCATOR)
def uri_to_filename(uri, base = None):
@@ -240,13 +519,11 @@ def uri_to_filename(uri, base = None):
fn = os.path.join(base, fn)
return fn
-def uri_to_basename(uri):
- return uri.rpartition("/")[2]
-
def first_uri(uris, scheme):
- for uri in uris:
- if uri.startswith(scheme):
- return uri
+ if uris is not None:
+ for uri in uris:
+ if uri.startswith(scheme):
+ return uri
return None
def first_rsync_uri(uris):
@@ -258,124 +535,117 @@ def sha256(bytes):
return d.digest()
-@tornado.gen.coroutine
-def walk_tree(cauri, ca, trusted, crl, basedir):
- trusted.insert(0, ca)
-
- sia = ca.getSIA()
- diruri = first_rsync_uri(sia[0])
- mfturi = first_rsync_uri(sia[1])
- try:
- mft = rpki.POW.Manifest.derReadFile(uri_to_filename(mfturi, basedir))
- except rpki.POW.Error as e:
- logger.warn("Couldn't read %s: %s", mfturi, e)
- return
- ee = mft.certs()[0]
- crldp = ee.getCRLDP()
- crluri = first_rsync_uri(crldp)
- try:
- crl = rpki.POW.CRL.derReadFile(uri_to_filename(crluri, basedir))
- except rpki.POW.Error as e:
- logger.warn("Couldn't read %s: %s", crluri, e)
- return
-
- crl_status = Status.update(crluri)
- crl.verify(ca, crl_status)
-
- mft_status = Status.update(mfturi)
- ee.verify(trusted = trusted, crl = crl, status = mft_status)
- mft.verify(status = mft_status)
-
- crl_status.add(rpki.POW.validation_status.CRL_NOT_IN_MANIFEST)
-
- for fn, digest in mft.getFiles():
-
- # XXX There are enough objects in the RIPE repository that we
- # need to let the I/O loop run occasionally. Probably not on
- # every object as this does, but without this it takes about
- # 18 seconds to check the whole RIPE database, which makes
- # APNIC look even slower than they are. Maybe yield every N
- # objects for some tunable value of N, or something like that.
-
- yield tornado.gen.moment
-
- uri = diruri + fn
- status = Status.update(uri)
-
- if uri == crluri:
- if digest != sha256(crl.derWrite()):
- status.add(rpki.POW.validation_status.DIGEST_MISMATCH)
- status.remove(rpki.POW.validation_status.CRL_NOT_IN_MANIFEST)
- continue
-
- with open(os.path.join(uri_to_filename(diruri, basedir), fn), "rb") as f:
- der = f.read()
- if sha256(der) != digest:
- status.add(rpki.POW.validation_status.DIGEST_MISMATCH)
-
- if fn.endswith(".roa"):
- roa = rpki.POW.ROA.derRead(der)
- ee = roa.certs()[0]
- ee.verify(trusted = trusted, crl = crl, status = status)
- roa.verify(status = status)
- continue
-
- if fn.endswith(".gbr"):
- gbr = rpki.POW.CMS.derRead(der)
- ee = gbr.certs()[0]
- ee.verify(trusted = trusted, crl = crl, status = status)
- vcard = gbr.verify(status = status)
- continue
-
- if fn.endswith(".cer"):
- cer = rpki.POW.X509.derRead(der)
- cer.verify(trusted = trusted, crl = crl, status = status)
- is_ca = (cer.getBasicConstraints() or (False, None))[0]
- if is_ca:
- yield walk_tree(diruri + fn, cer, trusted, crl, basedir)
- continue
-
- status.add(rpki.POW.validation_status.UNKNOWN_OBJECT_TYPE_SKIPPED)
+class Fetcher(object):
+ """
+ Network transfer methods and history database.
+ At the moment this is rsync-only; eventually it will include
+ support for HTTPS and RRDP.
+ """
-@tornado.gen.coroutine
-def rsync_ta(uri):
- assert uri.endswith(".cer")
- yield rsync(uri, ())
+ # Internal protocol:
+ #
+ # - Instances which have just gotten to the query stage are not registered
+ #
+ # - Instances which are in progress are listed in .history and
+ # have a Condition object in .pending; instances which depend on
+ # this should wait for the condition, then return.
+ #
+ # - Instances which have completed are listed in .history and have
+ # .pending set to None.
-@tornado.gen.coroutine
-def rsync_sia(uri):
- assert uri.endswith("/")
- yield rsync(uri, ("--recursive", "--delete"))
+ _rsync_deadhosts = set()
+ _rsync_history = dict()
-@tornado.gen.coroutine
-def rsync(uri, cmd_extra):
+ def __init__(self, uri):
+ self.uri = uri
+ self.pending = None
+ self.status = None
+
+ def _rsync_split_uri(self):
+ return tuple(self.uri.rstrip("/").split("/")[2:])
- # This is where we should check our internal history database to
- # see if we've already fetched this URI or something covering it;
- # if so, just return success at this point, we've done all the fetching
- # we're going to do for this URI on this rcynic run.
+ def _rsync_find(self, path):
+ for i in xrange(1, len(path)):
+ target = path[:i+1]
+ try:
+ return self._rsync_history[target]
+ except KeyError:
+ continue
+ return None
+
+ def needed(self):
+ if self.uri.startswith("rsync://"):
+ return self._rsync_needed()
+ raise ValueError
+
+ def _rsync_needed(self):
+ path = self._rsync_split_uri()
+ if path[0] in self._rsync_deadhosts:
+ return False
+ entry = self._rsync_find(path)
+ return entry is None or entry.pending is not None
- cmd = ["rsync", "--update", "--times", "--copy-links", "--itemize-changes", "-4"]
- cmd.extend(cmd_extra)
- cmd.append(uri)
- cmd.append(uri_to_filename(uri, args.unauthenticated))
+ def fetch(self):
+ if self.uri.startswith("rsync://"):
+ return self._rsync_fetch()
+ raise ValueError
- dn = os.path.dirname(cmd[-1])
- if not os.path.exists(dn):
- os.makedirs(dn)
+ @tornado.gen.coroutine
+ def _rsync_fetch(self):
+ assert self.uri.startswith("rsync://") and (self.uri.endswith(".cer") or self.uri.endswith("/"))
- t0 = time.time()
- proc = tornado.process.Subprocess(cmd, stdout = tornado.process.Subprocess.STREAM, stderr = subprocess.STDOUT)
- logger.debug("rsync[%s] started \"%s\"", proc.pid, " ".join(cmd))
- ret, out = yield [proc.wait_for_exit(raise_error = False), proc.stdout.read_until_close()]
- t1 = time.time()
- for line in out.splitlines():
- logger.debug("rsync[%s] %s", proc.pid, line)
- logger.debug("rsync[%s] finished after %s seconds with status 0x%x", proc.pid, t1 - t0, ret)
+ path = self._rsync_split_uri()
+ if path[0] in self._rsync_deadhosts:
+ return
+ other = self._rsync_find(path)
+ if other is not None:
+ if other.pending is not None:
+ yield other.pending.wait()
+ return
- # Should do something with rsync result and validation status database here.
+ self.pending = tornado.locks.Condition()
+ self._rsync_history[path] = self
+ try:
+ cmd = ["rsync", "--update", "--times", "--copy-links", "--itemize-changes"]
+ if self.uri.endswith("/"):
+ cmd.append("--recursive")
+ cmd.append("--delete")
+ cmd.append(self.uri)
+ cmd.append(uri_to_filename(self.uri, args.unauthenticated))
+
+ dn = os.path.dirname(cmd[-1])
+ if not os.path.exists(dn):
+ os.makedirs(dn)
+
+ # We use the stdout close from rsync to detect when the subprocess has finished.
+ # There's a lovely tornado.process.Subprocess.wait_for_exit() method which does
+ # exactly what one might want -- but Unix signal handling still hasn't caught up
+ # to the software interrupt architecture ITS had forty years ago, so signals still
+ # cause random "system call interrupted" failures in other libraries. Nothing
+ # Tornado can do about this, so we avoid signals entirely and collect the process
+ # exit status directly from the operating system. In theory, the WNOHANG isn't
+ # really necessary, we use it anyway to be safe in case theory is wrong this week.
+
+ t0 = time.time()
+ proc = tornado.process.Subprocess(cmd, stdout = tornado.process.Subprocess.STREAM, stderr = subprocess.STDOUT)
+ logger.debug("rsync[%s] started \"%s\"", proc.pid, " ".join(cmd))
+ output = yield proc.stdout.read_until_close()
+ pid, self.status = os.waitpid(proc.pid, os.WNOHANG)
+ if (pid, self.status) == (0, 0):
+ logger.warn("rsync[%s] Couldn't get real exit status without blocking, sorry", proc.pid)
+ t1 = time.time()
+ for line in output.splitlines():
+ logger.debug("rsync[%s] %s", proc.pid, line)
+ logger.debug("rsync[%s] finished after %s seconds with status 0x%x", proc.pid, t1 - t0, self.status)
+
+ # Should do something with rsync result and validation status database here.
+
+ finally:
+ pending = self.pending
+ self.pending = None
+ pending.notify_all()
class CheckTALTask(object):
@@ -388,42 +658,33 @@ class CheckTALTask(object):
@tornado.gen.coroutine
def __call__(self):
- yield rsync_ta(self.uri)
+ yield Fetcher(self.uri).fetch()
for generation in (Generation.current, Generation.backup):
- try:
- yield self.check(generation)
-
- # Need to convert this into initialization of a
- # WalkTask object and insertion of that task into the
- # task_queue. Check rcynic:tos to see how it handles
- # the walk_ctx initialization.
-
+ if (yield self.check(generation)):
logger.debug("Starting walk of %s", self.uri)
- yield walk_tree(self.uri, self.cer, [self.cer], None, generation.tree)
-
+ wsk = WalkTask()
+ wsk.push(self.cer)
+ task_queue.put(wsk)
return
- except rpki.POW.Error:
- pass
- Status.update(self.uri, None).add(rpki.POW.validation_status.TRUST_ANCHOR_SKIPPED)
+ else:
+ Status.update(self.uri, None).add(codes.TRUST_ANCHOR_SKIPPED)
@tornado.gen.coroutine
def check(self, generation):
status = Status.update(self.uri, generation)
- try:
- self.cer = rpki.POW.X509.derReadFile(uri_to_filename(self.uri, generation.tree))
- except rpki.POW.OpenSSLError:
- status.add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR)
- status.add(rpki.POW.validation_status.OBJECT_REJECTED)
- raise rpki.POW.ValidationError
- if self.key.derWritePublic() != self.cer.getPublicKey().derWritePublic():
- status.add(rpki.POW.validation_status.TRUST_ANCHOR_KEY_MISMATCH)
- status.add(rpki.POW.validation_status.OBJECT_REJECTED)
- raise rpki.POW.ValidationError
- try:
- self.cer.verify(trusted = [self.cer], status = status)
- except rpki.POW.ValidationError:
- status.add(rpki.POW.validation_status.OBJECT_REJECTED)
- raise
+ self.cer = X509.derReadURI(self.uri, generation)
+ ok = False
+ if self.cer is None:
+ status.add(codes.UNREADABLE_TRUST_ANCHOR)
+ status.add(codes.OBJECT_REJECTED)
+ elif self.key.derWritePublic() != self.cer.getPublicKey().derWritePublic():
+ status.add(codes.TRUST_ANCHOR_KEY_MISMATCH)
+ status.add(codes.OBJECT_REJECTED)
+ elif not self.cer.check():
+ status.add(codes.OBJECT_REJECTED)
+ else:
+ ok = True
+ raise tornado.gen.Return(ok)
@tornado.gen.coroutine
@@ -435,13 +696,13 @@ def worker(meself):
while True:
task = yield task_queue.get()
try:
- logger.info("Worker %s starting %s", meself, task)
+ #logger.debug("Worker %s starting %s", meself, task)
yield task()
except:
logger.exception("Worker %s caught unhandled exception from %s", meself, task)
finally:
task_queue.task_done()
- logger.info("Worker %s finished %s", meself, task)
+ #logger.debug("Worker %s finished %s", meself, task)
@tornado.gen.coroutine
@@ -452,6 +713,15 @@ def main():
yield task_queue.join()
+def final_report():
+ # Should be doing something to generate rcynic.xml here.
+
+ if False:
+ for uri in sorted(Status.db):
+ print Status.db[uri]
+ else:
+ print "{} entries in status database".format(len(Status.db))
+
os.putenv("TZ", "UTC")
time.tzset()
@@ -465,8 +735,4 @@ logging.getLogger().setLevel(logging.DEBUG)
tornado.ioloop.IOLoop.current().run_sync(main)
-if False:
- for uri in sorted(Status.db):
- print Status.db[uri]
-
-# Should be doing something to generate rcynic.xml here.
+final_report()