about summary refs log tree commit diff
path: root/rpkid/rpki/rpkid.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpkid/rpki/rpkid.py')
-rw-r--r-- rpkid/rpki/rpkid.py 265
1 files changed, 204 insertions, 61 deletions
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py
index f3fc38fa..42671f7f 100644
--- a/rpkid/rpki/rpkid.py
+++ b/rpkid/rpki/rpkid.py
@@ -42,7 +42,6 @@ import os
import time
import getopt
import sys
-import lxml.etree
import re
import random
import rpki.resource_set
@@ -57,6 +56,7 @@ import rpki.relaxng
import rpki.log
import rpki.async
import rpki.daemonize
+import rpki.rpkid_tasks
class main(object):
"""
@@ -73,6 +73,8 @@ class main(object):
self.foreground = False
self.irdbd_cms_timestamp = None
self.irbe_cms_timestamp = None
+ self.task_current = None
+ self.task_queue = []
opts, argv = getopt.getopt(sys.argv[1:], "c:dfhp:?",
["config=", "debug", "foreground", "help", "profile="])
@@ -135,11 +137,17 @@ class main(object):
self.publication_kludge_base = self.cfg.get("publication-kludge-base", "publication/")
+ # Icky hack to let Iain do some testing quickly, should go away
+ # once we sort out whether we can make this change permanent.
+
+ self.merge_publication_directories = self.cfg.getboolean("merge_publication_directories",
+ False)
+
self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True)
self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10),
self.cfg.getint("initial-delay-max", 120))
-
+
# Should be much longer in production
self.cron_period = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-period", 120))
self.cron_keepalive = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-keepalive", 0))
@@ -269,7 +277,6 @@ class main(object):
cb(200, body = reply)
try:
- self.sql.ping()
q_cms = rpki.left_right.cms_msg(DER = query)
q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert))
self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp)
@@ -296,7 +303,6 @@ class main(object):
cb(200, body = reply)
try:
- self.sql.ping()
match = self.up_down_url_regexp.search(path)
if match is None:
raise rpki.exceptions.BadContactURL, "Bad URL path received in up_down_handler(): %s" % path
@@ -323,6 +329,38 @@ class main(object):
if force or self.cron_timeout is not None:
self.cron_timeout = rpki.sundial.now() + self.cron_keepalive
+ def task_add(self, task):
+ """
+ Add a task to the scheduler task queue, unless it's already queued.
+ """
+ if task not in self.task_queue:
+ rpki.log.debug("Adding %r to task queue" % task)
+ self.task_queue.append(task)
+ return True
+ else:
+ rpki.log.debug("Task %r was already in the task queue" % task)
+ return False
+
+ def task_next(self):
+ """
+ Pull next task from the task queue and put it on the deferred event
+ queue (we don't want to run it directly, as that could eventually
+ blow out our call stack).
+ """
+ try:
+ self.task_current = self.task_queue.pop(0)
+ except IndexError:
+ self.task_current = None
+ else:
+ rpki.async.event_defer(self.task_current)
+
+ def task_run(self):
+ """
+ Run first task on the task queue, unless one is running already.
+ """
+ if self.task_current is None:
+ self.task_next()
+
def cron(self, cb = None):
"""
Periodic tasks.
@@ -330,53 +368,42 @@ class main(object):
rpki.log.trace()
- def loop(iterator, s):
- self.checkpoint()
- s.cron(iterator)
+ now = rpki.sundial.now()
+
+ rpki.log.debug("Starting cron run")
def done():
self.sql.sweep()
self.cron_timeout = None
rpki.log.info("Finished cron run started at %s" % now)
- if not self.use_internal_cron:
+ if cb is not None:
cb()
- def lose(e):
- self.cron_timeout = None
- if self.use_internal_cron:
- rpki.log.traceback()
- else:
- raise
-
- try:
- now = rpki.sundial.now()
-
- assert self.use_internal_cron or self.cron_timeout is None
-
- if self.use_internal_cron:
+ completion = rpki.rpkid_tasks.CompletionHandler(done)
+ for s in rpki.left_right.self_elt.sql_fetch_all(self):
+ s.schedule_cron_tasks(completion)
+ nothing_queued = completion.count == 0
- if self.cron_timeout is not None and self.cron_timeout < now:
- rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout)
- self.cron_timeout = None
+ assert self.use_internal_cron or self.cron_timeout is None
- when = now + self.cron_period
- rpki.log.debug("Scheduling next cron run at %s" % when)
- self.cron_timer.set(when)
+ if self.cron_timeout is not None and self.cron_timeout < now:
+ rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout)
+ self.cron_timeout = None
- if self.cron_timeout is not None:
- rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout)
- return
+ if self.use_internal_cron:
+ when = now + self.cron_period
+ rpki.log.debug("Scheduling next cron run at %s" % when)
+ self.cron_timer.set(when)
- self.sql.ping()
+ if self.cron_timeout is None:
self.checkpoint(self.use_internal_cron)
- rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self), loop, done)
+ self.task_run()
- except (rpki.async.ExitNow, SystemExit):
- self.cron_timeout = None
- raise
+ elif self.use_internal_cron:
+ rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout)
- except Exception, e:
- lose(e)
+ if nothing_queued:
+ done()
def cronjob_handler(self, query, path, cb):
"""
@@ -391,6 +418,7 @@ class main(object):
if self.use_internal_cron:
cb(500, reason = "Running cron internally")
else:
+ rpki.log.debug("Starting externally triggered cron")
self.cron(done)
class ca_obj(rpki.sql.sql_persistent):
@@ -403,15 +431,22 @@ class ca_obj(rpki.sql.sql_persistent):
"ca_id",
"last_crl_sn",
("next_crl_update", rpki.sundial.datetime),
- "last_issued_sn", "last_manifest_sn",
+ "last_issued_sn",
+ "last_manifest_sn",
("next_manifest_update", rpki.sundial.datetime),
- "sia_uri", "parent_id", "parent_resource_class")
+ "sia_uri",
+ "parent_id",
+ "parent_resource_class")
last_crl_sn = 0
last_issued_sn = 0
last_manifest_sn = 0
+ def __repr__(self):
+ return rpki.log.log_repr(self, repr(self.parent), self.parent_resource_class)
+
@property
+ @rpki.sql.cache_reference
def parent(self):
"""
Fetch parent object to which this CA object links.
@@ -447,6 +482,13 @@ class ca_obj(rpki.sql.sql_persistent):
return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state = 'deprecated'", (self.ca_id,))
@property
+ def active_or_deprecated_ca_details(self):
+ """
+ Fetch active and deprecated ca_details for this CA, if any.
+ """
+ return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND (state = 'active' OR state = 'deprecated')", (self.ca_id,))
+
+ @property
def revoked_ca_details(self):
"""
Fetch revoked ca_details for this CA, if any.
@@ -473,7 +515,11 @@ class ca_obj(rpki.sql.sql_persistent):
sia_uri = parent.sia_base
if not sia_uri.endswith("/"):
raise rpki.exceptions.BadURISyntax, "SIA URI must end with a slash: %s" % sia_uri
- return sia_uri + str(self.ca_id) + "/"
+ # With luck this can go away sometime soon.
+ if self.gctx.merge_publication_directories:
+ return sia_uri
+ else:
+ return sia_uri + str(self.ca_id) + "/"
def check_for_updates(self, parent, rc, cb, eb):
"""
@@ -588,6 +634,7 @@ class ca_obj(rpki.sql.sql_persistent):
callback = cb,
errback = eb)
+ rpki.log.debug("Sending issue request to %r from %r" % (parent, self.create))
rpki.up_down.issue_pdu.query(parent, self, ca_detail, done, eb)
def delete(self, parent, callback):
@@ -663,6 +710,7 @@ class ca_obj(rpki.sql.sql_persistent):
callback = cb,
errback = eb)
+ rpki.log.debug("Sending issue request to %r from %r" % (parent, self.rekey))
rpki.up_down.issue_pdu.query(parent, self, new_detail, done, eb)
def revoke(self, cb, eb, revoke_all = False):
@@ -716,6 +764,11 @@ class ca_detail_obj(rpki.sql.sql_persistent):
crl_published = None
manifest_published = None
latest_ca_cert = None
+ latest_crl = None
+ latest_manifest = None
+
+ def __repr__(self):
+ return rpki.log.log_repr(self, repr(self.ca), self.state, self.ca_cert_uri)
def sql_decode(self, vals):
"""
@@ -726,6 +779,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
assert self.manifest_public_key is None or self.manifest_private_key_id is None or self.manifest_public_key.get_DER() == self.manifest_private_key_id.get_public_DER()
@property
+ @rpki.sql.cache_reference
def ca(self):
"""
Fetch CA object to which this ca_detail links.
@@ -815,14 +869,10 @@ class ca_detail_obj(rpki.sql.sql_persistent):
child_cert.reissue(ca_detail = self, publisher = publisher)
for roa in predecessor.roas:
roa.regenerate(publisher = publisher)
-
- # Need to do something to regenerate ghostbusters here?
- # Yes, I suspect so, since presumably we want the ghostbuster to
- # be issued by the new ca_detail at this point. But check code.
-
- if predecessor.ghostbusters:
- rpki.log.warn("Probably should be regenerating Ghostbusters %r here" % ghostbuster)
-
+ for ghostbuster in predecessor.ghostbusters:
+ ghostbuster.regenerate(publisher = publisher)
+ predecessor.generate_crl(publisher = publisher)
+ predecessor.generate_manifest(publisher = publisher)
publisher.call_pubd(callback, errback)
@@ -898,10 +948,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
nextUpdate = rpki.sundial.now()
if self.latest_manifest is not None:
- try:
- self.latest_manifest.get_content()
- except rpki.exceptions.CMSContentNotSet:
- self.latest_manifest.extract()
+ self.latest_manifest.extract_if_needed()
nextUpdate = nextUpdate.later(self.latest_manifest.getNextUpdate())
if self.latest_crl is not None:
@@ -942,7 +989,10 @@ class ca_detail_obj(rpki.sql.sql_persistent):
"""
def issued(issue_response):
- self.latest_ca_cert = issue_response.payload.classes[0].certs[0].cert
+ new_ca_cert = issue_response.payload.classes[0].certs[0].cert
+ if self.latest_ca_cert != new_ca_cert:
+ self.latest_ca_cert = new_ca_cert
+ self.sql_mark_dirty()
new_resources = self.latest_ca_cert.get_3779resources()
publisher = publication_queue()
@@ -952,11 +1002,12 @@ class ca_detail_obj(rpki.sql.sql_persistent):
if sia_uri_changed or child_resources.oversized(new_resources):
child_cert.reissue(
ca_detail = self,
- resources = child_resources.intersection(new_resources),
+ resources = child_resources & new_resources,
publisher = publisher)
publisher.call_pubd(callback, errback)
+ rpki.log.debug("Sending issue request to %r from %r" % (parent, self.update))
rpki.up_down.issue_pdu.query(parent, ca, self, issued, errback)
@classmethod
@@ -994,7 +1045,6 @@ class ca_detail_obj(rpki.sql.sql_persistent):
notAfter = self.latest_ca_cert.getNotAfter(),
is_ca = False)
-
def generate_manifest_cert(self):
"""
Generate a new manifest certificate for this ca_detail.
@@ -1005,7 +1055,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
ca = self.ca,
resources = resources,
subject_key = self.manifest_public_key,
- sia = ((rpki.oids.name2oid["id-ad-signedObject"], ("uri", self.manifest_uri)),))
+ sia = (None, None, self.manifest_uri))
def issue(self, ca, child, subject_key, sia, resources, publisher, child_cert = None):
"""
@@ -1015,6 +1065,8 @@ class ca_detail_obj(rpki.sql.sql_persistent):
containing the newly issued cert.
"""
+ self.check_failed_publication(publisher)
+
assert child_cert is None or child_cert.child_id == child.child_id
cert = self.latest_ca_cert.issue(
@@ -1036,6 +1088,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
rpki.log.debug("Created new child_cert %r" % child_cert)
else:
child_cert.cert = cert
+ del child_cert.ca_detail
child_cert.ca_detail_id = self.ca_detail_id
rpki.log.debug("Reusing existing child_cert %r" % child_cert)
@@ -1058,6 +1111,8 @@ class ca_detail_obj(rpki.sql.sql_persistent):
new CRL is needed.
"""
+ self.check_failed_publication(publisher)
+
ca = self.ca
parent = ca.parent
crl_interval = rpki.sundial.timedelta(seconds = parent.self.crl_interval)
@@ -1071,7 +1126,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
if now > revoked_cert.expires + crl_interval:
revoked_cert.sql_delete()
else:
- certlist.append((revoked_cert.serial, revoked_cert.revoked.toASN1tuple(), ()))
+ certlist.append((revoked_cert.serial, revoked_cert.revoked))
certlist.sort()
self.latest_crl = rpki.x509.CRL.generate(
@@ -1100,22 +1155,30 @@ class ca_detail_obj(rpki.sql.sql_persistent):
Generate a new manifest for this ca_detail.
"""
+ self.check_failed_publication(publisher)
+
ca = self.ca
parent = ca.parent
crl_interval = rpki.sundial.timedelta(seconds = parent.self.crl_interval)
now = rpki.sundial.now()
+ uri = self.manifest_uri
if nextUpdate is None:
nextUpdate = now + crl_interval
if self.latest_manifest_cert is None or self.latest_manifest_cert.getNotAfter() < nextUpdate:
+ rpki.log.debug("Generating EE certificate for %s" % uri)
self.generate_manifest_cert()
+ rpki.log.debug("Latest CA cert notAfter %s, new %s EE notAfter %s" % (
+ self.latest_ca_cert.getNotAfter(), uri, self.latest_manifest_cert.getNotAfter()))
+ rpki.log.debug("Constructing manifest object list for %s" % uri)
objs = [(self.crl_uri_tail, self.latest_crl)]
objs.extend((c.uri_tail, c.cert) for c in self.child_certs)
objs.extend((r.uri_tail, r.roa) for r in self.roas if r.roa is not None)
objs.extend((g.uri_tail, g.ghostbuster) for g in self.ghostbusters)
+ rpki.log.debug("Building manifest object %s" % uri)
self.latest_manifest = rpki.x509.SignedManifest.build(
serial = ca.next_manifest_number(),
thisUpdate = now,
@@ -1124,10 +1187,11 @@ class ca_detail_obj(rpki.sql.sql_persistent):
keypair = self.manifest_private_key_id,
certs = self.latest_manifest_cert)
+ rpki.log.debug("Manifest generation took %s" % (rpki.sundial.now() - now))
self.manifest_published = rpki.sundial.now()
self.sql_mark_dirty()
- publisher.publish(cls = rpki.publication.manifest_elt, uri = self.manifest_uri, obj = self.latest_manifest, repository = parent.repository,
+ publisher.publish(cls = rpki.publication.manifest_elt, uri = uri, obj = self.latest_manifest, repository = parent.repository,
handler = self.manifest_published_callback)
def manifest_published_callback(self, pdu):
@@ -1144,6 +1208,7 @@ class ca_detail_obj(rpki.sql.sql_persistent):
"""
publisher = publication_queue()
+ self.check_failed_publication(publisher)
for roa in self.roas:
roa.regenerate(publisher, fast = True)
for ghostbuster in self.ghostbusters:
@@ -1152,6 +1217,48 @@ class ca_detail_obj(rpki.sql.sql_persistent):
child_cert.reissue(self, publisher, force = True)
publisher.call_pubd(cb, eb)
+ def check_failed_publication(self, publisher):
+ """
+ Check for failed publication of objects issued by this ca_detail.
+
+ All publishable objects have timestamp fields recording time of
+ last attempted publication, and callback methods which clear these
+ timestamps once publication has succeeded. Our task here is to
+ look for objects issued by this ca_detail which have timestamps
+ set (indicating that they have not been published) and for which
+ the timestamps are not very recent (for some definition of very
+ recent -- intent is to allow a bit of slack in case pubd is just
+ being slow). In such cases, we want to retry publication.
+
+ As an optimization, we can probably just check the manifest and
+ CRL; if these are up to date we probably don't need to check other
+ objects (which would involve several more SQL queries). Not sure
+ yet whether this optimization is worthwhile.
+
+ At the moment, we only check CRL and manifest, full stop. This
+ should be expanded to check other objects, but that would take
+ longer and I have a user who needs this fix today.
+ """
+
+ stale = rpki.sundial.now() - rpki.sundial.timedelta(seconds = 60)
+ repository = self.ca.parent.repository
+
+ if self.latest_crl is not None and self.crl_published is not None and self.crl_published < stale:
+ rpki.log.debug("Retrying publication for %s" % self.crl_uri)
+ publisher.publish(cls = rpki.publication.crl_elt,
+ uri = self.crl_uri,
+ obj = self.latest_crl,
+ repository = repository,
+ handler = self.crl_published_callback)
+
+ if self.latest_manifest is not None and self.manifest_published is not None and self.manifest_published < stale:
+ rpki.log.debug("Retrying publication for %s" % self.manifest_uri)
+ publisher.publish(cls = rpki.publication.manifest_elt,
+ uri = self.manifest_uri,
+ obj = self.latest_manifest,
+ repository = repository,
+ handler = self.manifest_published_callback)
+
class child_cert_obj(rpki.sql.sql_persistent):
"""
Certificate that has been issued to a child.
@@ -1166,6 +1273,9 @@ class child_cert_obj(rpki.sql.sql_persistent):
"ski",
("published", rpki.sundial.datetime))
+ def __repr__(self):
+ return rpki.log.log_repr(self, self.uri)
+
def __init__(self, gctx = None, child_id = None, ca_detail_id = None, cert = None):
"""
Initialize a child_cert_obj.
@@ -1180,19 +1290,28 @@ class child_cert_obj(rpki.sql.sql_persistent):
self.sql_mark_dirty()
@property
+ @rpki.sql.cache_reference
def child(self):
"""
Fetch child object to which this child_cert object links.
"""
return rpki.left_right.child_elt.sql_fetch(self.gctx, self.child_id)
-
+
@property
+ @rpki.sql.cache_reference
def ca_detail(self):
"""
Fetch ca_detail object to which this child_cert object links.
"""
return ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id)
+ @ca_detail.deleter
+ def ca_detail(self):
+ try:
+ del self._ca_detail
+ except AttributeError:
+ pass
+
@property
def uri_tail(self):
"""
@@ -1353,6 +1472,9 @@ class revoked_cert_obj(rpki.sql.sql_persistent):
("revoked", rpki.sundial.datetime),
("expires", rpki.sundial.datetime))
+ def __repr__(self):
+ return rpki.log.log_repr(self, repr(self.ca_detail), self.serial, self.revoked)
+
def __init__(self, gctx = None, serial = None, revoked = None, expires = None, ca_detail_id = None):
"""
Initialize a revoked_cert_obj.
@@ -1367,6 +1489,7 @@ class revoked_cert_obj(rpki.sql.sql_persistent):
self.sql_mark_dirty()
@property
+ @rpki.sql.cache_reference
def ca_detail(self):
"""
Fetch ca_detail object to which this revoked_cert_obj links.
@@ -1406,6 +1529,7 @@ class roa_obj(rpki.sql.sql_persistent):
published = None
@property
+ @rpki.sql.cache_reference
def self(self):
"""
Fetch self object to which this roa_obj links.
@@ -1413,12 +1537,20 @@ class roa_obj(rpki.sql.sql_persistent):
return rpki.left_right.self_elt.sql_fetch(self.gctx, self.self_id)
@property
+ @rpki.sql.cache_reference
def ca_detail(self):
"""
Fetch ca_detail object to which this roa_obj links.
"""
return rpki.rpkid.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id)
+ @ca_detail.deleter
+ def ca_detail(self):
+ try:
+ del self._ca_detail
+ except AttributeError:
+ pass
+
def sql_fetch_hook(self):
"""
Extra SQL fetch actions for roa_obj -- handle prefix lists.
@@ -1569,12 +1701,13 @@ class roa_obj(rpki.sql.sql_persistent):
resources = rpki.resource_set.resource_bag(v4 = v4, v6 = v6)
keypair = rpki.x509.RSA.generate()
+ del self.ca_detail
self.ca_detail_id = ca_detail.ca_detail_id
self.cert = ca_detail.issue_ee(
ca = ca,
resources = resources,
subject_key = keypair.get_RSApublic(),
- sia = ((rpki.oids.name2oid["id-ad-signedObject"], ("uri", self.uri_from_key(keypair))),))
+ sia = (None, None, self.uri_from_key(keypair)))
self.roa = rpki.x509.ROA.build(self.asn, self.ipv4, self.ipv6, keypair, (self.cert,))
self.published = rpki.sundial.now()
self.sql_store()
@@ -1685,7 +1818,11 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
published = None
vcard = None
+ def __repr__(self):
+ return rpki.log.log_repr(self, self.uri)
+
@property
+ @rpki.sql.cache_reference
def self(self):
"""
Fetch self object to which this ghostbuster_obj links.
@@ -1693,6 +1830,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
return rpki.left_right.self_elt.sql_fetch(self.gctx, self.self_id)
@property
+ @rpki.sql.cache_reference
def ca_detail(self):
"""
Fetch ca_detail object to which this ghostbuster_obj links.
@@ -1748,7 +1886,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent):
ca = ca,
resources = resources,
subject_key = keypair.get_RSApublic(),
- sia = ((rpki.oids.name2oid["id-ad-signedObject"], ("uri", self.uri_from_key(keypair))),))
+ sia = (None, None, self.uri_from_key(keypair)))
self.ghostbuster = rpki.x509.Ghostbuster.build(self.vcard, keypair, (self.cert,))
self.published = rpki.sundial.now()
self.sql_store()
@@ -1879,6 +2017,7 @@ class publication_queue(object):
def call_pubd(self, cb, eb):
def loop(iterator, rid):
+ rpki.log.debug("Calling pubd[%r]" % self.repositories[rid])
self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers)
def done():
self.clear()
@@ -1888,3 +2027,7 @@ class publication_queue(object):
@property
def size(self):
return sum(len(self.msgs[rid]) for rid in self.repositories)
+
+ def empty(self):
+ assert (not self.msgs) == (self.size == 0)
+ return not self.msgs