aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2012-08-08 22:15:54 +0000
committerRob Austein <sra@hactrn.net>2012-08-08 22:15:54 +0000
commitf8d0d3bbbfc06ab298e3eb379e6759e1c3c3c863 (patch)
tree74427f30f83bee07706dcbf26dcd092e1588b985
parent0ffc84f40bf25c778e20d49be33eebab3c7612e5 (diff)
Checkpoint of work to date, see #274 and #275.
svn path=/branches/tk274/; revision=4623
-rw-r--r--rpkid/examples/rpki.conf3
-rw-r--r--rpkid/rpki/left_right.py24
-rw-r--r--rpkid/rpki/log.py11
-rw-r--r--rpkid/rpki/rpkid.py149
-rw-r--r--rpkid/rpki/x509.py16
5 files changed, 154 insertions, 49 deletions
diff --git a/rpkid/examples/rpki.conf b/rpkid/examples/rpki.conf
index fdcd4cdd..880758ee 100644
--- a/rpkid/examples/rpki.conf
+++ b/rpkid/examples/rpki.conf
@@ -472,3 +472,6 @@ subjectInfoAccess = 1.3.6.1.5.5.7.48.5;URI:${rootd::root_cert_sia},1.3.6.
sbgp-autonomousSysNum = critical,${rootd::root_cert_asns}
sbgp-ipAddrBlock = critical,${rootd::root_cert_addrs}
certificatePolicies = critical,1.3.6.1.5.5.7.14.2
+
+#[rpkic]
+#autosync = false
diff --git a/rpkid/rpki/left_right.py b/rpkid/rpki/left_right.py
index b74b12b5..33c14d8c 100644
--- a/rpkid/rpki/left_right.py
+++ b/rpkid/rpki/left_right.py
@@ -319,8 +319,12 @@ class self_elt(data_elt):
"""
Handle a left-right run_now action for this self.
"""
- rpki.log.debug("Forced immediate run of periodic actions for self %s[%d]" % (self.self_handle, self.self_id))
- self.cron(cb)
+ rpki.log.debug("Forced immediate run of periodic actions for self %s[%d]" % (
+ self.self_handle, self.self_id))
+ if self.gctx.task_add(self.cron, cb):
+ self.gctx.task_run()
+ else:
+ cb()
def serve_fetch_one_maybe(self):
"""
@@ -378,6 +382,7 @@ class self_elt(data_elt):
self.gctx.checkpoint()
self.gctx.sql.sweep()
self.gctx.sql.cache_clear_maybe()
+ rpki.log.debug("Self %s[%d] finished cron cycle, calling %r" % (self.self_handle, self.self_id, cb))
cb()
one()
@@ -662,8 +667,9 @@ class self_elt(data_elt):
def got_roa_requests(roa_requests):
- self.gctx.checkpoint()
+ rpki.log.debug("Received response to query for ROA requests")
+ self.gctx.checkpoint()
if self.gctx.sql.dirty:
rpki.log.warn("Unexpected dirty SQL cache, flushing")
self.gctx.sql.sweep()
@@ -687,6 +693,8 @@ class self_elt(data_elt):
def loop(iterator, roa_request):
self.gctx.checkpoint()
+ rpki.log.debug("++ roa_requests %s roas %s orphans %s publisher.size %s ca_details %s seen %s cache %s" % (
+ len(roa_requests), len(roas), len(orphans), publisher.size, len(ca_details), len(seen), len(self.gctx.sql.cache)))
try:
k = (roa_request.asn, str(roa_request.ipv4), str(roa_request.ipv6))
if k in seen:
@@ -709,6 +717,10 @@ class self_elt(data_elt):
rpki.log.warn("Could not update %r, skipping: %s" % (roa, e))
if max_new_roas_at_once is not None and publisher.size > max_new_roas_at_once:
self.gctx.sql.sweep()
+ for ca_detail in ca_details:
+ ca_detail.generate_crl(publisher = publisher)
+ ca_detail.generate_manifest(publisher = publisher)
+ self.gctx.sql.sweep()
self.gctx.checkpoint()
publisher.call_pubd(iterator, publication_failed)
else:
@@ -721,7 +733,6 @@ class self_elt(data_elt):
cb()
def done():
-
orphans.extend(roas.itervalues())
for roa in orphans:
try:
@@ -732,13 +743,10 @@ class self_elt(data_elt):
except Exception, e:
rpki.log.traceback()
rpki.log.warn("Could not revoke %r: %s" % (roa, e))
-
self.gctx.sql.sweep()
-
for ca_detail in ca_details:
ca_detail.generate_crl(publisher = publisher)
ca_detail.generate_manifest(publisher = publisher)
-
self.gctx.sql.sweep()
self.gctx.checkpoint()
publisher.call_pubd(cb, publication_failed)
@@ -752,6 +760,7 @@ class self_elt(data_elt):
self.gctx.checkpoint()
self.gctx.sql.sweep()
+ rpki.log.debug("Issuing query for ROA requests")
self.gctx.irdb_query_roa_requests(self.self_handle, got_roa_requests, roa_requests_failed)
@@ -1276,6 +1285,7 @@ class child_elt(data_elt):
q_msg.payload.gctx = self.gctx
if enforce_strict_up_down_xml_sender and q_msg.sender != str(self.child_id):
raise rpki.exceptions.BadSender, "Unexpected XML sender %s" % q_msg.sender
+ self.gctx.sql.sweep()
def done(r_msg):
#
diff --git a/rpkid/rpki/log.py b/rpkid/rpki/log.py
index bc20e395..869dbcf0 100644
--- a/rpkid/rpki/log.py
+++ b/rpkid/rpki/log.py
@@ -115,13 +115,20 @@ def traceback(do_it = None):
classes have their own controls for this, this lets us provide a
unified interface). If no argument is specified, we use the global
default value rpki.log.enable_tracebacks.
+
+ Assertion failures generate backtraces unconditionally, on the
+ theory that (a) assertion failures are programming errors by
+ definition, and (b) it's often hard to figure out what's triggering
+ a particular assertion failure without the backtrace.
"""
if do_it is None:
do_it = enable_tracebacks
- if do_it:
- assert sys.exc_info() != (None, None, None), "rpki.log.traceback() called without valid trace on stack, this is a programming error"
+ e = sys.exc_info()[1]
+ assert e is not None, "rpki.log.traceback() called without valid trace on stack! This should not happen."
+
+ if do_it or isinstance(e, AssertionError):
bt = tb.extract_stack(limit = 3)
error("Exception caught in %s() at %s:%d called from %s:%d" % (bt[1][2], bt[1][0], bt[1][1], bt[0][0], bt[0][1]))
bt = tb.format_exc()
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py
index f3fc38fa..93726512 100644
--- a/rpkid/rpki/rpkid.py
+++ b/rpkid/rpki/rpkid.py
@@ -73,6 +73,8 @@ class main(object):
self.foreground = False
self.irdbd_cms_timestamp = None
self.irbe_cms_timestamp = None
+ self.task_current = None
+ self.task_queue = []
opts, argv = getopt.getopt(sys.argv[1:], "c:dfhp:?",
["config=", "debug", "foreground", "help", "profile="])
@@ -323,60 +325,91 @@ class main(object):
if force or self.cron_timeout is not None:
self.cron_timeout = rpki.sundial.now() + self.cron_keepalive
+ def task_add(self, handler, cb = None, description = None):
+ """
+ Add a task to the scheduler task queue, unless it's already queued.
+ """
+ t = task(self, handler, cb, description)
+ rpki.log.debug("New task %r" % t)
+ rpki.log.debug("Task queue %r" % self.task_queue)
+ if t not in self.task_queue:
+ rpki.log.debug("Adding %r to task queue" % t)
+ self.task_queue.append(t)
+ return True
+ else:
+ rpki.log.debug("Task %r was already in the task queue" % t)
+ return False
+
+ def task_next(self):
+ """
+ Pull next task from the task queue and put it the deferred event
+ queue (we don't want to run it directly, as that could eventually
+ blow out our call stack).
+
+ Not yet sure what to do here if task queue is empty. For now we
+ just return, on the theory that we've nothing left to do until the
+ next cron timer fires. This may be a bad assumption, revisit if
+ the rest of the code doesn't fit well with this assumption.
+ """
+ rpki.log.debug("Looking for next task")
+ try:
+ self.task_current = self.task_queue.pop(0)
+ except IndexError:
+ self.task_current = None
+ else:
+ rpki.log.debug("Pulled %r from task queue" % self.task_current)
+ rpki.async.defer(self.task_current.run)
+
+ def task_run(self):
+ """
+ Run first task on the task queue, unless one is running already.
+ """
+ if self.task_current is None:
+ self.task_next()
+
+ def cron_done(self, cb):
+ """
+ Completion handler for timer-driven cron.
+ """
+ self.sql.sweep()
+ self.cron_timeout = None
+ rpki.log.info("Finished cron run")
+ cb()
+
def cron(self, cb = None):
"""
Periodic tasks.
"""
rpki.log.trace()
+ self.sql.ping()
- def loop(iterator, s):
- self.checkpoint()
- s.cron(iterator)
-
- def done():
- self.sql.sweep()
- self.cron_timeout = None
- rpki.log.info("Finished cron run started at %s" % now)
- if not self.use_internal_cron:
- cb()
-
- def lose(e):
- self.cron_timeout = None
- if self.use_internal_cron:
- rpki.log.traceback()
- else:
- raise
-
- try:
- now = rpki.sundial.now()
+ now = rpki.sundial.now()
- assert self.use_internal_cron or self.cron_timeout is None
+ rpki.log.debug("Starting cron run")
- if self.use_internal_cron:
+ for s in rpki.left_right.self_elt.sql_fetch_all(self):
+ self.task_add(s.cron)
+ self.task_add(self.cron_done, cb)
- if self.cron_timeout is not None and self.cron_timeout < now:
- rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout)
- self.cron_timeout = None
+ assert self.use_internal_cron or self.cron_timeout is None
- when = now + self.cron_period
- rpki.log.debug("Scheduling next cron run at %s" % when)
- self.cron_timer.set(when)
+ if self.cron_timeout is not None and self.cron_timeout < now:
+ rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout)
+ self.cron_timeout = None
- if self.cron_timeout is not None:
- rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout)
- return
+ if self.use_internal_cron:
+ when = now + self.cron_period
+ rpki.log.debug("Scheduling next cron run at %s" % when)
+ self.cron_timer.set(when)
- self.sql.ping()
+ if self.cron_timeout is None:
self.checkpoint(self.use_internal_cron)
- rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self), loop, done)
+ self.task_run()
- except (rpki.async.ExitNow, SystemExit):
- self.cron_timeout = None
- raise
+ elif self.use_internal_cron:
+ rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout)
- except Exception, e:
- lose(e)
def cronjob_handler(self, query, path, cb):
"""
@@ -391,8 +424,47 @@ class main(object):
if self.use_internal_cron:
cb(500, reason = "Running cron internally")
else:
+ rpki.log.debug("Starting externally triggered cron")
self.cron(done)
+class task(object):
+ """
+ Scheduler task object.
+ """
+
+ def __init__(self, gctx, handler, cb = None, description = None):
+ self.gctx = gctx
+ self.handler = handler
+ self.cb = cb
+ self.description = description
+
+ def __cmp__(self, other):
+ return cmp(self.handler, other.handler)
+
+ def __hash__(self):
+ return self.handler.__hash__()
+
+ def __repr__(self):
+ if self.description is None:
+ return repr(self.handler)
+ else:
+ return "<%r: %s>" % (self.handler, self.description)
+
+ def done(self):
+ """
+ Completion handler for task.
+ """
+ if self.cb is not None:
+ self.cb()
+ self.gctx.task_next()
+
+ def run(self):
+ """
+ Run this task when called via the deferred event system.
+ """
+ rpki.log.debug("Running task %r" % self)
+ self.handler(self.done)
+
class ca_obj(rpki.sql.sql_persistent):
"""
Internal CA object.
@@ -1879,6 +1951,7 @@ class publication_queue(object):
def call_pubd(self, cb, eb):
def loop(iterator, rid):
+ rpki.log.debug("Calling pubd[%r]" % self.repositories[rid])
self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers)
def done():
self.clear()
diff --git a/rpkid/rpki/x509.py b/rpkid/rpki/x509.py
index 92194a96..4de729ac 100644
--- a/rpkid/rpki/x509.py
+++ b/rpkid/rpki/x509.py
@@ -1484,6 +1484,16 @@ class XML_CMS_object(CMS_object):
dump_inbound_cms = None
+ ## @var check_inbound_schema
+ # If set, perform RelaxNG schema check on inbound messages.
+
+ check_inbound_schema = False # XXX
+
+ ## @var check_outbound_schema
+ # If set, perform RelaxNG schema check on outbound messages.
+
+ check_outbound_schema = False
+
def encode(self):
"""
Encode inner content for signing.
@@ -1531,7 +1541,8 @@ class XML_CMS_object(CMS_object):
self.set_content(msg)
else:
self.set_content(msg.toXML())
- self.schema_check()
+ if self.check_outbound_schema:
+ self.schema_check()
self.sign(keypair, certs, crls)
if self.dump_outbound_cms:
self.dump_outbound_cms.dump(self)
@@ -1544,7 +1555,8 @@ class XML_CMS_object(CMS_object):
if self.dump_inbound_cms:
self.dump_inbound_cms.dump(self)
self.verify(ta)
- self.schema_check()
+ if self.check_inbound_schema:
+ self.schema_check()
if self.saxify is None:
return self.get_content()
else: