diff options
author | Rob Austein <sra@hactrn.net> | 2015-11-24 03:17:15 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-11-24 03:17:15 +0000 |
commit | 8a1603f06cc7f8d0693127ca9e381f2469dd55f1 (patch) | |
tree | 730d83fa65143702b63ef47ce50ee68074021753 | |
parent | 4035533a429100b6c92c8b7bb482d340db3f8343 (diff) |
Add stack forking. At this point the basic tasking structure looks complete.
svn path=/branches/tk705/; revision=6191
-rwxr-xr-x | rp/rcynic/rcynicng | 125 |
1 files changed, 61 insertions, 64 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index 4ac9d18e..59c9b71a 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -341,30 +341,41 @@ class WalkFrame(object): self.cer = cer self.state = self.initial + def __repr__(self): + try: + return "<WalkFrame \"{}\" at 0x{:x}".format(self.cer.uri, id(self)) + except: + return "<WalkFrame at 0x{:x}>".format(id(self)) + @tornado.gen.coroutine def __call__(self, wsk): yield self.state(wsk) @tornado.gen.coroutine def initial(self, wsk): - self.diruri = first_rsync_uri(self.cer.caDirectory) - - fetch = Fetcher(self.diruri) - - if fetch.needed(): - - # This is where we'd fork another wsk task so we can keep - # busy while rsync is running. Defer that until linear - # version is working, so for now just wait for rsync. - - yield fetch.fetch() + self.diruri = first_rsync_uri(self.cer.caDirectory) + self.fetcher = Fetcher(self.diruri) + + if not self.fetcher.needed(): + self.state = self.ready + elif args.spawn_on_fetch: + self.state = self.fetch + task_queue.put(wsk.clone()) + wsk.pop() + else: + self.state = self.fetch + @tornado.gen.coroutine + def fetch(self, wsk): + yield self.fetcher.fetch() self.state = self.ready @tornado.gen.coroutine def ready(self, wsk): self.trusted = wsk.trusted() + logger.debug("%r scanning products", self) + mft_uri = first_rsync_uri(self.cer.rpkiManifest) crl_candidates = [] @@ -503,6 +514,12 @@ class WalkTask(object): def __init__(self, wsk = None): self.wsk = [] if wsk is None else wsk + def __repr__(self): + try: + return "<WalkTask \"{}\" at 0x{:x}".format(self.wsk[-1].cer.uri, id(self)) + except: + return "<WalkTask at 0x{:x}>".format(id(self)) + @tornado.gen.coroutine def __call__(self): while self.wsk: @@ -523,15 +540,6 @@ class WalkTask(object): return stack - -# Probably do need some analogue of the rsync_history database, for -# tracking down hosts, URIs already fetched, and (perhaps) limiting -# simultaneous connection attempts to a single host. - - -task_queue = tornado.queues.Queue() - - def parse_arguments(): def check_dir(s): @@ -553,6 +561,7 @@ def parse_arguments(): parser.add_argument("--workers", type = posint, default = 10) parser.add_argument("--no-fetch", action = "store_true") parser.add_argument("--xml-file", type = argparse.FileType("w"), default = "rcynicng.xml") + parser.add_argument("--spawn-on-fetch", action = "store_true") return parser.parse_args() @@ -763,13 +772,31 @@ def worker(meself): while True: task = yield task_queue.get() try: - #logger.debug("Worker %s starting %s", meself, task) + logger.debug("Worker %s starting %s, queue length %s", meself, task, task_queue.qsize()) yield task() except: logger.exception("Worker %s caught unhandled exception from %s", meself, task) finally: task_queue.task_done() - #logger.debug("Worker %s finished %s", meself, task) + logger.debug("Worker %s finished %s, queue length %s", meself, task, task_queue.qsize()) + + +def final_report(): + doc = Element("rcynic-summary") # rcynic-version = "", summary-version = "", reporting-hostname = "" + labels = SubElement(doc, "labels") + for code in codes.all(): + SubElement(labels, code.name).text = code.text + for uri, generation in Status.db: + for sym in sorted(Status.db[uri, generation].status): + SubElement(doc, "validation_status", + timestamp = str(Status.db[uri, generation].timestamp), + status = str(sym), + generation = str(generation) + ).text = uri + # + # Should generate <rsync_history/> elements here too, later + # + ElementTree(doc).write(file = args.xml_file, pretty_print = True) @tornado.gen.coroutine @@ -778,45 +805,15 @@ def main(): tornado.ioloop.IOLoop.current().spawn_callback(worker, i) yield [task_queue.put(CheckTALTask(uri, key)) for uri, key in read_tals()] yield task_queue.join() - - -def final_report(): - # Should be doing something to generate rcynic.xml here. - - if True: - # rcynic-version = "", summary-version="", reporting-hostname="" - doc = Element("rcynic-summary") - labels = SubElement(doc, "labels") - for code in codes.all(): - SubElement(labels, code.name).text = code.text - for uri in Status.db: - status = Status.db[uri] - ts = str(status.timestamp) - gen = str(status.generation) - uri = status.uri - for sym in sorted(status.status): - SubElement(doc, "validation_status", timestamp = ts, status = str(sym), generation = gen).text = uri - # - # Should generate <rsync_history/> elements here too, later - # - ElementTree(doc).write(file = args.xml_file, pretty_print = True) - elif False: - for uri in sorted(Status.db): - print Status.db[uri] - else: - print "{} entries in status database".format(len(Status.db)) - -os.putenv("TZ", "UTC") -time.tzset() - -args = parse_arguments() - -Generation("current", args.unauthenticated) -Generation("backup", args.old_authenticated) - -# Put this under config/argparse control later, for now I want to see everything -logging.getLogger().setLevel(logging.DEBUG) - -tornado.ioloop.IOLoop.current().run_sync(main) - -final_report() + final_report() + + +if __name__ == "__main__": + os.putenv("TZ", "UTC") + time.tzset() + task_queue = tornado.queues.Queue() + args = parse_arguments() + logging.basicConfig(level = logging.DEBUG, format = "%(asctime)s %(message)s", datefmt = "%Y-%m-%d %H:%M:%S") + Generation("current", args.unauthenticated) + Generation("backup", args.old_authenticated) + tornado.ioloop.IOLoop.current().run_sync(main) |