diff options
author | Rob Austein <sra@hactrn.net> | 2012-08-08 22:15:54 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2012-08-08 22:15:54 +0000 |
commit | f8d0d3bbbfc06ab298e3eb379e6759e1c3c3c863 (patch) | |
tree | 74427f30f83bee07706dcbf26dcd092e1588b985 | |
parent | 0ffc84f40bf25c778e20d49be33eebab3c7612e5 (diff) |
Checkpoint of work to date, see #274 and #275.
svn path=/branches/tk274/; revision=4623
-rw-r--r-- | rpkid/examples/rpki.conf | 3 | ||||
-rw-r--r-- | rpkid/rpki/left_right.py | 24 | ||||
-rw-r--r-- | rpkid/rpki/log.py | 11 | ||||
-rw-r--r-- | rpkid/rpki/rpkid.py | 149 | ||||
-rw-r--r-- | rpkid/rpki/x509.py | 16 |
5 files changed, 154 insertions, 49 deletions
diff --git a/rpkid/examples/rpki.conf b/rpkid/examples/rpki.conf index fdcd4cdd..880758ee 100644 --- a/rpkid/examples/rpki.conf +++ b/rpkid/examples/rpki.conf @@ -472,3 +472,6 @@ subjectInfoAccess = 1.3.6.1.5.5.7.48.5;URI:${rootd::root_cert_sia},1.3.6. sbgp-autonomousSysNum = critical,${rootd::root_cert_asns} sbgp-ipAddrBlock = critical,${rootd::root_cert_addrs} certificatePolicies = critical,1.3.6.1.5.5.7.14.2 + +#[rpkic] +#autosync = false diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py index b74b12b5..33c14d8c 100644 --- a/rpkid/rpki/left_right.py +++ b/rpkid/rpki/left_right.py @@ -319,8 +319,12 @@ class self_elt(data_elt): """ Handle a left-right run_now action for this self. """ - rpki.log.debug("Forced immediate run of periodic actions for self %s[%d]" % (self.self_handle, self.self_id)) - self.cron(cb) + rpki.log.debug("Forced immediate run of periodic actions for self %s[%d]" % ( + self.self_handle, self.self_id)) + if self.gctx.task_add(self.cron, cb): + self.gctx.task_run() + else: + cb() def serve_fetch_one_maybe(self): """ @@ -378,6 +382,7 @@ class self_elt(data_elt): self.gctx.checkpoint() self.gctx.sql.sweep() self.gctx.sql.cache_clear_maybe() + rpki.log.debug("Self %s[%d] finished cron cycle, calling %r" % (self.self_handle, self.self_id, cb)) cb() one() @@ -662,8 +667,9 @@ class self_elt(data_elt): def got_roa_requests(roa_requests): - self.gctx.checkpoint() + rpki.log.debug("Received response to query for ROA requests") + self.gctx.checkpoint() if self.gctx.sql.dirty: rpki.log.warn("Unexpected dirty SQL cache, flushing") self.gctx.sql.sweep() @@ -687,6 +693,8 @@ class self_elt(data_elt): def loop(iterator, roa_request): self.gctx.checkpoint() + rpki.log.debug("++ roa_requests %s roas %s orphans %s publisher.size %s ca_details %s seen %s cache %s" % ( + len(roa_requests), len(roas), len(orphans), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache))) try: k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6)) if k in seen: @@ -709,6 +717,10 @@ class self_elt(data_elt): rpki.log.warn("Could not update %r, skipping: %s" % (roa, e)) if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once: self.gctx.sql.sweep() + for ca_detail in ca_details: + ca_detail.generate_crl(publisher = publisher) + ca_detail.generate_manifest(publisher = publisher) + self.gctx.sql.sweep() self.gctx.checkpoint() publisher.call_pubd(iterator, publication_failed) else: @@ -721,7 +733,6 @@ class self_elt(data_elt): cb() def done(): - orphans.extend(roas.itervalues()) for roa in orphans: try: @@ -732,13 +743,10 @@ class self_elt(data_elt): except Exception, e: rpki.log.traceback() rpki.log.warn("Could not revoke %r: %s" % (roa, e)) - self.gctx.sql.sweep() - for ca_detail in ca_details: ca_detail.generate_crl(publisher = publisher) ca_detail.generate_manifest(publisher = publisher) - self.gctx.sql.sweep() self.gctx.checkpoint() publisher.call_pubd(cb, publication_failed) @@ -752,6 +760,7 @@ class self_elt(data_elt): self.gctx.checkpoint() self.gctx.sql.sweep() + rpki.log.debug("Issuing query for ROA requests") self.gctx.irdb_query_roa_requests(self.self_handle, got_roa_requests, roa_requests_failed) @@ -1276,6 +1285,7 @@ class child_elt(data_elt): q_msg.payload.gctx = self.gctx if enforce_strict_up_down_xml_sender and q_msg.sender != str(self.child_id): raise rpki.exceptions.BadSender, "Unexpected XML sender %s" % q_msg.sender + self.gctx.sql.sweep() def done(r_msg): # diff --git a/rpkid/rpki/log.py b/rpkid/rpki/log.py index bc20e395..869dbcf0 100644 --- a/rpkid/rpki/log.py +++ b/rpkid/rpki/log.py @@ -115,13 +115,20 @@ def traceback(do_it = None): classes have their own controls for this, this lets us provide a unified interface). If no argument is specified, we use the global default value rpki.log.enable_tracebacks. + + Assertion failures generate backtraces unconditionally, on the + theory that (a) assertion failures are programming errors by + definition, and (b) it's often hard to figure out what's triggering + a particular assertion failure without the backtrace. """ if do_it is None: do_it = enable_tracebacks - if do_it: - assert sys.exc_info() != (None, None, None), "rpki.log.traceback() called without valid trace on stack, this is a programming error" + e = sys.exc_info()[1] + assert e is not None, "rpki.log.traceback() called without valid trace on stack! This should not happen." + + if do_it or isinstance(e, AssertionError): bt = tb.extract_stack(limit = 3) error("Exception caught in %s() at %s:%d called from %s:%d" % (bt[1][2], bt[1][0], bt[1][1], bt[0][0], bt[0][1])) bt = tb.format_exc() diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py index f3fc38fa..93726512 100644 --- a/rpkid/rpki/rpkid.py +++ b/rpkid/rpki/rpkid.py @@ -73,6 +73,8 @@ class main(object): self.foreground = False self.irdbd_cms_timestamp = None self.irbe_cms_timestamp = None + self.task_current = None + self.task_queue = [] opts, argv = getopt.getopt(sys.argv[1:], "c:dfhp:?", ["config=", "debug", "foreground", "help", "profile="]) @@ -323,60 +325,91 @@ class main(object): if force or self.cron_timeout is not None: self.cron_timeout = rpki.sundial.now() + self.cron_keepalive + def task_add(self, handler, cb = None, description = None): + """ + Add a task to the scheduler task queue, unless it's already queued. + """ + t = task(self, handler, cb, description) + rpki.log.debug("New task %r" % t) + rpki.log.debug("Task queue %r" % self.task_queue) + if t not in self.task_queue: + rpki.log.debug("Adding %r to task queue" % t) + self.task_queue.append(t) + return True + else: + rpki.log.debug("Task %r was already in the task queue" % t) + return False + + def task_next(self): + """ + Pull next task from the task queue and put it the deferred event + queue (we don't want to run it directly, as that could eventually + blow out our call stack). + + Not yet sure what to do here if task queue is empty. For now we + just return, on the theory that we've nothing left to do until the + next cron timer fires. This may be a bad assumption, revisit if + the rest of the code doesn't fit well with this assumption. + """ + rpki.log.debug("Looking for next task") + try: + self.task_current = self.task_queue.pop(0) + except IndexError: + self.task_current = None + else: + rpki.log.debug("Pulled %r from task queue" % self.task_current) + rpki.async.defer(self.task_current.run) + + def task_run(self): + """ + Run first task on the task queue, unless one is running already. + """ + if self.task_current is None: + self.task_next() + + def cron_done(self, cb): + """ + Completion handler for timer-driven cron. + """ + self.sql.sweep() + self.cron_timeout = None + rpki.log.info("Finished cron run") + cb() + def cron(self, cb = None): """ Periodic tasks. """ rpki.log.trace() + self.sql.ping() - def loop(iterator, s): - self.checkpoint() - s.cron(iterator) - - def done(): - self.sql.sweep() - self.cron_timeout = None - rpki.log.info("Finished cron run started at %s" % now) - if not self.use_internal_cron: - cb() - - def lose(e): - self.cron_timeout = None - if self.use_internal_cron: - rpki.log.traceback() - else: - raise - - try: - now = rpki.sundial.now() + now = rpki.sundial.now() - assert self.use_internal_cron or self.cron_timeout is None + rpki.log.debug("Starting cron run") - if self.use_internal_cron: + for s in rpki.left_right.self_elt.sql_fetch_all(self): + self.task_add(s.cron) + self.task_add(self.cron_done, cb) - if self.cron_timeout is not None and self.cron_timeout < now: - rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout) - self.cron_timeout = None + assert self.use_internal_cron or self.cron_timeout is None - when = now + self.cron_period - rpki.log.debug("Scheduling next cron run at %s" % when) - self.cron_timer.set(when) + if self.cron_timeout is not None and self.cron_timeout < now: + rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout) + self.cron_timeout = None - if self.cron_timeout is not None: - rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout) - return + if self.use_internal_cron: + when = now + self.cron_period + rpki.log.debug("Scheduling next cron run at %s" % when) + self.cron_timer.set(when) - self.sql.ping() + if self.cron_timeout is None: self.checkpoint(self.use_internal_cron) - rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self), loop, done) + self.task_run() - except (rpki.async.ExitNow, SystemExit): - self.cron_timeout = None - raise + elif self.use_internal_cron: + rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout) - except Exception, e: - lose(e) def cronjob_handler(self, query, path, cb): """ @@ -391,8 +424,47 @@ class main(object): if self.use_internal_cron: cb(500, reason = "Running cron internally") else: + rpki.log.debug("Starting externally triggered cron") self.cron(done) +class task(object): + """ + Scheduler task object. + """ + + def __init__(self, gctx, handler, cb = None, description = None): + self.gctx = gctx + self.handler = handler + self.cb = cb + self.description = description + + def __cmp__(self, other): + return cmp(self.handler, other.handler) + + def __hash__(self): + return self.handler.__hash__() + + def __repr__(self): + if self.description is None: + return repr(self.handler) + else: + return "<%r: %s>" % (self.handler, self.description) + + def done(self): + """ + Completion handler for task. + """ + if self.cb is not None: + self.cb() + self.gctx.task_next() + + def run(self): + """ + Run this task when called via the deferred event system. + """ + rpki.log.debug("Running task %r" % self) + self.handler(self.done) + class ca_obj(rpki.sql.sql_persistent): """ Internal CA object. @@ -1879,6 +1951,7 @@ class publication_queue(object): def call_pubd(self, cb, eb): def loop(iterator, rid): + rpki.log.debug("Calling pubd[%r]" % self.repositories[rid]) self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers) def done(): self.clear() diff --git a/rpkid/rpki/x509.py b/rpkid/rpki/x509.py index 92194a96..4de729ac 100644 --- a/rpkid/rpki/x509.py +++ b/rpkid/rpki/x509.py @@ -1484,6 +1484,16 @@ class XML_CMS_object(CMS_object): dump_inbound_cms = None + ## @var check_inbound_schema + # If set, perform RelaxNG schema check on inbound messages. + + check_inbound_schema = False # XXX + + ## @var check_outbound_schema + # If set, perform RelaxNG schema check on outbound messages. + + check_outbound_schema = False + def encode(self): """ Encode inner content for signing. @@ -1531,7 +1541,8 @@ class XML_CMS_object(CMS_object): self.set_content(msg) else: self.set_content(msg.toXML()) - self.schema_check() + if self.check_outbound_schema: + self.schema_check() self.sign(keypair, certs, crls) if self.dump_outbound_cms: self.dump_outbound_cms.dump(self) @@ -1544,7 +1555,8 @@ class XML_CMS_object(CMS_object): if self.dump_inbound_cms: self.dump_inbound_cms.dump(self) self.verify(ta) - self.schema_check() + if self.check_inbound_schema: + self.schema_check() if self.saxify is None: return self.get_content() else: |