author     Rob Austein <sra@hactrn.net>  2015-11-24 03:17:15 +0000
committer  Rob Austein <sra@hactrn.net>  2015-11-24 03:17:15 +0000
commit     8a1603f06cc7f8d0693127ca9e381f2469dd55f1 (patch)
tree       730d83fa65143702b63ef47ce50ee68074021753
parent     4035533a429100b6c92c8b7bb482d340db3f8343 (diff)
Add stack forking. At this point the basic tasking structure looks complete.
svn path=/branches/tk705/; revision=6191
-rwxr-xr-x  rp/rcynic/rcynicng | 125
1 file changed, 61 insertions(+), 64 deletions(-)
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index 4ac9d18e..59c9b71a 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -341,30 +341,41 @@ class WalkFrame(object):
self.cer = cer
self.state = self.initial
+ def __repr__(self):
+ try:
+ return "<WalkFrame \"{}\" at 0x{:x}".format(self.cer.uri, id(self))
+ except:
+ return "<WalkFrame at 0x{:x}>".format(id(self))
+
@tornado.gen.coroutine
def __call__(self, wsk):
yield self.state(wsk)
@tornado.gen.coroutine
def initial(self, wsk):
- self.diruri = first_rsync_uri(self.cer.caDirectory)
-
- fetch = Fetcher(self.diruri)
-
- if fetch.needed():
-
- # This is where we'd fork another wsk task so we can keep
- # busy while rsync is running. Defer that until linear
- # version is working, so for now just wait for rsync.
-
- yield fetch.fetch()
+ self.diruri = first_rsync_uri(self.cer.caDirectory)
+ self.fetcher = Fetcher(self.diruri)
+
+ if not self.fetcher.needed():
+ self.state = self.ready
+ elif args.spawn_on_fetch:
+ self.state = self.fetch
+ task_queue.put(wsk.clone())
+ wsk.pop()
+ else:
+ self.state = self.fetch
+ @tornado.gen.coroutine
+ def fetch(self, wsk):
+ yield self.fetcher.fetch()
self.state = self.ready
@tornado.gen.coroutine
def ready(self, wsk):
self.trusted = wsk.trusted()
+ logger.debug("%r scanning products", self)
+
mft_uri = first_rsync_uri(self.cer.rpkiManifest)
crl_candidates = []
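
The hunk above is the heart of the change: WalkFrame's state is a bound method (initial, then fetch if an rsync run is needed, then ready), and when --spawn-on-fetch is set the walk is handed off to the task queue while the current task pops the frame and keeps going. A minimal, self-contained sketch of that pattern, using stand-in names rather than the real classes, and handing off just the waiting frame rather than a full wsk.clone():

    # Illustrative sketch only: Frame/Walk stand in for WalkFrame/WalkTask.

    import tornado.gen, tornado.queues

    task_queue = tornado.queues.Queue()

    class Frame(object):

        def __init__(self, name, needs_fetch = False):
            self.name = name
            self.needs_fetch = needs_fetch
            self.state = self.initial               # state is a bound method

        @tornado.gen.coroutine
        def __call__(self, walk):
            yield self.state(walk)

        @tornado.gen.coroutine
        def initial(self, walk):
            if not self.needs_fetch:
                self.state = self.ready
            else:
                # Fork: hand this frame off to the queue in its "fetch"
                # state and let the current walk carry on without it.
                self.state = self.fetch
                walk.frames.pop()
                task_queue.put(Walk([self]))

        @tornado.gen.coroutine
        def fetch(self, walk):
            yield tornado.gen.sleep(0.1)            # stand-in for the rsync fetch
            self.state = self.ready

        @tornado.gen.coroutine
        def ready(self, walk):
            print("ready: {}".format(self.name))
            walk.frames.pop()

    class Walk(object):

        def __init__(self, frames):
            self.frames = frames

        @tornado.gen.coroutine
        def __call__(self):
            while self.frames:
                yield self.frames[-1](self)
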
@@ -503,6 +514,12 @@ class WalkTask(object):
def __init__(self, wsk = None):
self.wsk = [] if wsk is None else wsk
+ def __repr__(self):
+ try:
+ return "<WalkTask \"{}\" at 0x{:x}".format(self.wsk[-1].cer.uri, id(self))
+ except:
+ return "<WalkTask at 0x{:x}>".format(id(self))
+
@tornado.gen.coroutine
def __call__(self):
while self.wsk:
@@ -523,15 +540,6 @@ class WalkTask(object):
return stack
-
-# Probably do need some analogue of the rsync_history database, for
-# tracking down hosts, URIs already fetched, and (perhaps) limiting
-# simultaneous connection attempts to a single host.
-
-
-task_queue = tornado.queues.Queue()
-
-
def parse_arguments():
def check_dir(s):
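
The comment deleted above flags an open issue: some analogue of the rsync_history database is probably still needed, among other things to limit simultaneous connection attempts to a single host. Nothing in this patch does that; purely as an illustration (hypothetical names, not part of the patch), a per-host cap could be built from tornado.locks.Semaphore along these lines:

    # Hypothetical sketch: cap concurrent fetches per host with a semaphore.

    import urlparse                                 # urllib.parse on Python 3
    import tornado.gen, tornado.locks

    fetches_per_host = 4
    host_semaphores = {}

    @tornado.gen.coroutine
    def limited_fetch(uri, do_fetch):
        host = urlparse.urlparse(uri).netloc
        sem = host_semaphores.setdefault(host, tornado.locks.Semaphore(fetches_per_host))
        with (yield sem.acquire()):                 # released when the block exits
            yield do_fetch(uri)
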
@@ -553,6 +561,7 @@ def parse_arguments():
parser.add_argument("--workers", type = posint, default = 10)
parser.add_argument("--no-fetch", action = "store_true")
parser.add_argument("--xml-file", type = argparse.FileType("w"), default = "rcynicng.xml")
+ parser.add_argument("--spawn-on-fetch", action = "store_true")
return parser.parse_args()
@@ -763,13 +772,31 @@ def worker(meself):
while True:
task = yield task_queue.get()
try:
- #logger.debug("Worker %s starting %s", meself, task)
+ logger.debug("Worker %s starting %s, queue length %s", meself, task, task_queue.qsize())
yield task()
except:
logger.exception("Worker %s caught unhandled exception from %s", meself, task)
finally:
task_queue.task_done()
- #logger.debug("Worker %s finished %s", meself, task)
+ logger.debug("Worker %s finished %s, queue length %s", meself, task, task_queue.qsize())
+
+
+def final_report():
+ doc = Element("rcynic-summary") # rcynic-version = "", summary-version = "", reporting-hostname = ""
+ labels = SubElement(doc, "labels")
+ for code in codes.all():
+ SubElement(labels, code.name).text = code.text
+ for uri, generation in Status.db:
+ for sym in sorted(Status.db[uri, generation].status):
+ SubElement(doc, "validation_status",
+ timestamp = str(Status.db[uri, generation].timestamp),
+ status = str(sym),
+ generation = str(generation)
+ ).text = uri
+ #
+ # Should generate <rsync_history/> elements here too, later
+ #
+ ElementTree(doc).write(file = args.xml_file, pretty_print = True)
@tornado.gen.coroutine
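
final_report() now iterates Status.db keyed by (uri, generation) and writes the rcynic-summary document in one pass. The file= and pretty_print= keywords suggest Element, SubElement and ElementTree come from lxml.etree (an assumption; the imports are outside this hunk). A standalone illustration of the document shape it produces, with made-up label, URI and timestamp:

    # Standalone illustration; the real data comes from codes.all() and Status.db.

    import sys
    from lxml.etree import Element, SubElement, ElementTree     # assumed import

    doc = Element("rcynic-summary")
    labels = SubElement(doc, "labels")
    SubElement(labels, "object_accepted").text = "Object accepted"

    SubElement(doc, "validation_status",
               timestamp  = "2015-11-24T03:17:15Z",
               status     = "object_accepted",
               generation = "current"
               ).text = "rsync://example.org/repo/example.cer"

    ElementTree(doc).write(file = sys.stdout, pretty_print = True)
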
@@ -778,45 +805,15 @@ def main():
tornado.ioloop.IOLoop.current().spawn_callback(worker, i)
yield [task_queue.put(CheckTALTask(uri, key)) for uri, key in read_tals()]
yield task_queue.join()
-
-
-def final_report():
- # Should be doing something to generate rcynic.xml here.
-
- if True:
- # rcynic-version = "", summary-version="", reporting-hostname=""
- doc = Element("rcynic-summary")
- labels = SubElement(doc, "labels")
- for code in codes.all():
- SubElement(labels, code.name).text = code.text
- for uri in Status.db:
- status = Status.db[uri]
- ts = str(status.timestamp)
- gen = str(status.generation)
- uri = status.uri
- for sym in sorted(status.status):
- SubElement(doc, "validation_status", timestamp = ts, status = str(sym), generation = gen).text = uri
- #
- # Should generate <rsync_history/> elements here too, later
- #
- ElementTree(doc).write(file = args.xml_file, pretty_print = True)
- elif False:
- for uri in sorted(Status.db):
- print Status.db[uri]
- else:
- print "{} entries in status database".format(len(Status.db))
-
-os.putenv("TZ", "UTC")
-time.tzset()
-
-args = parse_arguments()
-
-Generation("current", args.unauthenticated)
-Generation("backup", args.old_authenticated)
-
-# Put this under config/argparse control later, for now I want to see everything
-logging.getLogger().setLevel(logging.DEBUG)
-
-tornado.ioloop.IOLoop.current().run_sync(main)
-
-final_report()
+ final_report()
+
+
+if __name__ == "__main__":
+ os.putenv("TZ", "UTC")
+ time.tzset()
+ task_queue = tornado.queues.Queue()
+ args = parse_arguments()
+ logging.basicConfig(level = logging.DEBUG, format = "%(asctime)s %(message)s", datefmt = "%Y-%m-%d %H:%M:%S")
+ Generation("current", args.unauthenticated)
+ Generation("backup", args.old_authenticated)
+ tornado.ioloop.IOLoop.current().run_sync(main)
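
Taken together, worker() and main() are a straightforward producer/consumer arrangement: a fixed pool of worker coroutines drains task_queue, the CheckTALTask entries (plus any walks forked onto the queue later) are the producers, and task_queue.join() holds main() until every task has called task_done(). A minimal runnable skeleton of that structure, with placeholder tasks:

    # Minimal skeleton of the tasking structure; tasks here are placeholders.

    import logging
    import tornado.gen, tornado.ioloop, tornado.queues

    logging.basicConfig(level = logging.DEBUG, format = "%(asctime)s %(message)s")
    logger = logging.getLogger()

    task_queue = tornado.queues.Queue()

    @tornado.gen.coroutine
    def worker(meself):
        while True:
            task = yield task_queue.get()
            try:
                yield task()
            except:
                logger.exception("Worker %s caught unhandled exception from %s", meself, task)
            finally:
                task_queue.task_done()

    @tornado.gen.coroutine
    def demo_task():
        yield tornado.gen.sleep(0.1)

    @tornado.gen.coroutine
    def main():
        for i in range(4):
            tornado.ioloop.IOLoop.current().spawn_callback(worker, i)
        yield [task_queue.put(demo_task) for i in range(10)]
        yield task_queue.join()                     # returns once every task_done() has fired

    if __name__ == "__main__":
        tornado.ioloop.IOLoop.current().run_sync(main)
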