about summary refs log tree commit diff
diff options
context:
space:
mode:
author Rob Austein <sra@hactrn.net> 2012-07-25 17:22:21 +0000
committer Rob Austein <sra@hactrn.net> 2012-07-25 17:22:21 +0000
commit c4f104ae55846310d70aa913885607195489f09b (patch)
tree 585491945887936cbc0f96d46ba41d62cc4db402
parent f932750ba2594758c681a771e0dafd34e6b53ae1 (diff)
Clean up SQL cache when it gets big, break up huge batches of ROA
requests to avoid timeouts. See #274. svn path=/trunk/; revision=4607
-rw-r--r-- rpkid/rpki/left_right.py 49
-rw-r--r-- rpkid/rpki/rpkid.py 4
-rw-r--r-- rpkid/rpki/sql.py 14
3 files changed, 56 insertions, 11 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py
index 078b4066..04940ca1 100644
--- a/rpkid/rpki/left_right.py
+++ b/rpkid/rpki/left_right.py
@@ -36,9 +36,20 @@ import rpki.resource_set, rpki.x509, rpki.sql, rpki.exceptions, rpki.xml_utils
import rpki.http, rpki.up_down, rpki.relaxng, rpki.sundial, rpki.log, rpki.roa
import rpki.publication, rpki.async
+## @var enforce_strict_up_down_xml_sender
# Enforce strict checking of XML "sender" field in up-down protocol
+
enforce_strict_up_down_xml_sender = False
+## @var max_new_roas_at_once
+# Upper limit on the number of ROAs we'll create in a single
+# self_elt.update_roas() call. This is a bit of a kludge, and may be
+# replaced with something more clever or general later; for the moment
+# the goal is to avoid going totally compute bound when somebody
+# throws 50,000 new ROA requests at us in a single batch.
+
+max_new_roas_at_once = 500
+
class left_right_namespace(object):
"""
XML namespace parameters for left-right protocol.
@@ -361,7 +372,13 @@ class self_elt(data_elt):
def five():
self.gctx.checkpoint()
rpki.log.debug("Self %s[%d] regenerating CRLs and manifests" % (self.self_handle, self.self_id))
- self.regenerate_crls_and_manifests(cb)
+ self.regenerate_crls_and_manifests(six)
+
+ def six():
+ self.gctx.checkpoint()
+ self.gctx.sql.sweep()
+ self.gctx.sql.cache_clear_maybe()
+ cb()
one()
@@ -667,8 +684,10 @@ class self_elt(data_elt):
publisher = rpki.rpkid.publication_queue()
ca_details = set()
+ stopped_early = False
seen = set()
for roa_request in roa_requests:
+ self.gctx.checkpoint()
try:
k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
if k in seen:
@@ -679,6 +698,10 @@ class self_elt(data_elt):
if roa is None:
roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
rpki.log.debug("Couldn't find existing ROA, created %r" % roa)
+ if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once:
+ rpki.log.warn("Too many new ROAs (%d) in a single batch, returning early from update_roas() to avoid timeouts" % publisher.size)
+ stopped_early = True
+ break
else:
rpki.log.debug("Found existing %r" % roa)
roa.update(publisher = publisher, fast = True)
@@ -690,16 +713,20 @@ class self_elt(data_elt):
rpki.log.traceback()
rpki.log.warn("Could not update %r, skipping: %s" % (roa, e))
- orphans.extend(roas.itervalues())
- for roa in orphans:
- try:
- ca_details.add(roa.ca_detail)
- roa.revoke(publisher = publisher, fast = True)
- except (SystemExit, rpki.async.ExitNow):
- raise
- except Exception, e:
- rpki.log.traceback()
- rpki.log.warn("Could not revoke %r: %s" % (roa, e))
+ # Don't flush old ROAs when we didn't finish creating new ones.
+ # We may regret this, but our options in this case are limited.
+
+ if not stopped_early:
+ orphans.extend(roas.itervalues())
+ for roa in orphans:
+ try:
+ ca_details.add(roa.ca_detail)
+ roa.revoke(publisher = publisher, fast = True)
+ except (SystemExit, rpki.async.ExitNow):
+ raise
+ except Exception, e:
+ rpki.log.traceback()
+ rpki.log.warn("Could not revoke %r: %s" % (roa, e))
self.gctx.sql.sweep()
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py
index 051e7c17..8c275688 100644
--- a/rpkid/rpki/rpkid.py
+++ b/rpkid/rpki/rpkid.py
@@ -1873,3 +1873,7 @@ class publication_queue(object):
def loop(iterator, rid):
self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers)
rpki.async.iterator(self.repositories, loop, cb)
+
+ @property
+ def size(self):
+ return sum(len(self.msgs[rid]) for rid in self.repositories)
diff --git a/rpkid/rpki/sql.py b/rpkid/rpki/sql.py
index b6be65b6..14d1e1fb 100644
--- a/rpkid/rpki/sql.py
+++ b/rpkid/rpki/sql.py
@@ -41,6 +41,11 @@ class session(object):
SQL session layer.
"""
+ ## @var clear_threshold
+ # Size above which .cache_clear_maybe() should clear the cache.
+
+ clear_threshold = 5000
+
def __init__(self, cfg):
self.username = cfg.get("sql-username")
@@ -92,8 +97,17 @@ class session(object):
"""
Clear the object cache.
"""
+ rpki.log.debug("Clearing SQL cache")
+ self.assert_pristine()
self.cache.clear()
+ def cache_clear_maybe(self):
+ """
+ Clear the object cache if its size is above clear_threshold.
+ """
+ if len(self.cache) >= self.clear_threshold:
+ self.cache_clear()
+
def assert_pristine(self):
"""
Assert that there are no dirty objects in the cache.