diff options
author | Rob Austein <sra@hactrn.net> | 2012-07-25 17:22:21 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2012-07-25 17:22:21 +0000 |
commit | c4f104ae55846310d70aa913885607195489f09b (patch) | |
tree | 585491945887936cbc0f96d46ba41d62cc4db402 | |
parent | f932750ba2594758c681a771e0dafd34e6b53ae1 (diff) |
Clean up SQL cache when it gets big, break up huge batches of ROA
requests to avoid timeouts. See #274.
svn path=/trunk/; revision=4607
-rw-r--r-- | rpkid/rpki/left_right.py | 49 | ||||
-rw-r--r-- | rpkid/rpki/rpkid.py | 4 | ||||
-rw-r--r-- | rpkid/rpki/sql.py | 14 |
3 files changed, 56 insertions, 11 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index 078b4066..04940ca1 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -36,9 +36,20 @@ import rpki.resource_set, rpki.x509, rpki.sql, rpki.exceptions, rpki.xml_utils import rpki.http, rpki.up_down, rpki.relaxng, rpki.sundial, rpki.log, rpki.roa import rpki.publication, rpki.async +## @var enforce_strict_up_down_xml_sender # Enforce strict checking of XML "sender" field in up-down protocol + enforce_strict_up_down_xml_sender = False +## @var max_new_roas_at_once +# Upper limit on the number of ROAs we'll create in a single +# self_elt.update_roas() call. This is a bit of a kludge, and may be +# replaced with something more clever or general later; for the moment +# the goal is to avoid going totally compute bound when somebody +# throws 50,000 new ROA requests at us in a single batch. + +max_new_roas_at_once = 500 + class left_right_namespace(object): """ XML namespace parameters for left-right protocol. @@ -361,7 +372,13 @@ class self_elt(data_elt): def five(): self.gctx.checkpoint() rpki.log.debug("Self %s[%d] regenerating CRLs and manifests" % (self.self_handle, self.self_id)) - self.regenerate_crls_and_manifests(cb) + self.regenerate_crls_and_manifests(six) + + def six(): + self.gctx.checkpoint() + self.gctx.sql.sweep() + self.gctx.sql.cache_clear_maybe() + cb() one() @@ -667,8 +684,10 @@ class self_elt(data_elt): publisher = rpki.rpkid.publication_queue() ca_details = set() + stopped_early = False seen = set() for roa_request in roa_requests: + self.gctx.checkpoint() try: k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6)) if k in seen: @@ -679,6 +698,10 @@ class self_elt(data_elt): if roa is None: roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6) rpki.log.debug("Couldn't find existing ROA, created %r" % roa) + if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once: + rpki.log.warn("Too many new ROAs (%d) in a single batch, returning early from update_roas() to avoid timeouts" % publisher.size) + stopped_early = True + break else: rpki.log.debug("Found existing %r" % roa) roa.update(publisher = publisher, fast = True) @@ -690,16 +713,20 @@ class self_elt(data_elt): rpki.log.traceback() rpki.log.warn("Could not update %r, skipping: %s" % (roa, e)) - orphans.extend(roas.itervalues()) - for roa in orphans: - try: - ca_details.add(roa.ca_detail) - roa.revoke(publisher = publisher, fast = True) - except (SystemExit, rpki.async.ExitNow): - raise - except Exception, e: - rpki.log.traceback() - rpki.log.warn("Could not revoke %r: %s" % (roa, e)) + # Don't flush old ROAs when we didn't finish creating new ones. + # We may regret this, but our options in this case are limited. + + if not stopped_early: + orphans.extend(roas.itervalues()) + for roa in orphans: + try: + ca_details.add(roa.ca_detail) + roa.revoke(publisher = publisher, fast = True) + except (SystemExit, rpki.async.ExitNow): + raise + except Exception, e: + rpki.log.traceback() + rpki.log.warn("Could not revoke %r: %s" % (roa, e)) self.gctx.sql.sweep() diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py index 051e7c17..8c275688 100644 --- a/rpkid/rpki/rpkid.py +++ b/rpkid/rpki/rpkid.py @@ -1873,3 +1873,7 @@ class publication_queue(object): def loop(iterator, rid): self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers) rpki.async.iterator(self.repositories, loop, cb) + + @property + def size(self): + return sum(len(self.msgs[rid]) for rid in self.repositories) diff --git a/rpkid/rpki/sql.py b/rpkid/rpki/sql.py index b6be65b6..14d1e1fb 100644 --- a/rpkid/rpki/sql.py +++ b/rpkid/rpki/sql.py @@ -41,6 +41,11 @@ class session(object): SQL session layer. """ + ## @var clear_threshold + # Size above which .cache_clear_maybe() should clear the cache. + + clear_threshold = 5000 + def __init__(self, cfg): self.username = cfg.get("sql-username") @@ -92,8 +97,17 @@ class session(object): """ Clear the object cache. """ + rpki.log.debug("Clearing SQL cache") + self.assert_pristine() self.cache.clear() + def cache_clear_maybe(self): + """ + Clear the object cache if its size is above clear_threshold. + """ + if len(self.cache) >= self.clear_threshold: + self.cache_clear() + def assert_pristine(self): """ Assert that there are no dirty objects in the cache. |