diff options
-rw-r--r-- | rpkid/rpki/left_right.py | 462 | ||||
-rw-r--r-- | rpkid/rpki/rpkid.py | 83 | ||||
-rw-r--r-- | rpkid/rpki/rpkid_tasks.py | 84 | ||||
-rw-r--r-- | rpkid/tests/yamltest.py | 2 |
4 files changed, 107 insertions, 524 deletions
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index 6a176c15..c6089db4 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -34,22 +34,13 @@ PERFORMANCE OF THIS SOFTWARE. import rpki.resource_set, rpki.x509, rpki.sql, rpki.exceptions, rpki.xml_utils import rpki.http, rpki.up_down, rpki.relaxng, rpki.sundial, rpki.log, rpki.roa -import rpki.publication, rpki.async +import rpki.publication, rpki.async, rpki.rpkid_tasks ## @var enforce_strict_up_down_xml_sender # Enforce strict checking of XML "sender" field in up-down protocol enforce_strict_up_down_xml_sender = False -## @var max_new_roas_at_once -# Upper limit on the number of ROAs we'll create in a single -# self_elt.update_roas() call. This is a bit of a kludge, and may be -# replaced with something more clever or general later; for the moment -# the goal is to avoid going totally compute bound when somebody -# throws 50,000 new ROA requests at us in a single batch. - -max_new_roas_at_once = 200 - class left_right_namespace(object): """ XML namespace parameters for left-right protocol. @@ -159,6 +150,7 @@ class self_elt(data_elt): regen_margin = None bpki_cert = None bpki_glue = None + cron_tasks = None @property def bscs(self): @@ -326,10 +318,10 @@ class self_elt(data_elt): """ rpki.log.debug("Forced immediate run of periodic actions for self %s[%d]" % ( self.self_handle, self.self_id)) - if self.gctx.task_add(self.cron, cb): - self.gctx.task_run() - else: - cb() + completion = rpki.rpkid_tasks.CompletionHandler(cb) + self.schedule_cron_tasks(completion) + assert completion.count > 0 + self.gctx.task_run() def serve_fetch_one_maybe(self): """ @@ -353,440 +345,22 @@ class self_elt(data_elt): """ return self.sql_fetch_all(self.gctx) - def cron(self, cb): + def schedule_cron_tasks(self, completion): """ - Periodic tasks. + Schedule periodic tasks. """ - def one(): - self.gctx.checkpoint() - rpki.log.debug("Self %s[%d] polling parents" % (self.self_handle, self.self_id)) - self.client_poll(two) - - def two(): - self.gctx.checkpoint() - rpki.log.debug("Self %s[%d] updating children" % (self.self_handle, self.self_id)) - self.update_children(three) - - def three(): - self.gctx.checkpoint() - rpki.log.debug("Self %s[%d] updating ROAs" % (self.self_handle, self.self_id)) - self.update_roas(four) - - def four(): - self.gctx.checkpoint() - rpki.log.debug("Self %s[%d] updating Ghostbuster records" % (self.self_handle, self.self_id)) - self.update_ghostbusters(five) - - def five(): - self.gctx.checkpoint() - rpki.log.debug("Self %s[%d] regenerating CRLs and manifests" % (self.self_handle, self.self_id)) - self.regenerate_crls_and_manifests(six) - - def six(): - self.gctx.checkpoint() - self.gctx.sql.sweep() - rpki.log.debug("Self %s[%d] finished cron cycle, calling %r" % (self.self_handle, self.self_id, cb)) - cb() - - one() - - - def client_poll(self, callback): - """ - Run the regular client poll cycle with each of this self's parents - in turn. - """ - - rpki.log.trace() - - def parent_loop(parent_iterator, parent): - - def got_list(r_msg): - ca_map = dict((ca.parent_resource_class, ca) for ca in parent.cas) - self.gctx.checkpoint() - - def class_loop(class_iterator, rc): - - def class_update_failed(e): - rpki.log.traceback() - rpki.log.warn("Couldn't update class, skipping: %s" % e) - class_iterator() - - def class_create_failed(e): - rpki.log.traceback() - rpki.log.warn("Couldn't create class, skipping: %s" % e) - class_iterator() - - self.gctx.checkpoint() - if rc.class_name in ca_map: - ca = ca_map[rc.class_name] - del ca_map[rc.class_name] - ca.check_for_updates(parent, rc, class_iterator, class_update_failed) - else: - rpki.rpkid.ca_obj.create(parent, rc, class_iterator, class_create_failed) - - def class_done(): - - def ca_loop(iterator, ca): - self.gctx.checkpoint() - ca.delete(parent, iterator) - - def ca_done(): - self.gctx.checkpoint() - self.gctx.sql.sweep() - parent_iterator() - - rpki.async.iterator(ca_map.values(), ca_loop, ca_done) - - rpki.async.iterator(r_msg.payload.classes, class_loop, class_done) - - def list_failed(e): - rpki.log.traceback() - rpki.log.warn("Couldn't get resource class list from parent %r, skipping: %s (%r)" % (parent, e, e)) - parent_iterator() - - rpki.up_down.list_pdu.query(parent, got_list, list_failed) - - rpki.async.iterator(self.parents, parent_loop, callback) - - - def update_children(self, cb): - """ - Check for updated IRDB data for all of this self's children and - issue new certs as necessary. Must handle changes both in - resources and in expiration date. - """ - - rpki.log.trace() - now = rpki.sundial.now() - rsn = now + rpki.sundial.timedelta(seconds = self.regen_margin) - publisher = rpki.rpkid.publication_queue() - - def loop(iterator, child): - - def lose(e): - rpki.log.traceback() - rpki.log.warn("Couldn't update child %r, skipping: %s" % (child, e)) - iterator() - - def got_resources(irdb_resources): - try: - for child_cert in child_certs: - ca_detail = child_cert.ca_detail - ca = ca_detail.ca - if ca_detail.state == "active": - old_resources = child_cert.cert.get_3779resources() - new_resources = irdb_resources.intersection(old_resources).intersection(ca_detail.latest_ca_cert.get_3779resources()) - - if new_resources.empty(): - rpki.log.debug("Resources shrank to the null set, revoking and withdrawing child %s certificate SKI %s" % (child.child_handle, child_cert.cert.gSKI())) - child_cert.revoke(publisher = publisher) - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - - elif old_resources != new_resources or (old_resources.valid_until < rsn and irdb_resources.valid_until > now): - rpki.log.debug("Need to reissue child %s certificate SKI %s" % (child.child_handle, child_cert.cert.gSKI())) - child_cert.reissue( - ca_detail = ca_detail, - resources = new_resources, - publisher = publisher) - - elif old_resources.valid_until < now: - rpki.log.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s" - % (child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until)) - child_cert.sql_delete() - publisher.withdraw(cls = rpki.publication.certificate_elt, uri = child_cert.uri, obj = child_cert.cert, repository = ca.parent.repository) - ca_detail.generate_manifest(publisher = publisher) - - except (SystemExit, rpki.async.ExitNow): - raise - except Exception, e: - self.gctx.checkpoint() - lose(e) - else: - self.gctx.checkpoint() - self.gctx.sql.sweep() - iterator() - - self.gctx.checkpoint() - self.gctx.sql.sweep() - child_certs = child.child_certs - if child_certs: - self.gctx.irdb_query_child_resources(child.self.self_handle, child.child_handle, got_resources, lose) - else: - iterator() - - def done(): - def lose(e): - rpki.log.traceback() - rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e)) - self.gctx.checkpoint() - cb() - self.gctx.checkpoint() - self.gctx.sql.sweep() - publisher.call_pubd(cb, lose) + if self.cron_tasks is None: + self.cron_tasks = ( + rpki.rpkid_tasks.PollParentTask(self), + rpki.rpkid_tasks.UpdateChildrenTask(self), + rpki.rpkid_tasks.UpdateROAsTask(self), + rpki.rpkid_tasks.UpdateGhostbustersTask(self), + rpki.rpkid_tasks.RegnerateCRLsAndManifestsTask(self)) - rpki.async.iterator(self.children, loop, done) - - - def regenerate_crls_and_manifests(self, cb): - """ - Generate new CRLs and manifests as necessary for all of this - self's CAs. Extracting nextUpdate from a manifest is hard at the - moment due to implementation silliness, so for now we generate a - new manifest whenever we generate a new CRL - - This method also cleans up tombstones left behind by revoked - ca_detail objects, since we're walking through the relevant - portions of the database anyway. - """ - - rpki.log.trace() - now = rpki.sundial.now() - regen_margin = rpki.sundial.timedelta(seconds = self.regen_margin) - publisher = rpki.rpkid.publication_queue() - - for parent in self.parents: - for ca in parent.cas: - try: - for ca_detail in ca.revoked_ca_details: - if now > ca_detail.latest_crl.getNextUpdate(): - ca_detail.delete(ca = ca, publisher = publisher) - ca_detail = ca.active_ca_detail - if ca_detail is not None and now + regen_margin > ca_detail.latest_crl.getNextUpdate(): - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - except (SystemExit, rpki.async.ExitNow): - raise - except Exception, e: - rpki.log.traceback() - rpki.log.warn("Couldn't regenerate CRLs and manifests for CA %r, skipping: %s" % (ca, e)) - - def lose(e): - rpki.log.traceback() - rpki.log.warn("Couldn't publish updated CRLs and manifests for self %r, skipping: %s" % (self.self_handle, e)) - self.gctx.checkpoint() - cb() - - self.gctx.checkpoint() - self.gctx.sql.sweep() - publisher.call_pubd(cb, lose) - - - def update_ghostbusters(self, cb): - """ - Generate or update Ghostbuster records for this self. - - This is heavily based on .update_roas(), and probably both of them - need refactoring. - """ - - parents = dict((p.parent_handle, p) for p in self.parents) - - def got_ghostbuster_requests(ghostbuster_requests): - - try: - self.gctx.checkpoint() - if self.gctx.sql.dirty: - rpki.log.warn("Unexpected dirty SQL cache, flushing") - self.gctx.sql.sweep() - - ghostbusters = {} - orphans = [] - for ghostbuster in self.ghostbusters: - k = (ghostbuster.ca_detail_id, ghostbuster.vcard) - if ghostbuster.ca_detail.state != "active" or k in ghostbusters: - orphans.append(ghostbuster) - else: - ghostbusters[k] = ghostbuster - - publisher = rpki.rpkid.publication_queue() - ca_details = set() - - seen = set() - for ghostbuster_request in ghostbuster_requests: - if ghostbuster_request.parent_handle not in parents: - rpki.log.warn("Unknown parent_handle %r in Ghostbuster request, skipping" % ghostbuster_request.parent_handle) - continue - k = (ghostbuster_request.parent_handle, ghostbuster_request.vcard) - if k in seen: - rpki.log.warn("Skipping duplicate Ghostbuster request %r" % ghostbuster_request) - continue - seen.add(k) - for ca in parents[ghostbuster_request.parent_handle].cas: - ca_detail = ca.active_ca_detail - if ca_detail is not None: - ghostbuster = ghostbusters.pop((ca_detail.ca_detail_id, ghostbuster_request.vcard), None) - if ghostbuster is None: - ghostbuster = rpki.rpkid.ghostbuster_obj(self.gctx, self.self_id, ca_detail.ca_detail_id, ghostbuster_request.vcard) - rpki.log.debug("Created new Ghostbuster request for %r" % ghostbuster_request.parent_handle) - else: - rpki.log.debug("Found existing Ghostbuster request for %r" % ghostbuster_request.parent_handle) - ghostbuster.update(publisher = publisher, fast = True) - ca_details.add(ca_detail) - - orphans.extend(ghostbusters.itervalues()) - for ghostbuster in orphans: - ca_details.add(ghostbuster.ca_detail) - ghostbuster.revoke(publisher = publisher, fast = True) - - for ca_detail in ca_details: - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - - self.gctx.sql.sweep() - - def publication_failed(e): - rpki.log.traceback() - rpki.log.warn("Couldn't publish Ghostbuster updates for %s, skipping: %s" % (self.self_handle, e)) - self.gctx.checkpoint() - cb() - - self.gctx.checkpoint() - publisher.call_pubd(cb, publication_failed) - - except (SystemExit, rpki.async.ExitNow): - raise - except Exception, e: - rpki.log.traceback() - rpki.log.warn("Could not update Ghostbuster records for %s, skipping: %s" % (self.self_handle, e)) - cb() - - def ghostbuster_requests_failed(e): - rpki.log.traceback() - rpki.log.warn("Could not fetch Ghostbuster record requests for %s, skipping: %s" % (self.self_handle, e)) - cb() - - self.gctx.checkpoint() - self.gctx.sql.sweep() - self.gctx.irdb_query_ghostbuster_requests(self.self_handle, parents.iterkeys(), - got_ghostbuster_requests, ghostbuster_requests_failed) - - - def update_roas(self, cb): - """ - Generate or update ROAs for this self. - """ - - def got_roa_requests(roa_requests): - - rpki.log.debug("Received response to query for ROA requests") - - self.gctx.checkpoint() - if self.gctx.sql.dirty: - rpki.log.warn("Unexpected dirty SQL cache, flushing") - self.gctx.sql.sweep() - - roas = {} - seen = set() - orphans = [] - updates = [] - publisher = rpki.rpkid.publication_queue() - ca_details = set() - - for roa in self.roas: - k = (roa.asn, str(roa.ipv4), str(roa.ipv6)) - if k not in roas: - roas[k] = roa - elif (roa.roa is not None and roa.cert is not None and roa.ca_detail is not None and roa.ca_detail.state == "active" and - (roas[k].roa is None or roas[k].cert is None or roas[k].ca_detail is None or roas[k].ca_detail.state != "active")): - orphans.append(roas[k]) - roas[k] = roa - else: - orphans.append(roa) - - for roa_request in roa_requests: - rpki.log.debug("++ roa_requests %s roas %s orphans %s updates %s publisher.size %s ca_details %s seen %s cache %s" % ( - len(roa_requests), len(roas), len(orphans), len(updates), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache))) - k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6)) - if k in seen: - rpki.log.warn("Skipping duplicate ROA request %r" % roa_request) - else: - seen.add(k) - roa = roas.pop(k, None) - if roa is None: - roa = rpki.rpkid.roa_obj(self.gctx, self.self_id, roa_request.asn, roa_request.ipv4, roa_request.ipv6) - rpki.log.debug("Couldn't find existing ROA, created %r" % roa) - else: - rpki.log.debug("Found existing %r" % roa) - updates.append(roa) - - orphans.extend(roas.itervalues()) - - roas.clear() # Release references we no longer need, to free up memory - seen.clear() # Why does using "del" here raise SyntaxError?!? - del roa_requests[:] - - def loop(iterator, roa): - self.gctx.checkpoint() - rpki.log.debug("++ updates %s orphans %s publisher.size %s ca_details %s cache %s" % ( - len(updates), len(orphans), publisher.size, len(ca_details), len(self.gctx.sql.cache))) - try: - roa.update(publisher = publisher, fast = True) - ca_details.add(roa.ca_detail) - self.gctx.sql.sweep() - except (SystemExit, rpki.async.ExitNow): - raise - except rpki.exceptions.NoCoveringCertForROA: - rpki.log.warn("No covering certificate for %r, skipping" % roa) - except Exception, e: - rpki.log.traceback() - rpki.log.warn("Could not update %r, skipping: %s" % (roa, e)) - if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once: - for ca_detail in ca_details: - rpki.log.debug("Generating new CRL for %r" % ca_detail) - ca_detail.generate_crl(publisher = publisher) - rpki.log.debug("Generating new manifest for %r" % ca_detail) - ca_detail.generate_manifest(publisher = publisher) - rpki.log.debug("Sweeping") - self.gctx.sql.sweep() - rpki.log.debug("Done sweeping") - self.gctx.checkpoint() - rpki.log.debug("Starting publication") - publisher.call_pubd(iterator, publication_failed) - else: - iterator() - - def publication_failed(e): - rpki.log.traceback() - rpki.log.warn("Couldn't publish for %s, skipping: %s" % (self.self_handle, e)) - self.gctx.checkpoint() - cb() - - def done(): - for roa in orphans: - try: - ca_details.add(roa.ca_detail) - roa.revoke(publisher = publisher, fast = True) - except (SystemExit, rpki.async.ExitNow): - raise - except Exception, e: - rpki.log.traceback() - rpki.log.warn("Could not revoke %r: %s" % (roa, e)) - self.gctx.sql.sweep() - self.gctx.checkpoint() - if publisher.size > 0: - for ca_detail in ca_details: - ca_detail.generate_crl(publisher = publisher) - ca_detail.generate_manifest(publisher = publisher) - self.gctx.sql.sweep() - self.gctx.checkpoint() - publisher.call_pubd(cb, publication_failed) - else: - cb() - - rpki.async.iterator(updates, loop, done, pop_list = True) - - def roa_requests_failed(e): - rpki.log.traceback() - rpki.log.warn("Could not fetch ROA requests for %s, skipping: %s" % (self.self_handle, e)) - cb() - - self.gctx.checkpoint() - self.gctx.sql.sweep() - rpki.log.debug("Issuing query for ROA requests") - self.gctx.irdb_query_roa_requests(self.self_handle, got_roa_requests, roa_requests_failed) + for task in self.cron_tasks: + self.gctx.task_add(task) + completion.register(task) class bsc_elt(data_elt): diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py index 80b0b7cb..962bedaa 100644 --- a/rpkid/rpki/rpkid.py +++ b/rpkid/rpki/rpkid.py @@ -57,6 +57,7 @@ import rpki.relaxng import rpki.log import rpki.async import rpki.daemonize +import rpki.rpkid_tasks class main(object): """ @@ -323,19 +324,18 @@ class main(object): if force or self.cron_timeout is not None: self.cron_timeout = rpki.sundial.now() + self.cron_keepalive - def task_add(self, handler, cb = None, description = None): + def task_add(self, task): """ Add a task to the scheduler task queue, unless it's already queued. """ - t = task(self, handler, cb, description) - rpki.log.debug("New task %r" % t) + rpki.log.debug("New task %r" % task) rpki.log.debug("Task queue %r" % self.task_queue) - if t not in self.task_queue: - rpki.log.debug("Adding %r to task queue" % t) - self.task_queue.append(t) + if task not in self.task_queue: + rpki.log.debug("Adding %r to task queue" % task) + self.task_queue.append(task) return True else: - rpki.log.debug("Task %r was already in the task queue" % t) + rpki.log.debug("Task %r was already in the task queue" % task) return False def task_next(self): @@ -343,20 +343,14 @@ class main(object): Pull next task from the task queue and put it the deferred event queue (we don't want to run it directly, as that could eventually blow out our call stack). - - Not yet sure what to do here if task queue is empty. For now we - just return, on the theory that we've nothing left to do until the - next cron timer fires. This may be a bad assumption, revisit if - the rest of the code doesn't fit well with this assumption. """ - rpki.log.debug("Looking for next task") try: self.task_current = self.task_queue.pop(0) except IndexError: self.task_current = None else: rpki.log.debug("Pulled %r from task queue" % self.task_current) - rpki.async.defer(self.task_current.run) + rpki.async.defer(self.task_current) def task_run(self): """ @@ -365,15 +359,6 @@ class main(object): if self.task_current is None: self.task_next() - def cron_done(self, cb): - """ - Completion handler for timer-driven cron. - """ - self.sql.sweep() - self.cron_timeout = None - rpki.log.info("Finished cron run") - cb() - def cron(self, cb = None): """ Periodic tasks. @@ -385,9 +370,17 @@ class main(object): rpki.log.debug("Starting cron run") + def done(): + self.sql.sweep() + self.cron_timeout = None + rpki.log.info("Finished cron run started at %s" % now) + if cb is not None: + cb() + + completion = rpki.rpkid_tasks.CompletionHandler(done) for s in rpki.left_right.self_elt.sql_fetch_all(self): - self.task_add(s.cron) - self.task_add(self.cron_done, cb) + s.schedule_cron_tasks(completion) + nothing_queued = completion.count == 0 assert self.use_internal_cron or self.cron_timeout is None @@ -407,6 +400,8 @@ class main(object): elif self.use_internal_cron: rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout) + if nothing_queued: + done() def cronjob_handler(self, query, path, cb): """ @@ -424,44 +419,6 @@ class main(object): rpki.log.debug("Starting externally triggered cron") self.cron(done) -class task(object): - """ - Scheduler task object. - """ - - def __init__(self, gctx, handler, cb = None, description = None): - self.gctx = gctx - self.handler = handler - self.cb = cb - self.description = description - - def __cmp__(self, other): - return cmp(self.handler, other.handler) - - def __hash__(self): - return self.handler.__hash__() - - def __repr__(self): - if self.description is None: - return repr(self.handler) - else: - return "<%r: %s>" % (self.handler, self.description) - - def done(self): - """ - Completion handler for task. - """ - if self.cb is not None: - self.cb() - self.gctx.task_next() - - def run(self): - """ - Run this task when called via the deferred event system. - """ - rpki.log.debug("Running task %r" % self) - self.handler(self.done) - class ca_obj(rpki.sql.sql_persistent): """ Internal CA object. diff --git a/rpkid/rpki/rpkid_tasks.py b/rpkid/rpki/rpkid_tasks.py index c1f2cd7b..62d0f474 100644 --- a/rpkid/rpki/rpkid_tasks.py +++ b/rpkid/rpki/rpkid_tasks.py @@ -19,6 +19,23 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ +import rpki.log +import rpki.rpkid +import rpki.async +import rpki.up_down +import rpki.sundial +import rpki.publication +import rpki.exceptions + +## @var max_new_roas_at_once +# Upper limit on the number of ROAs we'll create in a single +# self_elt.update_roas() call. This is a bit of a kludge, and may be +# replaced with something more clever or general later; for the moment +# the goal is to avoid going totally compute bound when somebody +# throws 50,000 new ROA requests at us in a single batch. + +max_new_roas_at_once = 200 + class CompletionHandler(object): """ Track one or more scheduled rpkid tasks and execute a callback when @@ -30,17 +47,26 @@ class CompletionHandler(object): self.tasks = set() def register(self, task): + rpki.log.debug("Completion handler %r registering task %r" % (self, task)) self.tasks.add(task) - task.register_completion(self.complete) + task.register_completion(self.done) - def complete(self, task): + def done(self, task): try: self.tasks.remove(task) except KeyError: rpki.log.warn("Completion handler %r called with unregistered task %r, blundering onwards" % (self, task)) + else: + rpki.log.debug("Completion handler %r called with registered task %r" % (self, task)) if not self.tasks: + rpki.log.debug("Completion handler %r finished, calling %r" % (self, self.cb)) self.cb() + @property + def count(self): + return len(self.tasks) + + class AbstractTask(object): """ Abstract base class for rpkid scheduler task objects. This just @@ -57,6 +83,7 @@ class AbstractTask(object): self.description = description self.completions = [] self.continuation = None + self.clear() def __repr__(self): return rpki.log.log_repr(self, self.description) @@ -65,8 +92,9 @@ class AbstractTask(object): self.completions.append(completion) def exit(self): - for completion in self.completions: - completion() + while self.completions: + self.completions.pop(0)(self) + self.clear() self.self.gctx.task_next() def postpone(self, continuation): @@ -77,6 +105,7 @@ class AbstractTask(object): def __call__(self): if self.continuation is None: rpki.log.debug("Running task %r" % self) + self.clear() self.start() else: rpki.log.debug("Restarting task %r at " % (self, self.continuation)) @@ -90,6 +119,9 @@ class AbstractTask(object): def start(self): raise NotImplementedError + def clear(self): + pass + class PollParentTask(AbstractTask): """ @@ -97,6 +129,12 @@ class PollParentTask(AbstractTask): parents, in turn. """ + def clear(self): + self.parent_iterator = None + self.parent = None + self.ca_map = None + self.class_iterator = None + def start(self): rpki.log.trace() self.gctx.checkpoint() @@ -144,7 +182,7 @@ class PollParentTask(AbstractTask): def ca_loop(self, iterator, ca): self.gctx.checkpoint() - ca.delete(parent, iterator) + ca.delete(self.parent, iterator) def ca_done(self): self.gctx.checkpoint() @@ -159,13 +197,20 @@ class UpdateChildrenTask(AbstractTask): resources and in expiration date. """ + def clear(self): + self.now = None + self.rsn = None + self.publisher = None + self.iterator = None + self.child = None + self.child_certs = None + def start(self): rpki.log.trace() self.gctx.checkpoint() rpki.log.debug("Self %s[%d] updating children" % (self.self_handle, self.self_id)) - self.now = rpki.sundial.now() - self.rsn = now + rpki.sundial.timedelta(seconds = self.regen_margin) + self.rsn = self.now + rpki.sundial.timedelta(seconds = self.regen_margin) self.publisher = rpki.rpkid.publication_queue() rpki.async.iterator(self.children, self.loop, self.done) @@ -201,7 +246,7 @@ class UpdateChildrenTask(AbstractTask): ca_detail.generate_manifest(publisher = self.publisher) elif old_resources != new_resources or (old_resources.valid_until < self.rsn and irdb_resources.valid_until > self.now): - rpki.log.debug("Need to reissue child %s certificate SKI %s" % (child.child_handle, child_cert.cert.gSKI())) + rpki.log.debug("Need to reissue child %s certificate SKI %s" % (self.child.child_handle, child_cert.cert.gSKI())) child_cert.reissue( ca_detail = ca_detail, resources = new_resources, @@ -209,7 +254,7 @@ class UpdateChildrenTask(AbstractTask): elif old_resources.valid_until < self.now: rpki.log.debug("Child %s certificate SKI %s has expired: cert.valid_until %s, irdb.valid_until %s" - % (child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until)) + % (self.child.child_handle, child_cert.cert.gSKI(), old_resources.valid_until, irdb_resources.valid_until)) child_cert.sql_delete() self.publisher.withdraw(cls = rpki.publication.certificate_elt, uri = child_cert.uri, obj = child_cert.cert, repository = ca.parent.repository) ca_detail.generate_manifest(publisher = self.publisher) @@ -218,16 +263,16 @@ class UpdateChildrenTask(AbstractTask): raise except Exception, e: self.gctx.checkpoint() - lose(e) + self.lose(e) else: self.gctx.checkpoint() self.gctx.sql.sweep() self.iterator() - def done(): + def done(self): self.gctx.checkpoint() self.gctx.sql.sweep() - publisher.call_pubd(self.exit, self.publication_failed) + self.publisher.call_pubd(self.exit, self.publication_failed) def publication_failed(self, e): rpki.log.traceback() @@ -241,6 +286,13 @@ class UpdateROAsTask(AbstractTask): Generate or update ROAs for this self. """ + def clear(self): + self.orphans = None + self.updates = None + self.publisher = None + self.ca_details = None + self.count = None + def start(self): rpki.log.trace() self.gctx.checkpoint() @@ -320,7 +372,7 @@ class UpdateROAsTask(AbstractTask): iterator() def publish(self, done): - if publisher.size > 0: + if self.publisher.size > 0: for ca_detail in self.ca_details: rpki.log.debug("Generating new CRL for %r" % ca_detail) ca_detail.generate_crl(publisher = self.publisher) @@ -341,10 +393,10 @@ class UpdateROAsTask(AbstractTask): self.exit() def done(self): - for roa in orphans: + for roa in self.orphans: try: - ca_details.add(roa.ca_detail) - roa.revoke(publisher = publisher, fast = True) + self.ca_details.add(roa.ca_detail) + roa.revoke(publisher = self.publisher, fast = True) except (SystemExit, rpki.async.ExitNow): raise except Exception, e: diff --git a/rpkid/tests/yamltest.py b/rpkid/tests/yamltest.py index 3039f545..6d4a6381 100644 --- a/rpkid/tests/yamltest.py +++ b/rpkid/tests/yamltest.py @@ -608,7 +608,7 @@ try: root_uri = "rsync://localhost:%d/rpki/" % db.root.pubd.rsync_port root_sia = ((rpki.oids.name2oid["id-ad-caRepository"], ("uri", root_uri)), - (rpki.oids.name2oid["id-ad-rpkiManifest"], ("uri", root_uri + "root.mnf"))) + (rpki.oids.name2oid["id-ad-rpkiManifest"], ("uri", root_uri + "root.mft"))) root_cert = rpki.x509.X509.self_certify( keypair = root_key, |