aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2015-11-20 08:17:23 +0000
committerRob Austein <sra@hactrn.net>2015-11-20 08:17:23 +0000
commit17d892bb1939a5c7421cf69119858b471a5c5539 (patch)
treeb2e217d09144134c42416baecf95ff5524e48a7a
parent63f731051c9ff62346ec2464bce59ca483c50a2d (diff)
Checkpoint. Not useful yet, but starting to come together. Tornado is awesome.
svn path=/branches/tk705/; revision=6182
-rwxr-xr-xrp/rcynic/rcynicng337
1 files changed, 300 insertions, 37 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index d6aaf56e..1c7d6de8 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -15,31 +15,73 @@ RP code. Gotta start somewhere.
import os
import sys
import time
+import logging
import argparse
+import subprocess
+
+import tornado.gen
+import tornado.ioloop
+import tornado.queues
+import tornado.process
import rpki.POW
from lxml.etree import ElementTree, Element, SubElement, Comment
+logger = logging.getLogger("rcynicng")
+
+
+class Generation(object):
+
+ all = []
+
+ def __init__(self, name, tree):
+ self.name = name
+ self.tree = tree
+ self.all.append(self)
+ self.pos = len(self.all)
+ setattr(self.__class__, name, self)
+
+ def __hash__(self):
+ return hash(self.name)
+
+ def __cmp__(self, other):
+ return cmp(self.pos, 0 if other is None else other.pos)
+
+ def __str__(self):
+ return self.name
+
+
class Status(object):
"""
Validation status database, like validation_status_t in rcynic:tos.
+
+ rcynic:tos version of this data structure is stored as an AVL
+ tree, because the OpenSSL STACK_OF() sort-and-bsearch turned out
+ to be a very poor choice for the input data. Remains to be seen
+ whether we need to do something like that here too.
"""
db = dict()
- def __init__(self, uri, generation = None):
- assert generation in ("current", "backup", None)
+ def __init__(self, uri, generation):
+ assert generation is None or isinstance(generation, Generation)
self.uri = uri
- self.generation = generation
- self.timestamp = None
+ self._generation = generation
+ self._timestamp = None
self.status = set()
def __str__(self):
- return "{time} {self.uri} {status} {self.generation}".format(
- self = self,
- time = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(self.timestamp)),
- status = ",".join(str(s) for s in sorted(self.status)))
+ return "{self.timestamp} {self.uri} {status} {self.generation}".format(
+ self = self, status = ",".join(str(s) for s in sorted(self.status)))
+
+ @property
+ def timestamp(self):
+ return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(self._timestamp))
+
+ @property
+ def generation(self):
+ return str(self._generation)
@classmethod
def update(cls, uri, generation = None):
@@ -48,10 +90,109 @@ class Status(object):
self = cls.db[key]
except KeyError:
self = cls.db[key] = cls(uri, generation)
- self.timestamp = time.time()
+ self._timestamp = time.time()
return self.status
+class CertInfo(object):
+ """
+ Certificate info, like rcynic:tos certinfo_t. Not sure we really
+ need this, but follow rcynic:tos model for now, clean up later.
+ """
+
+ def __init__(self, **kwargs):
+ bools = ("ca", "ta")
+ nones = ("generation", "uri", "caIssuers", "crldp",
+ "caDirectory", "rpkiManifest", "signedObjectRepository", "rpkiNotify")
+ for name in bools:
+ setattr(self, name, False)
+ for name in nones:
+ setattr(self, name, None)
+ for name in kwargs:
+ if name in bools + nones:
+ setattr(self, name, kwargs[name])
+ else:
+ raise ValueError
+
+
+class WalkFrame(object):
+ """
+ Certificate tree walk stack frame, like rcynic:tos walk_ctx_t.
+
+ Try using bound method in place of explicit state enum, see how
+ that works.
+
+ Not sure how much of this is really needed, follow rcynic:tos for
+ now and prune later.
+ """
+
+ def __init__(self):
+ self.certinfo = CertInfo()
+ self.cert = None
+ self.manifest = None
+ self.manifest_generation = None
+ self.filenames = []
+ self.manifest_iteration = None
+ self.filename_iteration = None
+ self.stale_manifest = False
+ self.crldp = None
+ self.certs = []
+ self.crls = []
+ self.state = self.initial
+
+ @tornado.gen.coroutine
+ def initial(self):
+ raise NotImplementedError
+
+ @tornado.gen.coroutine
+ def fetch(self):
+ raise NotImplementedError
+
+ @tornado.gen.coroutine
+ def ready(self):
+ raise NotImplementedError
+
+ @tornado.gen.coroutine
+ def current(self):
+ raise NotImplementedError
+
+ @tornado.gen.coroutine
+ def backup(self):
+ raise NotImplementedError
+
+ @tornado.gen.coroutine
+ def done(self):
+ raise NotImplementedError
+
+
+class WalkTask(object):
+ """
+ Task corresponding to one walk stack, roughly analgous to
+ STACK_OF(walk_ctx_t) in rcynic:tos.
+ """
+
+ def __init__(self):
+ self.stack = []
+
+ @tornado.gen.coroutine
+ def __call__(self):
+ yield self.stack[-1].state()
+
+
+# Not sure we need all the rsync_status_t, rsync_state_t, and
+# rsync_ctx_t stuff from rcynic:tos, add later if we do.
+
+# Probably do need some analogue of the rsync_history database, for
+# tracking down hosts, URIs already fetched, and (perhaps) limiting
+# simultaneous connection attempts to a single host.
+
+# Analogue to rcynic:tos STACK_OF(task_t): queue of tasks to be
+# invoked by the next available worker. Various fun options here, go
+# with simplest (unbounded simple queue) initially.
+
+task_queue = tornado.queues.Queue()
+
+
def parse_arguments():
def check_dir(s):
@@ -59,11 +200,19 @@ def parse_arguments():
raise argparse.ArgumentTypeError("%r is not a directory" % s)
return s
+ def posint(s):
+ i = int(s)
+ if i <= 0:
+ raise argparse.ArgumentTypeError("%r is not a positive integer " % s)
+ return i
+
+
parser = argparse.ArgumentParser(description = __doc__)
parser.add_argument("--unauthenticated", type = check_dir, default = "rcynic-data/unauthenticated")
parser.add_argument("--old-authenticated", type = check_dir, default = "rcynic-data/authenticated.old")
parser.add_argument("--tals", type = check_dir, default = "sample-trust-anchors")
parser.add_argument("--output", default = "rcynic-data/rcynicng-output")
+ parser.add_argument("--workers", type = posint, default = 10)
return parser.parse_args()
@@ -79,10 +228,10 @@ def read_tals():
b64 = "".join(lines[lines.index("\n"):])
key = rpki.POW.Asymmetric.derReadPublic(b64.decode("base64"))
if not uri.endswith(".cer"):
- Status.update(furi).add(rpki.POW.validation_status.MALFORMED_TAL_URI)
+ Status.update(furi, None).add(rpki.POW.validation_status.MALFORMED_TAL_URI)
yield uri, key
except:
- Status.update(furi).add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR_LOCATOR)
+ Status.update(furi, None).add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR_LOCATOR)
def uri_to_filename(uri, base = None):
@@ -109,6 +258,7 @@ def sha256(bytes):
return d.digest()
+@tornado.gen.coroutine
def walk_tree(cauri, ca, trusted, crl, basedir):
trusted.insert(0, ca)
@@ -118,7 +268,7 @@ def walk_tree(cauri, ca, trusted, crl, basedir):
try:
mft = rpki.POW.Manifest.derReadFile(uri_to_filename(mfturi, basedir))
except rpki.POW.Error as e:
- print mfturi, e
+ logger.warn("Couldn't read %s: %s", mfturi, e)
return
ee = mft.certs()[0]
crldp = ee.getCRLDP()
@@ -126,7 +276,7 @@ def walk_tree(cauri, ca, trusted, crl, basedir):
try:
crl = rpki.POW.CRL.derReadFile(uri_to_filename(crluri, basedir))
except rpki.POW.Error as e:
- print crluri, e
+ logger.warn("Couldn't read %s: %s", crluri, e)
return
crl_status = Status.update(crluri)
@@ -139,6 +289,16 @@ def walk_tree(cauri, ca, trusted, crl, basedir):
crl_status.add(rpki.POW.validation_status.CRL_NOT_IN_MANIFEST)
for fn, digest in mft.getFiles():
+
+ # XXX There are enough objects in the RIPE repository that we
+ # need to let the I/O loop run occasionally. Probably not on
+ # every object as this does, but without this it takes about
+ # 18 seconds to check the whole RIPE database, which makes
+ # APNIC look even slower than they are. Maybe yield every N
+ # objects for some tunable value of N, or something like that.
+
+ yield tornado.gen.moment
+
uri = diruri + fn
status = Status.update(uri)
@@ -172,38 +332,141 @@ def walk_tree(cauri, ca, trusted, crl, basedir):
cer.verify(trusted = trusted, crl = crl, status = status)
is_ca = (cer.getBasicConstraints() or (False, None))[0]
if is_ca:
- walk_tree(diruri + fn, cer, trusted, crl, basedir)
+ yield walk_tree(diruri + fn, cer, trusted, crl, basedir)
continue
status.add(rpki.POW.validation_status.UNKNOWN_OBJECT_TYPE_SKIPPED)
+@tornado.gen.coroutine
+def rsync_ta(uri):
+ assert uri.endswith(".cer")
+ yield rsync(uri, ())
+
+@tornado.gen.coroutine
+def rsync_sia(uri):
+ assert uri.endswith("/")
+ yield rsync(uri, ("--recursive", "--delete"))
+
+@tornado.gen.coroutine
+def rsync(uri, cmd_extra):
+
+ # This is where we should check our internal history database to
+ # see if we've already fetched this URI or something covering it;
+ # if so, just return success at this point, we've done all the fetching
+ # we're going to do for this URI on this rcynic run.
+
+ cmd = ["rsync", "--update", "--times", "--copy-links", "--itemize-changes", "-4"]
+ cmd.extend(cmd_extra)
+ cmd.append(uri)
+ cmd.append(uri_to_filename(uri, args.unauthenticated))
+
+ dn = os.path.dirname(cmd[-1])
+ if not os.path.exists(dn):
+ os.makedirs(dn)
+
+ t0 = time.time()
+ proc = tornado.process.Subprocess(cmd, stdout = tornado.process.Subprocess.STREAM, stderr = subprocess.STDOUT)
+ logger.debug("rsync[%s] started \"%s\"", proc.pid, " ".join(cmd))
+ ret, out = yield [proc.wait_for_exit(raise_error = False), proc.stdout.read_until_close()]
+ t1 = time.time()
+ for line in out.splitlines():
+ logger.debug("rsync[%s] %s", proc.pid, line)
+ logger.debug("rsync[%s] finished after %s seconds with status 0x%x", proc.pid, t1 - t0, ret)
+
+ # Should do something with rsync result and validation status database here.
+
+
+class CheckTALTask(object):
+
+ def __init__(self, uri, key):
+ self.uri = uri
+ self.key = key
+
+ def __repr__(self):
+ return "<CheckTALTask: \"{}\">".format(self.uri)
+
+ @tornado.gen.coroutine
+ def __call__(self):
+ yield rsync_ta(self.uri)
+ for generation in (Generation.current, Generation.backup):
+ try:
+ yield self.check(generation)
+
+ # Need to convert this into initialization of a
+ # WalkTask object and insertion of that task into the
+ # task_queue. Check rcynic:tos to see how it handles
+ # the walk_ctx initialization.
+
+ logger.debug("Starting walk of %s", self.uri)
+ yield walk_tree(self.uri, self.cer, [self.cer], None, generation.tree)
+
+ return
+ except rpki.POW.Error:
+ pass
+ Status.update(self.uri, None).add(rpki.POW.validation_status.TRUST_ANCHOR_SKIPPED)
+
+ @tornado.gen.coroutine
+ def check(self, generation):
+ status = Status.update(self.uri, generation)
+ try:
+ self.cer = rpki.POW.X509.derReadFile(uri_to_filename(self.uri, generation.tree))
+ except rpki.POW.OpenSSLError:
+ status.add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR)
+ status.add(rpki.POW.validation_status.OBJECT_REJECTED)
+ raise rpki.POW.ValidationError
+ if self.key.derWritePublic() != self.cer.getPublicKey().derWritePublic():
+ status.add(rpki.POW.validation_status.TRUST_ANCHOR_KEY_MISMATCH)
+ status.add(rpki.POW.validation_status.OBJECT_REJECTED)
+ raise rpki.POW.ValidationError
+ try:
+ self.cer.verify(trusted = [self.cer], status = status)
+ except rpki.POW.ValidationError:
+ status.add(rpki.POW.validation_status.OBJECT_REJECTED)
+ raise
+
+
+@tornado.gen.coroutine
+def worker(meself):
+ #
+ # NB: This particular style of control loop REQUIRES an except
+ # clause, even if that except clause is just a pass statement.
+ #
+ while True:
+ task = yield task_queue.get()
+ try:
+ logger.info("Worker %s starting %s", meself, task)
+ yield task()
+ except:
+ logger.exception("Worker %s caught unhandled exception from %s", meself, task)
+ finally:
+ task_queue.task_done()
+ logger.info("Worker %s finished %s", meself, task)
+
+
+@tornado.gen.coroutine
+def main():
+ for i in xrange(args.workers):
+ tornado.ioloop.IOLoop.current().spawn_callback(worker, i)
+ yield [task_queue.put(CheckTALTask(uri, key)) for uri, key in read_tals()]
+ yield task_queue.join()
+
+
os.putenv("TZ", "UTC")
time.tzset()
args = parse_arguments()
-basedir = args.unauthenticated
+Generation("current", args.unauthenticated)
+Generation("backup", args.old_authenticated)
-for uri, key in read_tals():
- status = Status.update(uri)
- status.add(rpki.POW.validation_status.OBJECT_REJECTED)
- try:
- cer = rpki.POW.X509.derReadFile(uri_to_filename(uri, basedir))
- except rpki.POW.OpenSSLError:
- status.add(rpki.POW.validation_status.UNREADABLE_TRUST_ANCHOR)
- continue
- if key.derWritePublic() != cer.getPublicKey().derWritePublic():
- status.add(rpki.POW.validation_status.TRUST_ANCHOR_KEY_MISMATCH)
- continue
- trusted = [cer]
- try:
- cer.verify(trusted = trusted, status = status)
- except:
- continue
- else:
- status.remove(rpki.POW.validation_status.OBJECT_REJECTED)
- walk_tree(uri, cer, trusted, None, basedir)
-
-for uri in sorted(Status.db):
- print Status.db[uri]
+# Put this under config/argparse control later, for now I want to see everything
+logging.getLogger().setLevel(logging.DEBUG)
+
+tornado.ioloop.IOLoop.current().run_sync(main)
+
+if False:
+ for uri in sorted(Status.db):
+ print Status.db[uri]
+
+# Should be doing something to generate rcynic.xml here.