aboutsummaryrefslogtreecommitdiff
path: root/rp
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2016-01-23 21:40:16 +0000
committerRob Austein <sra@hactrn.net>2016-01-23 21:40:16 +0000
commite014b175b48fec8c7033b0bbe036d5b6aa744931 (patch)
treecfa82220dbb858bac823dbacc5d3a604cd102e90 /rp
parent14bc1c5e11dac9ccae0cbe182ff050a76a506aa9 (diff)
Pre-fetch a small number of deltas ahead of what we're currently
loading in RRDP delta processing loop, to keep the pipe full and avoid sitting idle while processing a long series of deltas. svn path=/branches/tk705/; revision=6234
Diffstat (limited to 'rp')
-rwxr-xr-xrp/rcynic/rcynicng46
1 files changed, 29 insertions, 17 deletions
diff --git a/rp/rcynic/rcynicng b/rp/rcynic/rcynicng
index cad5ec45..12adcd34 100755
--- a/rp/rcynic/rcynicng
+++ b/rp/rcynic/rcynicng
@@ -1032,10 +1032,10 @@ class Fetcher(object):
logger.debug("RRDP data for %s is up-to-date, nothing to do", self.uri)
return
- delta_serials = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash")))
- for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta"))
+ deltas = dict((long(delta.get("serial")), (delta.get("uri"), delta.get("hash")))
+ for delta in notification.iterchildren(rpki.relaxng.rrdp.xmlns + "delta"))
- if snapshot is None or snapshot.serial + 1 not in delta_serials:
+ if snapshot is None or snapshot.serial + 1 not in deltas:
if snapshot is not None:
logger.debug("RRDP %s no deltas available for serial %s", self.uri, snapshot.serial)
@@ -1071,20 +1071,32 @@ class Fetcher(object):
logger.debug("RRDP %s %s deltas (%s--%s)", self.uri,
(serial - snapshot.serial), snapshot.serial, serial)
- for serial in xrange(snapshot.serial + 1, serial + 1):
+ deltas = [(serial, deltas[serial][0], deltas[serial][1])
+ for serial in xrange(snapshot.serial + 1, serial + 1)]
+ futures = []
- url, hash = delta_serials[serial]
+ def start_fetches():
+ while deltas and len(futures) < args.fetch_ahead_goal:
+ serial, url, hash = deltas.pop(0)
+ logger.debug("RRDP %s serial %s fetching %s", self.uri, serial, url)
+ futures.append(self._rrdp_fetch_url(
+ url = url,
+ hash = hash,
+ tag = "delta",
+ session_id = session_id,
+ serial = serial))
- logger.debug("RRDP %s fetching %s serial %s", self.uri, url, serial)
+ start_fetches()
- retrieval, delta = yield self._rrdp_fetch_url(
- url = url,
- hash = hash,
- tag = "delta",
- session_id = session_id,
- serial = serial)
+ while futures:
+ retrieval, delta = yield futures.pop(0)
- logger.debug("RRDP %s loading delta %s serial %s", self.uri, url, serial)
+ start_fetches()
+
+ serial = long(delta.get("serial"))
+ assert serial == snapshot.serial + 1
+
+ logger.debug("RRDP %s serial %s loading", self.uri, serial)
with transaction.atomic():
snapshot.serial = serial
@@ -1108,8 +1120,6 @@ class Fetcher(object):
obj.snapshot.add(snapshot)
obj.save()
- logger.debug("RRDP %s committing delta %s serial %s", self.uri, url, serial)
-
logger.debug("RRDP %s done processing deltas", self.uri)
except (tornado.httpclient.HTTPError, socket.error, IOError, ssl.SSLError):
@@ -1281,7 +1291,7 @@ def main():
parser = argparse.ArgumentParser(description = __doc__)
- parser.add_argument("-c", "--config", help = "override default location of configuration file")
+ parser.add_argument("-c", "--config", default = "/dev/null")
parser.add_argument("--authenticated", default = "rcynic-data/authenticated")
parser.add_argument("--unauthenticated", default = "rcynic-data/unauthenticated")
@@ -1289,7 +1299,7 @@ def main():
parser.add_argument("--tals", default = "sample-trust-anchors")
- parser.add_argument("--workers", type = posint, default = 10)
+ parser.add_argument("--workers", default = 10, type = posint)
parser.add_argument("--no-fetch", action = "store_true")
parser.add_argument("--no-spawn-on-fetch", action = "store_true")
@@ -1297,6 +1307,8 @@ def main():
parser.add_argument("--prefer-rsync", action = "store_true")
+ parser.add_argument("--fetch-ahead-goal", default = 5, type = posint)
+
global args
args = parser.parse_args()