diff options
author | Rob Austein <sra@hactrn.net> | 2012-08-09 21:31:10 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2012-08-09 21:31:10 +0000 |
commit | 07d1098ee09b2743f11f2a66294f3288a6a5f2c2 (patch) | |
tree | 2a1fddcab36202c23d2c9da9fdbd961b45c8114c /rpkid/rpki/left_right.py | |
parent | f1ea21697f9ea7deb771df7a3710189f46b1f597 (diff) |
Switch rpki.sql.session.cache to use weak references, so that Python's
garbage collector can free up cache entries we're not using for us.
Rework update_roas() to be a bit more frugal with memory. See #278.
svn path=/branches/tk274/; revision=4626
Diffstat (limited to 'rpkid/rpki/left_right.py')
-rw-r--r-- | rpkid/rpki/left_right.py | 71 |
1 files changed, 45 insertions, 26 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index 33c14d8c..b9a2c43f 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -381,7 +381,6 @@ class self_elt(data_elt): def six(): self.gctx.checkpoint() self.gctx.sql.sweep() - self.gctx.sql.cache_clear_maybe() rpki.log.debug("Self %s[%d] finished cron cycle, calling %r" % (self.self_handle, self.self_id, cb)) cb() @@ -675,7 +674,12 @@ class self_elt(data_elt): self.gctx.sql.sweep() roas = {} + seen = set() orphans = [] + updates = [] + publisher = rpki.rpkid.publication_queue() + ca_details = set() + for roa in self.roas: k = (roa.asn, str(roa.ipv4), str(roa.ipv6)) if k not in roas: @@ -687,41 +691,54 @@ class self_elt(data_elt): else: orphans.append(roa) - publisher = rpki.rpkid.publication_queue() - ca_details = set() - seen = set() + for roa_request in roa_requests: + rpki.log.debug("++ roa_requests %s roas %s orphans %s updates %s publisher.size %s ca_details %s seen %s cache %s" % ( + len(roa_requests), len(roas), len(orphans), len(updates), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache))) + k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6)) + if k in seen: + rpki.log.warn("Skipping duplicate ROA request %r" % roa_request) + else: + seen.add(k) + roa = roas.pop(k, None) + if roa is None: + roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6) + rpki.log.debug("Couldn't find existing ROA, created %r" % roa) + else: + rpki.log.debug("Found existing %r" % roa) + updates.append(roa) - def loop(iterator, roa_request): + orphans.extend(roas.itervalues()) + + roas.clear() # Release references we no longer need, to free up memory + seen.clear() # Why does using "del" here raise SyntaxError?!? + del roa_requests[:] + + def loop(iterator, roa): self.gctx.checkpoint() - rpki.log.debug("++ roa_requests %s roas %s orphans %s publisher.size %s ca_details %s seen %s cache %s" % ( - len(roa_requests), len(roas), len(orphans), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache))) + rpki.log.debug("++ updates %s orphans %s publisher.size %s ca_details %s cache %s" % ( + len(updates), len(orphans), publisher.size, len(ca_details), len(self.gctx.sql.cache))) try: - k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6)) - if k in seen: - rpki.log.warn("Skipping duplicate ROA request %r" % roa_request) - else: - seen.add(k) - roa = roas.pop(k, None) - if roa is None: - roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6) - rpki.log.debug("Couldn't find existing ROA, created %r" % roa) - else: - rpki.log.debug("Found existing %r" % roa) - roa.update(publisher = publisher, fast = True) - ca_details.add(roa.ca_detail) + roa.update(publisher = publisher, fast = True) + ca_details.add(roa.ca_detail) + self.gctx.sql.sweep() except (SystemExit, rpki.async.ExitNow): raise + except rpki.exceptions.NoCoveringCertForROA: + rpki.log.warn("No covering certificate for %r, skipping" % roa) except Exception, e: - if not isinstance(e, rpki.exceptions.NoCoveringCertForROA): - rpki.log.traceback() + rpki.log.traceback() rpki.log.warn("Could not update %r, skipping: %s" % (roa, e)) if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once: - self.gctx.sql.sweep() for ca_detail in ca_details: + rpki.log.debug("Generating new CRL for %r" % ca_detail) ca_detail.generate_crl(publisher = publisher) + rpki.log.debug("Generating new manifest for %r" % ca_detail) ca_detail.generate_manifest(publisher = publisher) + rpki.log.debug("Sweeping") self.gctx.sql.sweep() + rpki.log.debug("Done sweeping") self.gctx.checkpoint() + rpki.log.debug("Starting publication") publisher.call_pubd(iterator, publication_failed) else: iterator() @@ -733,7 +750,6 @@ class self_elt(data_elt): cb() def done(): - orphans.extend(roas.itervalues()) for roa in orphans: try: ca_details.add(roa.ca_detail) @@ -751,8 +767,8 @@ class self_elt(data_elt): self.gctx.checkpoint() publisher.call_pubd(cb, publication_failed) - rpki.async.iterator(roa_requests, loop, done) - + rpki.async.iterator(updates, loop, done, pop_list = True) + def roa_requests_failed(e): rpki.log.traceback() rpki.log.warn("Could not fetch ROA requests for %s, skipping: %s" % (self.self_handle, e)) @@ -909,12 +925,14 @@ class repository_elt(data_elt): def done(r_der): try: + rpki.log.debug("Received response from pubd") r_cms = rpki.publication.cms_msg(DER = r_der) r_msg = r_cms.unwrap(bpki_ta_path) r_cms.check_replay_sql(self) for r_pdu in r_msg: handler = handlers.get(r_pdu.tag, self.default_pubd_handler) if handler: + rpki.log.debug("Calling pubd handler %r" % handler) handler(r_pdu) if len(q_msg) != len(r_msg): raise rpki.exceptions.BadPublicationReply, "Wrong number of response PDUs from pubd: sent %r, got %r" % (q_msg, r_msg) @@ -924,6 +942,7 @@ class repository_elt(data_elt): except Exception, e: errback(e) + rpki.log.debug("Sending request to pubd") rpki.http.client( url = self.peer_contact_uri, msg = q_der, |