diff options
Diffstat (limited to 'rpkid/rpki/rpkid.py')
-rw-r--r-- | rpkid/rpki/rpkid.py | 265 |
1 files changed, 204 insertions, 61 deletions
diff --git a/rpkid/rpki/rpkid.py b/rpkid/rpki/rpkid.py index f3fc38fa..42671f7f 100644 --- a/rpkid/rpki/rpkid.py +++ b/rpkid/rpki/rpkid.py @@ -42,7 +42,6 @@ import os import time import getopt import sys -import lxml.etree import re import random import rpki.resource_set @@ -57,6 +56,7 @@ import rpki.relaxng import rpki.log import rpki.async import rpki.daemonize +import rpki.rpkid_tasks class main(object): """ @@ -73,6 +73,8 @@ class main(object): self.foreground = False self.irdbd_cms_timestamp = None self.irbe_cms_timestamp = None + self.task_current = None + self.task_queue = [] opts, argv = getopt.getopt(sys.argv[1:], "c:dfhp:?", ["config=", "debug", "foreground", "help", "profile="]) @@ -135,11 +137,17 @@ class main(object): self.publication_kludge_base = self.cfg.get("publication-kludge-base", "publication/") + # Icky hack to let Iain do some testing quickly, should go away + # once we sort out whether we can make this change permanent. + + self.merge_publication_directories = self.cfg.getboolean("merge_publication_directories", + False) + self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True) self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10), self.cfg.getint("initial-delay-max", 120)) - + # Should be much longer in production self.cron_period = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-period", 120)) self.cron_keepalive = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-keepalive", 0)) @@ -269,7 +277,6 @@ class main(object): cb(200, body = reply) try: - self.sql.ping() q_cms = rpki.left_right.cms_msg(DER = query) q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert)) self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp) @@ -296,7 +303,6 @@ class main(object): cb(200, body = reply) try: - self.sql.ping() match = self.up_down_url_regexp.search(path) if match is None: raise rpki.exceptions.BadContactURL, "Bad URL path received in up_down_handler(): %s" % path @@ -323,6 +329,38 @@ class main(object): if force or self.cron_timeout is not None: self.cron_timeout = rpki.sundial.now() + self.cron_keepalive + def task_add(self, task): + """ + Add a task to the scheduler task queue, unless it's already queued. + """ + if task not in self.task_queue: + rpki.log.debug("Adding %r to task queue" % task) + self.task_queue.append(task) + return True + else: + rpki.log.debug("Task %r was already in the task queue" % task) + return False + + def task_next(self): + """ + Pull next task from the task queue and put it the deferred event + queue (we don't want to run it directly, as that could eventually + blow out our call stack). + """ + try: + self.task_current = self.task_queue.pop(0) + except IndexError: + self.task_current = None + else: + rpki.async.event_defer(self.task_current) + + def task_run(self): + """ + Run first task on the task queue, unless one is running already. + """ + if self.task_current is None: + self.task_next() + def cron(self, cb = None): """ Periodic tasks. @@ -330,53 +368,42 @@ class main(object): rpki.log.trace() - def loop(iterator, s): - self.checkpoint() - s.cron(iterator) + now = rpki.sundial.now() + + rpki.log.debug("Starting cron run") def done(): self.sql.sweep() self.cron_timeout = None rpki.log.info("Finished cron run started at %s" % now) - if not self.use_internal_cron: + if cb is not None: cb() - def lose(e): - self.cron_timeout = None - if self.use_internal_cron: - rpki.log.traceback() - else: - raise - - try: - now = rpki.sundial.now() - - assert self.use_internal_cron or self.cron_timeout is None - - if self.use_internal_cron: + completion = rpki.rpkid_tasks.CompletionHandler(done) + for s in rpki.left_right.self_elt.sql_fetch_all(self): + s.schedule_cron_tasks(completion) + nothing_queued = completion.count == 0 - if self.cron_timeout is not None and self.cron_timeout < now: - rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout) - self.cron_timeout = None + assert self.use_internal_cron or self.cron_timeout is None - when = now + self.cron_period - rpki.log.debug("Scheduling next cron run at %s" % when) - self.cron_timer.set(when) + if self.cron_timeout is not None and self.cron_timeout < now: + rpki.log.warn("cron keepalive threshold %s has expired, breaking lock" % self.cron_timeout) + self.cron_timeout = None - if self.cron_timeout is not None: - rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout) - return + if self.use_internal_cron: + when = now + self.cron_period + rpki.log.debug("Scheduling next cron run at %s" % when) + self.cron_timer.set(when) - self.sql.ping() + if self.cron_timeout is None: self.checkpoint(self.use_internal_cron) - rpki.async.iterator(rpki.left_right.self_elt.sql_fetch_all(self), loop, done) + self.task_run() - except (rpki.async.ExitNow, SystemExit): - self.cron_timeout = None - raise + elif self.use_internal_cron: + rpki.log.warn("cron already running, keepalive will expire at %s" % self.cron_timeout) - except Exception, e: - lose(e) + if nothing_queued: + done() def cronjob_handler(self, query, path, cb): """ @@ -391,6 +418,7 @@ class main(object): if self.use_internal_cron: cb(500, reason = "Running cron internally") else: + rpki.log.debug("Starting externally triggered cron") self.cron(done) class ca_obj(rpki.sql.sql_persistent): @@ -403,15 +431,22 @@ class ca_obj(rpki.sql.sql_persistent): "ca_id", "last_crl_sn", ("next_crl_update", rpki.sundial.datetime), - "last_issued_sn", "last_manifest_sn", + "last_issued_sn", + "last_manifest_sn", ("next_manifest_update", rpki.sundial.datetime), - "sia_uri", "parent_id", "parent_resource_class") + "sia_uri", + "parent_id", + "parent_resource_class") last_crl_sn = 0 last_issued_sn = 0 last_manifest_sn = 0 + def __repr__(self): + return rpki.log.log_repr(self, repr(self.parent), self.parent_resource_class) + @property + @rpki.sql.cache_reference def parent(self): """ Fetch parent object to which this CA object links. @@ -447,6 +482,13 @@ class ca_obj(rpki.sql.sql_persistent): return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND state = 'deprecated'", (self.ca_id,)) @property + def active_or_deprecated_ca_details(self): + """ + Fetch active and deprecated ca_details for this CA, if any. + """ + return ca_detail_obj.sql_fetch_where(self.gctx, "ca_id = %s AND (state = 'active' OR state = 'deprecated')", (self.ca_id,)) + + @property def revoked_ca_details(self): """ Fetch revoked ca_details for this CA, if any. @@ -473,7 +515,11 @@ class ca_obj(rpki.sql.sql_persistent): sia_uri = parent.sia_base if not sia_uri.endswith("/"): raise rpki.exceptions.BadURISyntax, "SIA URI must end with a slash: %s" % sia_uri - return sia_uri + str(self.ca_id) + "/" + # With luck this can go away sometime soon. + if self.gctx.merge_publication_directories: + return sia_uri + else: + return sia_uri + str(self.ca_id) + "/" def check_for_updates(self, parent, rc, cb, eb): """ @@ -588,6 +634,7 @@ class ca_obj(rpki.sql.sql_persistent): callback = cb, errback = eb) + rpki.log.debug("Sending issue request to %r from %r" % (parent, self.create)) rpki.up_down.issue_pdu.query(parent, self, ca_detail, done, eb) def delete(self, parent, callback): @@ -663,6 +710,7 @@ class ca_obj(rpki.sql.sql_persistent): callback = cb, errback = eb) + rpki.log.debug("Sending issue request to %r from %r" % (parent, self.rekey)) rpki.up_down.issue_pdu.query(parent, self, new_detail, done, eb) def revoke(self, cb, eb, revoke_all = False): @@ -716,6 +764,11 @@ class ca_detail_obj(rpki.sql.sql_persistent): crl_published = None manifest_published = None latest_ca_cert = None + latest_crl = None + latest_manifest = None + + def __repr__(self): + return rpki.log.log_repr(self, repr(self.ca), self.state, self.ca_cert_uri) def sql_decode(self, vals): """ @@ -726,6 +779,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): assert self.manifest_public_key is None or self.manifest_private_key_id is None or self.manifest_public_key.get_DER() == self.manifest_private_key_id.get_public_DER() @property + @rpki.sql.cache_reference def ca(self): """ Fetch CA object to which this ca_detail links. @@ -815,14 +869,10 @@ class ca_detail_obj(rpki.sql.sql_persistent): child_cert.reissue(ca_detail = self, publisher = publisher) for roa in predecessor.roas: roa.regenerate(publisher = publisher) - - # Need to do something to regenerate ghostbusters here? - # Yes, I suspect so, since presumably we want the ghostbuster to - # be issued by the new ca_detail at this point. But check code. - - if predecessor.ghostbusters: - rpki.log.warn("Probably should be regenerating Ghostbusters %r here" % ghostbuster) - + for ghostbuster in predecessor.ghostbusters: + ghostbuster.regenerate(publisher = publisher) + predecessor.generate_crl(publisher = publisher) + predecessor.generate_manifest(publisher = publisher) publisher.call_pubd(callback, errback) @@ -898,10 +948,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): nextUpdate = rpki.sundial.now() if self.latest_manifest is not None: - try: - self.latest_manifest.get_content() - except rpki.exceptions.CMSContentNotSet: - self.latest_manifest.extract() + self.latest_manifest.extract_if_needed() nextUpdate = nextUpdate.later(self.latest_manifest.getNextUpdate()) if self.latest_crl is not None: @@ -942,7 +989,10 @@ class ca_detail_obj(rpki.sql.sql_persistent): """ def issued(issue_response): - self.latest_ca_cert = issue_response.payload.classes[0].certs[0].cert + new_ca_cert = issue_response.payload.classes[0].certs[0].cert + if self.latest_ca_cert != new_ca_cert: + self.latest_ca_cert = new_ca_cert + self.sql_mark_dirty() new_resources = self.latest_ca_cert.get_3779resources() publisher = publication_queue() @@ -952,11 +1002,12 @@ class ca_detail_obj(rpki.sql.sql_persistent): if sia_uri_changed or child_resources.oversized(new_resources): child_cert.reissue( ca_detail = self, - resources = child_resources.intersection(new_resources), + resources = child_resources & new_resources, publisher = publisher) publisher.call_pubd(callback, errback) + rpki.log.debug("Sending issue request to %r from %r" % (parent, self.update)) rpki.up_down.issue_pdu.query(parent, ca, self, issued, errback) @classmethod @@ -994,7 +1045,6 @@ class ca_detail_obj(rpki.sql.sql_persistent): notAfter = self.latest_ca_cert.getNotAfter(), is_ca = False) - def generate_manifest_cert(self): """ Generate a new manifest certificate for this ca_detail. @@ -1005,7 +1055,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): ca = self.ca, resources = resources, subject_key = self.manifest_public_key, - sia = ((rpki.oids.name2oid["id-ad-signedObject"], ("uri", self.manifest_uri)),)) + sia = (None, None, self.manifest_uri)) def issue(self, ca, child, subject_key, sia, resources, publisher, child_cert = None): """ @@ -1015,6 +1065,8 @@ class ca_detail_obj(rpki.sql.sql_persistent): containing the newly issued cert. """ + self.check_failed_publication(publisher) + assert child_cert is None or child_cert.child_id == child.child_id cert = self.latest_ca_cert.issue( @@ -1036,6 +1088,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): rpki.log.debug("Created new child_cert %r" % child_cert) else: child_cert.cert = cert + del child_cert.ca_detail child_cert.ca_detail_id = self.ca_detail_id rpki.log.debug("Reusing existing child_cert %r" % child_cert) @@ -1058,6 +1111,8 @@ class ca_detail_obj(rpki.sql.sql_persistent): new CRL is needed. """ + self.check_failed_publication(publisher) + ca = self.ca parent = ca.parent crl_interval = rpki.sundial.timedelta(seconds = parent.self.crl_interval) @@ -1071,7 +1126,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): if now > revoked_cert.expires + crl_interval: revoked_cert.sql_delete() else: - certlist.append((revoked_cert.serial, revoked_cert.revoked.toASN1tuple(), ())) + certlist.append((revoked_cert.serial, revoked_cert.revoked)) certlist.sort() self.latest_crl = rpki.x509.CRL.generate( @@ -1100,22 +1155,30 @@ class ca_detail_obj(rpki.sql.sql_persistent): Generate a new manifest for this ca_detail. """ + self.check_failed_publication(publisher) + ca = self.ca parent = ca.parent crl_interval = rpki.sundial.timedelta(seconds = parent.self.crl_interval) now = rpki.sundial.now() + uri = self.manifest_uri if nextUpdate is None: nextUpdate = now + crl_interval if self.latest_manifest_cert is None or self.latest_manifest_cert.getNotAfter() < nextUpdate: + rpki.log.debug("Generating EE certificate for %s" % uri) self.generate_manifest_cert() + rpki.log.debug("Latest CA cert notAfter %s, new %s EE notAfter %s" % ( + self.latest_ca_cert.getNotAfter(), uri, self.latest_manifest_cert.getNotAfter())) + rpki.log.debug("Constructing manifest object list for %s" % uri) objs = [(self.crl_uri_tail, self.latest_crl)] objs.extend((c.uri_tail, c.cert) for c in self.child_certs) objs.extend((r.uri_tail, r.roa) for r in self.roas if r.roa is not None) objs.extend((g.uri_tail, g.ghostbuster) for g in self.ghostbusters) + rpki.log.debug("Building manifest object %s" % uri) self.latest_manifest = rpki.x509.SignedManifest.build( serial = ca.next_manifest_number(), thisUpdate = now, @@ -1124,10 +1187,11 @@ class ca_detail_obj(rpki.sql.sql_persistent): keypair = self.manifest_private_key_id, certs = self.latest_manifest_cert) + rpki.log.debug("Manifest generation took %s" % (rpki.sundial.now() - now)) self.manifest_published = rpki.sundial.now() self.sql_mark_dirty() - publisher.publish(cls = rpki.publication.manifest_elt, uri = self.manifest_uri, obj = self.latest_manifest, repository = parent.repository, + publisher.publish(cls = rpki.publication.manifest_elt, uri = uri, obj = self.latest_manifest, repository = parent.repository, handler = self.manifest_published_callback) def manifest_published_callback(self, pdu): @@ -1144,6 +1208,7 @@ class ca_detail_obj(rpki.sql.sql_persistent): """ publisher = publication_queue() + self.check_failed_publication(publisher) for roa in self.roas: roa.regenerate(publisher, fast = True) for ghostbuster in self.ghostbusters: @@ -1152,6 +1217,48 @@ class ca_detail_obj(rpki.sql.sql_persistent): child_cert.reissue(self, publisher, force = True) publisher.call_pubd(cb, eb) + def check_failed_publication(self, publisher): + """ + Check for failed publication of objects issued by this ca_detail. + + All publishable objects have timestamp fields recording time of + last attempted publication, and callback methods which clear these + timestamps once publication has succeeded. Our task here is to + look for objects issued by this ca_detail which have timestamps + set (indicating that they have not been published) and for which + the timestamps are not very recent (for some definition of very + recent -- intent is to allow a bit of slack in case pubd is just + being slow). In such cases, we want to retry publication. + + As an optimization, we can probably just check the manifest and + CRL; if these are up to date we probably don't need to check other + objects (which would involve several more SQL queries). Not sure + yet whether this optimization is worthwhile. + + At the moment, we only check CRL and manifest, full stop. This + should be expanded to check other objects, but that would take + longer and I have a user who needs this fix today. + """ + + stale = rpki.sundial.now() - rpki.sundial.timedelta(seconds = 60) + repository = self.ca.parent.repository + + if self.latest_crl is not None and self.crl_published is not None and self.crl_published < stale: + rpki.log.debug("Retrying publication for %s" % self.crl_uri) + publisher.publish(cls = rpki.publication.crl_elt, + uri = self.crl_uri, + obj = self.latest_crl, + repository = repository, + handler = self.crl_published_callback) + + if self.latest_manifest is not None and self.manifest_published is not None and self.manifest_published < stale: + rpki.log.debug("Retrying publication for %s" % self.manifest_uri) + publisher.publish(cls = rpki.publication.manifest_elt, + uri = self.manifest_uri, + obj = self.latest_manifest, + repository = repository, + handler = self.manifest_published_callback) + class child_cert_obj(rpki.sql.sql_persistent): """ Certificate that has been issued to a child. @@ -1166,6 +1273,9 @@ class child_cert_obj(rpki.sql.sql_persistent): "ski", ("published", rpki.sundial.datetime)) + def __repr__(self): + return rpki.log.log_repr(self, self.uri) + def __init__(self, gctx = None, child_id = None, ca_detail_id = None, cert = None): """ Initialize a child_cert_obj. @@ -1180,19 +1290,28 @@ class child_cert_obj(rpki.sql.sql_persistent): self.sql_mark_dirty() @property + @rpki.sql.cache_reference def child(self): """ Fetch child object to which this child_cert object links. """ return rpki.left_right.child_elt.sql_fetch(self.gctx, self.child_id) - + @property + @rpki.sql.cache_reference def ca_detail(self): """ Fetch ca_detail object to which this child_cert object links. """ return ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) + @ca_detail.deleter + def ca_detail(self): + try: + del self._ca_detail + except AttributeError: + pass + @property def uri_tail(self): """ @@ -1353,6 +1472,9 @@ class revoked_cert_obj(rpki.sql.sql_persistent): ("revoked", rpki.sundial.datetime), ("expires", rpki.sundial.datetime)) + def __repr__(self): + return rpki.log.log_repr(self, repr(self.ca_detail), self.serial, self.revoked) + def __init__(self, gctx = None, serial = None, revoked = None, expires = None, ca_detail_id = None): """ Initialize a revoked_cert_obj. @@ -1367,6 +1489,7 @@ class revoked_cert_obj(rpki.sql.sql_persistent): self.sql_mark_dirty() @property + @rpki.sql.cache_reference def ca_detail(self): """ Fetch ca_detail object to which this revoked_cert_obj links. @@ -1406,6 +1529,7 @@ class roa_obj(rpki.sql.sql_persistent): published = None @property + @rpki.sql.cache_reference def self(self): """ Fetch self object to which this roa_obj links. @@ -1413,12 +1537,20 @@ class roa_obj(rpki.sql.sql_persistent): return rpki.left_right.self_elt.sql_fetch(self.gctx, self.self_id) @property + @rpki.sql.cache_reference def ca_detail(self): """ Fetch ca_detail object to which this roa_obj links. """ return rpki.rpkid.ca_detail_obj.sql_fetch(self.gctx, self.ca_detail_id) + @ca_detail.deleter + def ca_detail(self): + try: + del self._ca_detail + except AttributeError: + pass + def sql_fetch_hook(self): """ Extra SQL fetch actions for roa_obj -- handle prefix lists. @@ -1569,12 +1701,13 @@ class roa_obj(rpki.sql.sql_persistent): resources = rpki.resource_set.resource_bag(v4 = v4, v6 = v6) keypair = rpki.x509.RSA.generate() + del self.ca_detail self.ca_detail_id = ca_detail.ca_detail_id self.cert = ca_detail.issue_ee( ca = ca, resources = resources, subject_key = keypair.get_RSApublic(), - sia = ((rpki.oids.name2oid["id-ad-signedObject"], ("uri", self.uri_from_key(keypair))),)) + sia = (None, None, self.uri_from_key(keypair))) self.roa = rpki.x509.ROA.build(self.asn, self.ipv4, self.ipv6, keypair, (self.cert,)) self.published = rpki.sundial.now() self.sql_store() @@ -1685,7 +1818,11 @@ class ghostbuster_obj(rpki.sql.sql_persistent): published = None vcard = None + def __repr__(self): + return rpki.log.log_repr(self, self.uri) + @property + @rpki.sql.cache_reference def self(self): """ Fetch self object to which this ghostbuster_obj links. @@ -1693,6 +1830,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent): return rpki.left_right.self_elt.sql_fetch(self.gctx, self.self_id) @property + @rpki.sql.cache_reference def ca_detail(self): """ Fetch ca_detail object to which this ghostbuster_obj links. @@ -1748,7 +1886,7 @@ class ghostbuster_obj(rpki.sql.sql_persistent): ca = ca, resources = resources, subject_key = keypair.get_RSApublic(), - sia = ((rpki.oids.name2oid["id-ad-signedObject"], ("uri", self.uri_from_key(keypair))),)) + sia = (None, None, self.uri_from_key(keypair))) self.ghostbuster = rpki.x509.Ghostbuster.build(self.vcard, keypair, (self.cert,)) self.published = rpki.sundial.now() self.sql_store() @@ -1879,6 +2017,7 @@ class publication_queue(object): def call_pubd(self, cb, eb): def loop(iterator, rid): + rpki.log.debug("Calling pubd[%r]" % self.repositories[rid]) self.repositories[rid].call_pubd(iterator, eb, self.msgs[rid], self.handlers) def done(): self.clear() @@ -1888,3 +2027,7 @@ class publication_queue(object): @property def size(self): return sum(len(self.msgs[rid]) for rid in self.repositories) + + def empty(self): + assert (not self.msgs) == (self.size == 0) + return not self.msgs |