diff options
author | Rob Austein <sra@hactrn.net> | 2016-01-23 21:40:16 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2016-01-23 21:40:16 +0000 |
commit | e014b175b48fec8c7033b0bbe036d5b6aa744931 (patch) | |
tree | cfa82220dbb858bac823dbacc5d3a604cd102e90 /rp | |
parent | 14bc1c5e11dac9ccae0cbe182ff050a76a506aa9 (diff) |
Pre-fetch a small number of deltas ahead of what we're currently
loading in RRDP delta processing loop, to keep the pipe full and avoid
sitting idle while processing a long series of deltas.
svn path=/branches/tk705/; revision=6234
Diffstat (limited to 'rp')
-rwxr-xr-x | rp/rcynic/rcynicng | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng index cad5ec45..12adcd34 100755 --- a/rp/rcynic/rcynicng +++ b/rp/rcynic/rcynicng @@ -1032,10 +1032,10 @@ class Fetcher(object): logger.debug("RRDP data for %s is up-to-date, nothing to do", self.uri) return - delta_serials = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash"))) - for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta")) + deltas = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash"))) + for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta")) - if snapshot is None or snapshot.serial + 1 not in delta_serials: + if snapshot is None or snapshot.serial + 1 not in deltas: if snapshot is not None: logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial) @@ -1071,20 +1071,32 @@ class Fetcher(object): logger.debug("RRDP %s %s deltas (%s--%s)", self.uri, (serial - snapshot.serial), snapshot.serial, serial) - for serial in xrange(snapshot.serial + 1, serial + 1): + deltas = [(serial, deltas[serial][0], deltas[serial][1]) + for serial in xrange(snapshot.serial + 1, serial + 1)] + futures = [] - url, hash = delta_serials[serial] + def start_fetches(): + while deltas and len(futures) < args.fetch_ahead_goal: + serial, url, hash = deltas.pop(0) + logger.debug("RRDP %s serial %s fetching %s", self.uri, serial, url) + futures.append(self._rrdp_fetch_url( + url = url, + hash = hash, + tag = "delta", + session_id = session_id, + serial = serial)) - logger.debug("RRDP %s fetching %s serial %s", self.uri, url, serial) + start_fetches() - retrieval, delta = yield self._rrdp_fetch_url( - url = url, - hash = hash, - tag = "delta", - session_id = session_id, - serial = serial) + while futures: + retrieval, delta = yield futures.pop(0) - logger.debug("RRDP %s loading delta %s serial %s", self.uri, url, serial) + start_fetches() + + serial = long(delta.get("serial")) + assert serial == snapshot.serial + 1 + + logger.debug("RRDP %s serial %s loading", self.uri, serial) with transaction.atomic(): snapshot.serial = serial @@ -1108,8 +1120,6 @@ class Fetcher(object): obj.snapshot.add(snapshot) obj.save() - logger.debug("RRDP %s committing delta %s serial %s", self.uri, url, serial) - logger.debug("RRDP %s done processing deltas", self.uri) except (tornado.httpclient.HTTPError, socket.error, IOError, ssl.SSLError): @@ -1281,7 +1291,7 @@ def main(): parser = argparse.ArgumentParser(description = __doc__) - parser.add_argument("-c", "--config", help = "override default location of configuration file") + parser.add_argument("-c", "--config", default = "/dev/null") parser.add_argument("--authenticated", default = "rcynic-data/authenticated") parser.add_argument("--unauthenticated", default = "rcynic-data/unauthenticated") @@ -1289,7 +1299,7 @@ def main(): parser.add_argument("--tals", default = "sample-trust-anchors") - parser.add_argument("--workers", type = posint, default = 10) + parser.add_argument("--workers", default = 10, type = posint) parser.add_argument("--no-fetch", action = "store_true") parser.add_argument("--no-spawn-on-fetch", action = "store_true") @@ -1297,6 +1307,8 @@ def main(): parser.add_argument("--prefer-rsync", action = "store_true") + parser.add_argument("--fetch-ahead-goal", default = 5, type = posint) + global args args = parser.parse_args() |