about | summary | refs | log | tree | commit | diff
path: root/rpkid/rpki
diff options
context:
space:
mode:
author: Rob Austein <sra@hactrn.net> 2012-08-09 21:31:10 +0000
committer: Rob Austein <sra@hactrn.net> 2012-08-09 21:31:10 +0000
commit: 07d1098ee09b2743f11f2a66294f3288a6a5f2c2 (patch)
tree: 2a1fddcab36202c23d2c9da9fdbd961b45c8114c /rpkid/rpki
parent: f1ea21697f9ea7deb771df7a3710189f46b1f597 (diff)
Switch rpki.sql.session.cache to use weak references, so that Python's
garbage collector can free cache entries that are no longer in use. Rework update_roas() to be a bit more frugal with memory. See #278.
svn path=/branches/tk274/; revision=4626
Diffstat (limited to 'rpkid/rpki')
-rw-r--r-- rpkid/rpki/async.py 27
-rw-r--r-- rpkid/rpki/left_right.py 71
-rw-r--r-- rpkid/rpki/log.py 2
-rw-r--r-- rpkid/rpki/rpkid.py 1
-rw-r--r-- rpkid/rpki/sql.py 24
5 files changed, 77 insertions, 48 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py
index 5eaa34f9..c2a02e4f 100644
--- a/rpkid/rpki/async.py
+++ b/rpkid/rpki/async.py
@@ -40,15 +40,24 @@ class iterator(object):
to continue to the next item in the iteration.
The termination callback receives no arguments.
+
+ Special case for memory constrained cases: if keyword argument
+ pop_list is True, iterable must be a list, which is modified in
+ place, popping items off of it until it's empty.
"""
- def __init__(self, iterable, item_callback, done_callback, unwind_stack = True):
+ def __init__(self, iterable, item_callback, done_callback, unwind_stack = True, pop_list = False):
+ assert not pop_list or isinstance(iterable, list), "iterable must be a list when using pop_list"
self.item_callback = item_callback
self.done_callback = done_callback
self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3]
self.unwind_stack = unwind_stack
+ self.pop_list = pop_list
try:
- self.iterator = iter(iterable)
+ if self.pop_list:
+ self.iterator = iterable
+ else:
+ self.iterator = iter(iterable)
except (ExitNow, SystemExit):
raise
except Exception:
@@ -73,11 +82,17 @@ class iterator(object):
with the next iteration value, call the termination handler if the
iterator signaled StopIteration.
"""
- try:
- self.item_callback(self, self.iterator.next())
- except StopIteration:
- if self.done_callback is not None:
+ if self.pop_list:
+ if self.iterator:
+ self.item_callback(self, self.iterator.pop(0))
+ elif self.done_callback is not None:
self.done_callback()
+ else:
+ try:
+ self.item_callback(self, self.iterator.next())
+ except StopIteration:
+ if self.done_callback is not None:
+ self.done_callback()
class timer(object):
"""
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py
index 33c14d8c..b9a2c43f 100644
--- a/rpkid/rpki/left_right.py
+++ b/rpkid/rpki/left_right.py
@@ -381,7 +381,6 @@ class self_elt(data_elt):
def six():
self.gctx.checkpoint()
self.gctx.sql.sweep()
- self.gctx.sql.cache_clear_maybe()
rpki.log.debug("Self %s[%d] finished cron cycle, calling %r" % (self.self_handle, self.self_id, cb))
cb()
@@ -675,7 +674,12 @@ class self_elt(data_elt):
self.gctx.sql.sweep()
roas = {}
+ seen = set()
orphans = []
+ updates = []
+ publisher = rpki.rpkid.publication_queue()
+ ca_details = set()
+
for roa in self.roas:
k = (roa.asn, str(roa.ipv4), str(roa.ipv6))
if k not in roas:
@@ -687,41 +691,54 @@ class self_elt(data_elt):
else:
orphans.append(roa)
- publisher = rpki.rpkid.publication_queue()
- ca_details = set()
- seen = set()
+ for roa_request in roa_requests:
+ rpki.log.debug("++ roa_requests %s roas %s orphans %s updates %s publisher.size %s ca_details %s seen %s cache %s" % (
+ len(roa_requests), len(roas), len(orphans), len(updates), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache)))
+ k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
+ if k in seen:
+ rpki.log.warn("Skipping duplicate ROA request %r" % roa_request)
+ else:
+ seen.add(k)
+ roa = roas.pop(k, None)
+ if roa is None:
+ roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
+ rpki.log.debug("Couldn't find existing ROA, created %r" % roa)
+ else:
+ rpki.log.debug("Found existing %r" % roa)
+ updates.append(roa)
- def loop(iterator, roa_request):
+ orphans.extend(roas.itervalues())
+
+ roas.clear() # Release references we no longer need, to free up memory
+ seen.clear() # Why does using "del" here raise SyntaxError?!?
+ del roa_requests[:]
+
+ def loop(iterator, roa):
self.gctx.checkpoint()
- rpki.log.debug("++ roa_requests %s roas %s orphans %s publisher.size %s ca_details %s seen %s cache %s" % (
- len(roa_requests), len(roas), len(orphans), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache)))
+ rpki.log.debug("++ updates %s orphans %s publisher.size %s ca_details %s cache %s" % (
+ len(updates), len(orphans), publisher.size, len(ca_details), len(self.gctx.sql.cache)))
try:
- k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
- if k in seen:
- rpki.log.warn("Skipping duplicate ROA request %r" % roa_request)
- else:
- seen.add(k)
- roa = roas.pop(k, None)
- if roa is None:
- roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
- rpki.log.debug("Couldn't find existing ROA, created %r" % roa)
- else:
- rpki.log.debug("Found existing %r" % roa)
- roa.update(publisher = publisher, fast = True)
- ca_details.add(roa.ca_detail)
+ roa.update(publisher = publisher, fast = True)
+ ca_details.add(roa.ca_detail)
+ self.gctx.sql.sweep()
except (SystemExit, rpki.async.ExitNow):
raise
+ except rpki.exceptions.NoCoveringCertForROA:
+ rpki.log.warn("No covering certificate for %r, skipping" % roa)
except Exception, e:
- if not isinstance(e, rpki.exceptions.NoCoveringCertForROA):
- rpki.log.traceback()
+ rpki.log.traceback()
rpki.log.warn("Could not update %r, skipping: %s" % (roa, e))
if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once:
- self.gctx.sql.sweep()
for ca_detail in ca_details:
+ rpki.log.debug("Generating new CRL for %r" % ca_detail)
ca_detail.generate_crl(publisher = publisher)
+ rpki.log.debug("Generating new manifest for %r" % ca_detail)
ca_detail.generate_manifest(publisher = publisher)
+ rpki.log.debug("Sweeping")
self.gctx.sql.sweep()
+ rpki.log.debug("Done sweeping")
self.gctx.checkpoint()
+ rpki.log.debug("Starting publication")
publisher.call_pubd(iterator, publication_failed)
else:
iterator()
@@ -733,7 +750,6 @@ class self_elt(data_elt):
cb()
def done():
- orphans.extend(roas.itervalues())
for roa in orphans:
try:
ca_details.add(roa.ca_detail)
@@ -751,8 +767,8 @@ class self_elt(data_elt):
self.gctx.checkpoint()
publisher.call_pubd(cb, publication_failed)
- rpki.async.iterator(roa_requests, loop, done)
-
+ rpki.async.iterator(updates, loop, done, pop_list = True)
+
def roa_requests_failed(e):
rpki.log.traceback()
rpki.log.warn("Could not fetch ROA requests for %s, skipping: %s" % (self.self_handle, e))
@@ -909,12 +925,14 @@ class repository_elt(data_elt):
def done(r_der):
try:
+ rpki.log.debug("Received response from pubd")
r_cms = rpki.publication.cms_msg(DER = r_der)
r_msg = r_cms.unwrap(bpki_ta_path)
r_cms.check_replay_sql(self)
for r_pdu in r_msg:
handler = handlers.get(r_pdu.tag, self.default_pubd_handler)
if handler:
+ rpki.log.debug("Calling pubd handler %r" % handler)
handler(r_pdu)
if len(q_msg) != len(r_msg):
raise rpki.exceptions.BadPublicationReply, "Wrong number of response PDUs from pubd: sent %r, got %r" % (q_msg, r_msg)
@@ -924,6 +942,7 @@ class repository_elt(data_elt):
except Exception, e:
errback(e)
+ rpki.log.debug("Sending request to pubd")
rpki.http.client(
url = self.peer_contact_uri,
msg = q_der,
diff --git a/rpkid/rpki/log.py b/rpkid/rpki/log.py
index 869dbcf0..ee96dcfb 100644
--- a/rpkid/rpki/log.py
+++ b/rpkid/rpki/log.py
@@ -54,7 +54,7 @@ show_python_ids = False
# Whether tracebacks are enabled globally. Individual classes and
# modules may choose to override this.
-enable_tracebacks = False
+enable_tracebacks = True
tag = ""
pid = 0
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py
index 93726512..ca27dbdc 100644
--- a/rpkid/rpki/rpkid.py
+++ b/rpkid/rpki/rpkid.py
@@ -1954,6 +1954,7 @@ class publication_queue(object):
rpki.log.debug("Calling pubd[%r]" % self.repositories[rid])
self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers)
def done():
+ rpki.log.debug("Publication complete")
self.clear()
cb()
rpki.async.iterator(self.repositories, loop, done)
diff --git a/rpkid/rpki/sql.py b/rpkid/rpki/sql.py
index 14d1e1fb..9d420c80 100644
--- a/rpkid/rpki/sql.py
+++ b/rpkid/rpki/sql.py
@@ -32,27 +32,27 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
"""
+import weakref
+
from rpki.mysql_import import (MySQLdb, _mysql_exceptions)
-import rpki.x509, rpki.resource_set, rpki.sundial, rpki.log
+import rpki.x509
+import rpki.resource_set
+import rpki.sundial
+import rpki.log
class session(object):
"""
SQL session layer.
"""
- ## @var clear_threshold
- # Size above which .cache_clear_maybe() should clear the cache.
-
- clear_threshold = 5000
-
def __init__(self, cfg):
self.username = cfg.get("sql-username")
self.database = cfg.get("sql-database")
self.password = cfg.get("sql-password")
- self.cache = {}
+ self.cache = weakref.WeakValueDictionary()
self.dirty = set()
self.connect()
@@ -95,19 +95,13 @@ class session(object):
def cache_clear(self):
"""
- Clear the object cache.
+ Clear the SQL object cache. Shouldn't be necessary now that the
+ cache uses weak references, but should be harmless.
"""
rpki.log.debug("Clearing SQL cache")
self.assert_pristine()
self.cache.clear()
- def cache_clear_maybe(self):
- """
- Clear the object cache if its size is above clear_threshold.
- """
- if len(self.cache) >= self.clear_threshold:
- self.cache_clear()
-
def assert_pristine(self):
"""
Assert that there are no dirty objects in the cache.