aboutsummaryrefslogtreecommitdiff
path: root/rpkid/rpki/left_right.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2012-08-09 21:31:10 +0000
committerRob Austein <sra@hactrn.net>2012-08-09 21:31:10 +0000
commit07d1098ee09b2743f11f2a66294f3288a6a5f2c2 (patch)
tree2a1fddcab36202c23d2c9da9fdbd961b45c8114c /rpkid/rpki/left_right.py
parentf1ea21697f9ea7deb771df7a3710189f46b1f597 (diff)
Switch rpki.sql.session.cache to use weak references, so that Python's
garbage collector can free up cache entries we're not using for us. Rework update_roas() to be a bit more frugal with memory. See #278. svn path=/branches/tk274/; revision=4626
Diffstat (limited to 'rpkid/rpki/left_right.py')
-rw-r--r--rpkid/rpki/left_right.py71
1 files changed, 45 insertions, 26 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py
index 33c14d8c..b9a2c43f 100644
--- a/rpkid/rpki/left_right.py
+++ b/rpkid/rpki/left_right.py
@@ -381,7 +381,6 @@ class self_elt(data_elt):
def six():
self.gctx.checkpoint()
self.gctx.sql.sweep()
- self.gctx.sql.cache_clear_maybe()
rpki.log.debug("Self %s[%d] finished cron cycle, calling %r" % (self.self_handle, self.self_id, cb))
cb()
@@ -675,7 +674,12 @@ class self_elt(data_elt):
self.gctx.sql.sweep()
roas = {}
+ seen = set()
orphans = []
+ updates = []
+ publisher = rpki.rpkid.publication_queue()
+ ca_details = set()
+
for roa in self.roas:
k = (roa.asn, str(roa.ipv4), str(roa.ipv6))
if k not in roas:
@@ -687,41 +691,54 @@ class self_elt(data_elt):
else:
orphans.append(roa)
- publisher = rpki.rpkid.publication_queue()
- ca_details = set()
- seen = set()
+ for roa_request in roa_requests:
+ rpki.log.debug("++ roa_requests %s roas %s orphans %s updates %s publisher.size %s ca_details %s seen %s cache %s" % (
+ len(roa_requests), len(roas), len(orphans), len(updates), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache)))
+ k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
+ if k in seen:
+ rpki.log.warn("Skipping duplicate ROA request %r" % roa_request)
+ else:
+ seen.add(k)
+ roa = roas.pop(k, None)
+ if roa is None:
+ roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
+ rpki.log.debug("Couldn't find existing ROA, created %r" % roa)
+ else:
+ rpki.log.debug("Found existing %r" % roa)
+ updates.append(roa)
- def loop(iterator, roa_request):
+ orphans.extend(roas.itervalues())
+
+ roas.clear() # Release references we no longer need, to free up memory
+ seen.clear() # Why does using "del" here raise SyntaxError?!?
+ del roa_requests[:]
+
+ def loop(iterator, roa):
self.gctx.checkpoint()
- rpki.log.debug("++ roa_requests %s roas %s orphans %s publisher.size %s ca_details %s seen %s cache %s" % (
- len(roa_requests), len(roas), len(orphans), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache)))
+ rpki.log.debug("++ updates %s orphans %s publisher.size %s ca_details %s cache %s" % (
+ len(updates), len(orphans), publisher.size, len(ca_details), len(self.gctx.sql.cache)))
try:
- k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
- if k in seen:
- rpki.log.warn("Skipping duplicate ROA request %r" % roa_request)
- else:
- seen.add(k)
- roa = roas.pop(k, None)
- if roa is None:
- roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6)
- rpki.log.debug("Couldn't find existing ROA, created %r" % roa)
- else:
- rpki.log.debug("Found existing %r" % roa)
- roa.update(publisher = publisher, fast = True)
- ca_details.add(roa.ca_detail)
+ roa.update(publisher = publisher, fast = True)
+ ca_details.add(roa.ca_detail)
+ self.gctx.sql.sweep()
except (SystemExit, rpki.async.ExitNow):
raise
+ except rpki.exceptions.NoCoveringCertForROA:
+ rpki.log.warn("No covering certificate for %r, skipping" % roa)
except Exception, e:
- if not isinstance(e, rpki.exceptions.NoCoveringCertForROA):
- rpki.log.traceback()
+ rpki.log.traceback()
rpki.log.warn("Could not update %r, skipping: %s" % (roa, e))
if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once:
- self.gctx.sql.sweep()
for ca_detail in ca_details:
+ rpki.log.debug("Generating new CRL for %r" % ca_detail)
ca_detail.generate_crl(publisher = publisher)
+ rpki.log.debug("Generating new manifest for %r" % ca_detail)
ca_detail.generate_manifest(publisher = publisher)
+ rpki.log.debug("Sweeping")
self.gctx.sql.sweep()
+ rpki.log.debug("Done sweeping")
self.gctx.checkpoint()
+ rpki.log.debug("Starting publication")
publisher.call_pubd(iterator, publication_failed)
else:
iterator()
@@ -733,7 +750,6 @@ class self_elt(data_elt):
cb()
def done():
- orphans.extend(roas.itervalues())
for roa in orphans:
try:
ca_details.add(roa.ca_detail)
@@ -751,8 +767,8 @@ class self_elt(data_elt):
self.gctx.checkpoint()
publisher.call_pubd(cb, publication_failed)
- rpki.async.iterator(roa_requests, loop, done)
-
+ rpki.async.iterator(updates, loop, done, pop_list = True)
+
def roa_requests_failed(e):
rpki.log.traceback()
rpki.log.warn("Could not fetch ROA requests for %s, skipping: %s" % (self.self_handle, e))
@@ -909,12 +925,14 @@ class repository_elt(data_elt):
def done(r_der):
try:
+ rpki.log.debug("Received response from pubd")
r_cms = rpki.publication.cms_msg(DER = r_der)
r_msg = r_cms.unwrap(bpki_ta_path)
r_cms.check_replay_sql(self)
for r_pdu in r_msg:
handler = handlers.get(r_pdu.tag, self.default_pubd_handler)
if handler:
+ rpki.log.debug("Calling pubd handler %r" % handler)
handler(r_pdu)
if len(q_msg) != len(r_msg):
raise rpki.exceptions.BadPublicationReply, "Wrong number of response PDUs from pubd: sent %r, got %r" % (q_msg, r_msg)
@@ -924,6 +942,7 @@ class repository_elt(data_elt):
except Exception, e:
errback(e)
+ rpki.log.debug("Sending request to pubd")
rpki.http.client(
url = self.peer_contact_uri,
msg = q_der,