diff options
author | Rob Austein <sra@hactrn.net> | 2012-08-09 21:31:10 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2012-08-09 21:31:10 +0000 |
commit | 07d1098ee09b2743f11f2a66294f3288a6a5f2c2 (patch) | |
tree | 2a1fddcab36202c23d2c9da9fdbd961b45c8114c /rpkid/rpki | |
parent | f1ea21697f9ea7deb771df7a3710189f46b1f597 (diff) |
Switch rpki.sql.session.cache to use weak references, so that Python's
garbage collector can free up cache entries we're not using for us.
Rework update_roas() to be a bit more frugal with memory. See #278.
svn path=/branches/tk274/; revision=4626
Diffstat (limited to 'rpkid/rpki')
-rw-r--r-- | rpkid/rpki/async.py | 27 | ||||
-rw-r--r-- | rpkid/rpki/left_right.py | 71 | ||||
-rw-r--r-- | rpkid/rpki/log.py | 2 | ||||
-rw-r--r-- | rpkid/rpki/rpkid.py | 1 | ||||
-rw-r--r-- | rpkid/rpki/sql.py | 24 |
5 files changed, 77 insertions, 48 deletions
diff --git a/rpkid/rpki/async.py b/rpkid/rpki/async.py index 5eaa34f9..c2a02e4f 100644 --- a/rpkid/rpki/async.py +++ b/rpkid/rpki/async.py @@ -40,15 +40,24 @@ class iterator(object): to continue to the next item in the iteration. The termination callback receives no arguments. + + Special case for memory constrained cases: if keyword argument + pop_list is True, iterable must be a list, which is modified in + place, popping items off of it until it's empty. """ - def __init__(self, iterable, item_callback, done_callback, unwind_stack = True): + def __init__(self, iterable, item_callback, done_callback, unwind_stack = True, pop_list = False): + assert not pop_list or isinstance(iterable, list), "iterable must be a list when using pop_list" self.item_callback = item_callback self.done_callback = done_callback self.caller_file, self.caller_line, self.caller_function = traceback.extract_stack(limit = 2)[0][0:3] self.unwind_stack = unwind_stack + self.pop_list = pop_list try: - self.iterator = iter(iterable) + if self.pop_list: + self.iterator = iterable + else: + self.iterator = iter(iterable) except (ExitNow, SystemExit): raise except Exception: @@ -73,11 +82,17 @@ class iterator(object): with the next iteration value, call the termination handler if the iterator signaled StopIteration. """ - try: - self.item_callback(self, self.iterator.next()) - except StopIteration: - if self.done_callback is not None: + if self.pop_list: + if self.iterator: + self.item_callback(self, self.iterator.pop(0)) + elif self.done_callback is not None: self.done_callback() + else: + try: + self.item_callback(self, self.iterator.next()) + except StopIteration: + if self.done_callback is not None: + self.done_callback() class timer(object): """ diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index 33c14d8c..b9a2c43f 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -381,7 +381,6 @@ class self_elt(data_elt): def six(): self.gctx.checkpoint() self.gctx.sql.sweep() - self.gctx.sql.cache_clear_maybe() rpki.log.debug("Self %s[%d] finished cron cycle, calling %r" % (self.self_handle, self.self_id, cb)) cb() @@ -675,7 +674,12 @@ class self_elt(data_elt): self.gctx.sql.sweep() roas = {} + seen = set() orphans = [] + updates = [] + publisher = rpki.rpkid.publication_queue() + ca_details = set() + for roa in self.roas: k = (roa.asn, str(roa.ipv4), str(roa.ipv6)) if k not in roas: @@ -687,41 +691,54 @@ class self_elt(data_elt): else: orphans.append(roa) - publisher = rpki.rpkid.publication_queue() - ca_details = set() - seen = set() + for roa_request in roa_requests: + rpki.log.debug("++ roa_requests %s roas %s orphans %s updates %s publisher.size %s ca_details %s seen %s cache %s" % ( + len(roa_requests), len(roas), len(orphans), len(updates), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache))) + k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6)) + if k in seen: + rpki.log.warn("Skipping duplicate ROA request %r" % roa_request) + else: + seen.add(k) + roa = roas.pop(k, None) + if roa is None: + roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6) + rpki.log.debug("Couldn't find existing ROA, created %r" % roa) + else: + rpki.log.debug("Found existing %r" % roa) + updates.append(roa) - def loop(iterator, roa_request): + orphans.extend(roas.itervalues()) + + roas.clear() # Release references we no longer need, to free up memory + seen.clear() # Why does using "del" here raise SyntaxError?!? + del roa_requests[:] + + def loop(iterator, roa): self.gctx.checkpoint() - rpki.log.debug("++ roa_requests %s roas %s orphans %s publisher.size %s ca_details %s seen %s cache %s" % ( - len(roa_requests), len(roas), len(orphans), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache))) + rpki.log.debug("++ updates %s orphans %s publisher.size %s ca_details %s cache %s" % ( + len(updates), len(orphans), publisher.size, len(ca_details), len(self.gctx.sql.cache))) try: - k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6)) - if k in seen: - rpki.log.warn("Skipping duplicate ROA request %r" % roa_request) - else: - seen.add(k) - roa = roas.pop(k, None) - if roa is None: - roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6) - rpki.log.debug("Couldn't find existing ROA, created %r" % roa) - else: - rpki.log.debug("Found existing %r" % roa) - roa.update(publisher = publisher, fast = True) - ca_details.add(roa.ca_detail) + roa.update(publisher = publisher, fast = True) + ca_details.add(roa.ca_detail) + self.gctx.sql.sweep() except (SystemExit, rpki.async.ExitNow): raise + except rpki.exceptions.NoCoveringCertForROA: + rpki.log.warn("No covering certificate for %r, skipping" % roa) except Exception, e: - if not isinstance(e, rpki.exceptions.NoCoveringCertForROA): - rpki.log.traceback() + rpki.log.traceback() rpki.log.warn("Could not update %r, skipping: %s" % (roa, e)) if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once: - self.gctx.sql.sweep() for ca_detail in ca_details: + rpki.log.debug("Generating new CRL for %r" % ca_detail) ca_detail.generate_crl(publisher = publisher) + rpki.log.debug("Generating new manifest for %r" % ca_detail) ca_detail.generate_manifest(publisher = publisher) + rpki.log.debug("Sweeping") self.gctx.sql.sweep() + rpki.log.debug("Done sweeping") self.gctx.checkpoint() + rpki.log.debug("Starting publication") publisher.call_pubd(iterator, publication_failed) else: iterator() @@ -733,7 +750,6 @@ class self_elt(data_elt): cb() def done(): - orphans.extend(roas.itervalues()) for roa in orphans: try: ca_details.add(roa.ca_detail) @@ -751,8 +767,8 @@ class self_elt(data_elt): self.gctx.checkpoint() publisher.call_pubd(cb, publication_failed) - rpki.async.iterator(roa_requests, loop, done) - + rpki.async.iterator(updates, loop, done, pop_list = True) + def roa_requests_failed(e): rpki.log.traceback() rpki.log.warn("Could not fetch ROA requests for %s, skipping: %s" % (self.self_handle, e)) @@ -909,12 +925,14 @@ class repository_elt(data_elt): def done(r_der): try: + rpki.log.debug("Received response from pubd") r_cms = rpki.publication.cms_msg(DER = r_der) r_msg = r_cms.unwrap(bpki_ta_path) r_cms.check_replay_sql(self) for r_pdu in r_msg: handler = handlers.get(r_pdu.tag, self.default_pubd_handler) if handler: + rpki.log.debug("Calling pubd handler %r" % handler) handler(r_pdu) if len(q_msg) != len(r_msg): raise rpki.exceptions.BadPublicationReply, "Wrong number of response PDUs from pubd: sent %r, got %r" % (q_msg, r_msg) @@ -924,6 +942,7 @@ class repository_elt(data_elt): except Exception, e: errback(e) + rpki.log.debug("Sending request to pubd") rpki.http.client( url = self.peer_contact_uri, msg = q_der, diff --git a/rpkid/rpki/log.py b/rpkid/rpki/log.py index 869dbcf0..ee96dcfb 100644 --- a/rpkid/rpki/log.py +++ b/rpkid/rpki/log.py @@ -54,7 +54,7 @@ show_python_ids = False # Whether tracebacks are enabled globally. Individual classes and # modules may choose to override this. -enable_tracebacks = False +enable_tracebacks = True tag = "" pid = 0 diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py index 93726512..ca27dbdc 100644 --- a/rpkid/rpki/rpkid.py +++ b/rpkid/rpki/rpkid.py @@ -1954,6 +1954,7 @@ class publication_queue(object): rpki.log.debug("Calling pubd[%r]" % self.repositories[rid]) self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers) def done(): + rpki.log.debug("Publication complete") self.clear() cb() rpki.async.iterator(self.repositories, loop, done) diff --git a/rpkid/rpki/sql.py b/rpkid/rpki/sql.py index 14d1e1fb..9d420c80 100644 --- a/rpkid/rpki/sql.py +++ b/rpkid/rpki/sql.py @@ -32,27 +32,27 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ +import weakref + from rpki.mysql_import import (MySQLdb, _mysql_exceptions) -import rpki.x509, rpki.resource_set, rpki.sundial, rpki.log +import rpki.x509 +import rpki.resource_set +import rpki.sundial +import rpki.log class session(object): """ SQL session layer. """ - ## @var clear_threshold - # Size above which .cache_clear_maybe() should clear the cache. - - clear_threshold = 5000 - def __init__(self, cfg): self.username = cfg.get("sql-username") self.database = cfg.get("sql-database") self.password = cfg.get("sql-password") - self.cache = {} + self.cache = weakref.WeakValueDictionary() self.dirty = set() self.connect() @@ -95,19 +95,13 @@ class session(object): def cache_clear(self): """ - Clear the object cache. + Clear the SQL object cache. Shouldn't be necessary now that the + cache uses weak references, but should be harmless. """ rpki.log.debug("Clearing SQL cache") self.assert_pristine() self.cache.clear() - def cache_clear_maybe(self): - """ - Clear the object cache if its size is above clear_threshold. - """ - if len(self.cache) >= self.clear_threshold: - self.cache_clear() - def assert_pristine(self): """ Assert that there are no dirty objects in the cache. |