diff options
Diffstat (limited to 'rpki/rpkid.py')
-rw-r--r-- | rpki/rpkid.py | 442 |
1 files changed, 216 insertions, 226 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py index c6b1001e..896fe0be 100644 --- a/rpki/rpkid.py +++ b/rpki/rpkid.py @@ -28,20 +28,27 @@ import random import logging import argparse +import tornado.gen +import tornado.web +import tornado.ioloop +import tornado.httputil +import tornado.httpclient +import tornado.httpserver + +from lxml.etree import Element, SubElement, tostring as ElementToString + import rpki.resource_set import rpki.up_down import rpki.left_right import rpki.x509 -import rpki.http import rpki.config import rpki.exceptions import rpki.relaxng import rpki.log -import rpki.async import rpki.daemonize + import rpki.rpkid_tasks -from lxml.etree import Element, SubElement, tostring as ElementToString logger = logging.getLogger(__name__) @@ -105,12 +112,10 @@ class main(object): logger.info("Running in profile mode with output to %s", self.profile) logger.debug("Initializing Django") - import django django.setup() logger.debug("Initializing rpkidb...") - global rpki # pylint: disable=W0602 import rpki.rpkidb # pylint: disable=W0621 @@ -127,41 +132,46 @@ class main(object): self.http_server_host = self.cfg.get("server-host", "") self.http_server_port = self.cfg.getint("server-port") - self.publication_kludge_base = self.cfg.get("publication-kludge-base", "publication/") - self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True) self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10), self.cfg.getint("initial-delay-max", 120)) # Should be much longer in production - self.cron_period = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-period", 120)) - self.cron_keepalive = rpki.sundial.timedelta(seconds = self.cfg.getint("cron-keepalive", 0)) - if not self.cron_keepalive: - self.cron_keepalive = self.cron_period * 4 - self.cron_timeout = None + self.cron_period = self.cfg.getint("cron-period", 120) - self.start_cron() + if self.use_internal_cron: + logger.debug("Scheduling initial cron pass in %s seconds", self.initial_delay) + tornado.ioloop.IOLoop.current().spawn_callback(self.cron_loop) - rpki.http.server( - host = self.http_server_host, - port = self.http_server_port, - handlers = (("/left-right", self.left_right_handler), - ("/up-down/", self.up_down_handler, rpki.up_down.allowed_content_types), - ("/cronjob", self.cronjob_handler))) + rpkid = self - def start_cron(self): - """ - Start clock for rpkid's internal cron process. - """ + class LeftRightHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self): + yield rpkid.left_right_handler(self) + + class UpDownHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self, tenant_handle, child_handle): + yield rpkid.up_down_handler(self, tenant_handle, child_handle) + + class CronjobHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self): + yield rpkid.cronjob_handler(self) + + application = tornado.web.Application(( + (r"/left-right", LeftRightHandler), + (r"/up-down/([-a-zA-Z0-9_]+)/([-a-zA-Z0-9_]+)", UpDownHandler), + (r"/cronjob", CronjobHandler))) + + application.listen( + address = self.http_server_host, + port = self.http_server_port) + + tornado.ioloop.IOLoop.current().start() - if self.use_internal_cron: - self.cron_timer = rpki.async.timer(handler = self.cron) - when = rpki.sundial.now() + rpki.sundial.timedelta(seconds = self.initial_delay) - logger.debug("Scheduling initial cron pass at %s", when) - self.cron_timer.set(when) - else: - logger.debug("Not using internal clock, start_cron() call ignored") @staticmethod def _compose_left_right_query(): @@ -172,70 +182,86 @@ class main(object): return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, type = "query", version = rpki.left_right.version) - def irdb_query(self, q_msg, callback, errback): + + @tornado.gen.coroutine + def irdb_query(self, q_msg): """ Perform an IRDB callback query. """ - try: - q_tags = set(q_pdu.tag for q_pdu in q_msg) + q_tags = set(q_pdu.tag for q_pdu in q_msg) - q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert) + q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert) - def unwrap(r_der): - try: - r_cms = rpki.left_right.cms_msg(DER = r_der) - r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert)) - self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url) - #rpki.left_right.check_response(r_msg) - if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg): - raise rpki.exceptions.BadIRDBReply( - "Unexpected response to IRDB query: %s" % r_cms.pretty_print_content()) - callback(r_msg) - except Exception, e: - errback(e) + http_client = tornado.httpclient.AsyncHTTPClient() - rpki.http.client( - url = self.irdb_url, - msg = q_der, - callback = unwrap, - errback = errback) + http_request = tornado.httpclient.HTTPRequest( + url = self.irdb_url, + method = "POST", + body = q_der, + headers = { "Content-Type" : rpki.left_right.content_type }) - except Exception, e: - errback(e) + http_response = yield http_client.fetch(http_request) + + # Tornado already checked http_response.code for us + + content_type = http_response.headers.get("Content-Type") + + if content_type not in rpki.left_right.allowed_content_types: + raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.left_right.content_type, content_type)) + + r_der = http_response.body + + r_cms = rpki.left_right.cms_msg(DER = r_der) + r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert)) + self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url) - def irdb_query_child_resources(self, tenant_handle, child_handle, callback, errback): + #rpki.left_right.check_response(r_msg) + + if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg): + raise rpki.exceptions.BadIRDBReply("Unexpected response to IRDB query: %s" % r_cms.pretty_print_content()) + + raise tornado.gen.Return(r_msg) + + + @tornado.gen.coroutine + def irdb_query_child_resources(self, tenant_handle, child_handle): """ Ask IRDB about a child's resources. """ q_msg = self._compose_left_right_query() - SubElement(q_msg, rpki.left_right.tag_list_resources, - tenant_handle = tenant_handle, child_handle = child_handle) + SubElement(q_msg, rpki.left_right.tag_list_resources, tenant_handle = tenant_handle, child_handle = child_handle) + + r_msg = yield self.irdb_query(q_msg) - def done(r_msg): - if len(r_msg) != 1: - raise rpki.exceptions.BadIRDBReply( - "Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content()) - callback(rpki.resource_set.resource_bag( - asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")), - v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")), - v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")), - valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until")))) + if len(r_msg) != 1: + raise rpki.exceptions.BadIRDBReply("Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content()) - self.irdb_query(q_msg, done, errback) + bag = rpki.resource_set.resource_bag( + asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")), + v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")), + v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")), + valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until"))) - def irdb_query_roa_requests(self, tenant_handle, callback, errback): + raise tornado.gen.Return(bag) + + + @tornado.gen.coroutine + def irdb_query_roa_requests(self, tenant_handle): """ Ask IRDB about self's ROA requests. """ q_msg = self._compose_left_right_query() SubElement(q_msg, rpki.left_right.tag_list_roa_requests, tenant_handle = tenant_handle) - self.irdb_query(q_msg, callback, errback) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) + - def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles, callback, errback): + @tornado.gen.coroutine + def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles): """ Ask IRDB about self's ghostbuster record requests. """ @@ -244,16 +270,20 @@ class main(object): for parent_handle in parent_handles: SubElement(q_msg, rpki.left_right.tag_list_ghostbuster_requests, tenant_handle = tenant_handle, parent_handle = parent_handle) - self.irdb_query(q_msg, callback, errback) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) - def irdb_query_ee_certificate_requests(self, tenant_handle, callback, errback): + @tornado.gen.coroutine + def irdb_query_ee_certificate_requests(self, tenant_handle): """ Ask IRDB about self's EE certificate requests. """ q_msg = self._compose_left_right_query() SubElement(q_msg, rpki.left_right.tag_list_ee_certificate_requests, tenant_handle = tenant_handle) - self.irdb_query(q_msg, callback, errback) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) + @property def left_right_models(self): @@ -273,6 +303,7 @@ class main(object): rpki.left_right.tag_repository : rpki.rpkidb.models.Repository } return self._left_right_models + @property def left_right_trivial_handlers(self): """ @@ -287,6 +318,7 @@ class main(object): rpki.left_right.tag_list_received_resources : self.handle_list_received_resources } return self._left_right_trivial_handlers + def handle_list_published_objects(self, q_pdu, r_msg): """ <list_published_objects/> server. @@ -317,6 +349,7 @@ class main(object): SubElement(r_msg, rpki.left_right.tag_list_published_objects, uri = c.uri, **kw).text = c.cert.get_Base64() + def handle_list_received_resources(self, q_pdu, r_msg): """ <list_received_resources/> server. @@ -344,28 +377,28 @@ class main(object): r_pdu.set("tag", msg_tag) - def left_right_handler(self, query, path, cb): + @tornado.gen.coroutine + def left_right_handler(self, handler): """ - Process one left-right PDU. + Process one left-right message. """ - # This handles five persistent classes (self, bsc, parent, child, - # repository) and two simple queries (list_published_objects and - # list_received_resources). The former probably need to dispatch - # via methods to the corresponding model classes; the latter - # probably just become calls to ordinary methods of this - # (rpki.rpkid.main) class. - # - # Need to clone logic from rpki.pubd.main.control_handler(). - logger.debug("Entering left_right_handler()") + content_type = handler.request.headers["Content-Type"] + if content_type not in rpki.left_right.allowed_content_types: + handler.set_status(415, "No handler for Content-Type %s" % content_type) + handler.finish() + raise tornado.gen.Return + + handler.set_header("Content-Type", rpki.left_right.content_type) + try: - q_cms = rpki.left_right.cms_msg(DER = query) + q_cms = rpki.left_right.cms_msg(DER = handler.request.body) q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert)) r_msg = Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, type = "reply", version = rpki.left_right.version) - self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, path) + self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, handler.request.path) assert q_msg.tag.startswith(rpki.left_right.xmlns) assert all(q_pdu.tag.startswith(rpki.left_right.xmlns) for q_pdu in q_msg) @@ -376,14 +409,37 @@ class main(object): if q_msg.get("type") != "query": raise rpki.exceptions.BadQuery("Message type is not query") - def done(): - cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert)) + for q_pdu in q_msg: - def loop(iterator, q_pdu): + try: + action = q_pdu.get("action") + model = self.left_right_models.get(q_pdu.tag) + + if q_pdu.tag in self.left_right_trivial_handlers: + self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg) + + elif action in ("get", "list"): + for obj in model.objects.xml_list(q_pdu): + obj.xml_template.encode(obj, q_pdu, r_msg) + + elif action == "destroy": + obj = model.objects.xml_get_for_delete(q_pdu) + yield obj.xml_pre_delete_hook(self) + obj.delete() + obj.xml_template.acknowledge(obj, q_pdu, r_msg) - logger.debug("left_right_handler():loop(%r)", q_pdu) + elif action in ("create", "set"): + obj = model.objects.xml_get_or_create(q_pdu) + obj.xml_template.decode(obj, q_pdu) + obj.xml_pre_save_hook(q_pdu) + obj.save() + yield obj.xml_post_save_hook(self, q_pdu) + obj.xml_template.acknowledge(obj, q_pdu, r_msg) - def fail(e): + else: + raise rpki.exceptions.BadQuery("Unrecognized action %r" % action) + + except Exception, e: if not isinstance(e, rpki.exceptions.NotFound): logger.exception("Unhandled exception serving left-right PDU %r", q_pdu) error_tenant_handle = q_pdu.get("tenant_handle") @@ -394,102 +450,50 @@ class main(object): r_pdu.set("tag", error_tag) if error_tenant_handle is not None: r_pdu.set("tenant_handle", error_tenant_handle) - cb(200, body = rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert)) + break - try: - if q_pdu.tag in self.left_right_trivial_handlers: - logger.debug("left_right_handler(): trivial handler") - self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg) - iterator() - - else: - action = q_pdu.get("action") - model = self.left_right_models[q_pdu.tag] - - logger.debug("left_right_handler(): action %s model %r", action, model) - - if action in ("get", "list"): - logger.debug("left_right_handler(): get/list") - for obj in model.objects.xml_list(q_pdu): - logger.debug("left_right_handler(): get/list: encoding %r", obj) - obj.xml_template.encode(obj, q_pdu, r_msg) - iterator() - - elif action == "destroy": - def destroy_cb(): - obj.delete() - obj.xml_template.acknowledge(obj, q_pdu, r_msg) - iterator() - logger.debug("left_right_handler(): destroy") - obj = model.objects.xml_get_for_delete(q_pdu) - obj.xml_pre_delete_hook(self, destroy_cb, fail) - - elif action in ("create", "set"): - def create_set_cb(): - obj.xml_template.acknowledge(obj, q_pdu, r_msg) - iterator() - logger.debug("left_right_handler(): create/set") - obj = model.objects.xml_get_or_create(q_pdu) - obj.xml_template.decode(obj, q_pdu) - obj.xml_pre_save_hook(q_pdu) - obj.save() - obj.xml_post_save_hook(self, q_pdu, create_set_cb, fail) - - else: - raise rpki.exceptions.BadQuery - - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - fail(e) - - rpki.async.iterator(q_msg, loop, done) - - except (rpki.async.ExitNow, SystemExit): - raise + handler.set_status(200) + handler.finish(rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert)) + logger.debug("Normal exit from left_right_handler()") except Exception, e: logger.exception("Unhandled exception serving left-right request") - cb(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e)) + handler.set_status(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e)) + handler.finish() - up_down_url_regexp = re.compile("/up-down/([-A-Z0-9_]+)/([-A-Z0-9_]+)$", re.I) - def up_down_handler(self, q_der, path, cb): + @tornado.gen.coroutine + def up_down_handler(self, handler, tenant_handle, child_handle): """ Process one up-down PDU. """ - def done(r_der): - cb(200, body = r_der) + logger.debug("Entering up_down_handler()") + + content_type = handler.request.headers["Content-Type"] + if content_type not in rpki.up_down.allowed_content_types: + handler.set_status(415, "No handler for Content-Type %s" % content_type) + handler.finish() + raise tornado.gen.Return try: - match = self.up_down_url_regexp.search(path) - if match is None: - raise rpki.exceptions.BadContactURL("Bad URL path received in up_down_handler(): %s" % path) - tenant_handle, child_handle = match.groups() - try: - child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle) - except rpki.rpkidb.models.Child.DoesNotExist: - raise rpki.exceptions.ChildNotFound("Could not find child %s of self %s in up_down_handler()" % ( - child_handle, tenant_handle)) - child.serve_up_down(self, q_der, done) - except (rpki.async.ExitNow, SystemExit): - raise - except (rpki.exceptions.ChildNotFound, rpki.exceptions.BadContactURL), e: - logger.warning(str(e)) - cb(400, reason = str(e)) + child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle) + q_der = handler.request.body + r_der = yield child.serve_up_down(self, q_der) + handler.set_header("Content-Type", rpki.up_down.content_type) + handler.set_status(200) + handler.finish(r_der) + + except rpki.rpkidb.models.Child.DoesNotExist: + logger.info("Child %r of tenant %r not found", child_handle, tenant_handle) + handler.set_status(400, "Child %r not found" % child_handle) + handler.finish() + except Exception, e: logger.exception("Unhandled exception processing up-down request") - cb(400, reason = "Could not process PDU: %s" % e) - - def checkpoint(self, force = False): - """ - Record that we were still alive when we got here, by resetting - keepalive timer. - """ + handler.set_status(400, "Could not process PDU: %s" % e) + handler.finish() - if force or self.cron_timeout is not None: - self.cron_timeout = rpki.sundial.now() + self.cron_keepalive def task_add(self, task): """ @@ -504,11 +508,10 @@ class main(object): logger.debug("Task %r was already in the task queue", task) return False + def task_next(self): """ - Pull next task from the task queue and put it the deferred event - queue (we don't want to run it directly, as that could eventually - blow out our call stack). + Schedule next task in the queue to be run. """ try: @@ -516,77 +519,67 @@ class main(object): except IndexError: self.task_current = None else: - rpki.async.event_defer(self.task_current) + tornado.ioloop.IOLoop.current().add_callback(self.task_current) + def task_run(self): """ - Run first task on the task queue, unless one is running already. + Schedule first queued task unless a task is running already. """ if self.task_current is None: self.task_next() - def cron(self, cb = None): + + @tornado.gen.coroutine + def cron_loop(self): """ - Periodic tasks. + Asynchronous infinite loop to drive cron cycle. """ - now = rpki.sundial.now() + assert self.use_internal_cron + yield tornado.gen.sleep(self.initial_delay) + while True: + yield self.cron_run() + yield tornado.gen.sleep(self.cron_period) - logger.debug("Starting cron run") - def done(): - self.cron_timeout = None - logger.info("Finished cron run started at %s", now) - if cb is not None: - cb() + @tornado.gen.coroutine + def cron_run(self): + """ + Periodic tasks. + """ - completion = rpki.rpkid_tasks.CompletionHandler(done) + now = rpki.sundial.now() + logger.debug("Starting cron run") + futures = [] try: - selves = rpki.rpkidb.models.Tenant.objects.all() - except Exception: - logger.exception("Error pulling selves from SQL, maybe SQL server is down?") + tenants = rpki.rpkidb.models.Tenant.objects.all() + except: + logger.exception("Error pulling tenants from SQL, maybe SQL server is down?") else: - for s in selves: - s.schedule_cron_tasks(self, completion) - nothing_queued = completion.count == 0 - - assert self.use_internal_cron or self.cron_timeout is None - - if self.cron_timeout is not None and self.cron_timeout < now: - logger.warning("cron keepalive threshold %s has expired, breaking lock", self.cron_timeout) - self.cron_timeout = None - - if self.use_internal_cron: - when = now + self.cron_period - logger.debug("Scheduling next cron run at %s", when) - self.cron_timer.set(when) - - if self.cron_timeout is None: - self.checkpoint(self.use_internal_cron) - self.task_run() - - elif self.use_internal_cron: - logger.warning("cron already running, keepalive will expire at %s", self.cron_timeout) + for tenant in tenants: + futures.extend(condition.wait() for condition in tenant.schedule_cron_tasks(self)) + if futures: + yield futures + logger.info("Finished cron run started at %s", now) - if nothing_queued: - done() - def cronjob_handler(self, query, path, cb): + @tornado.gen.coroutine + def cronjob_handler(self, handler): """ External trigger for periodic tasks. This is somewhat obsolete now that we have internal timers, but the test framework still uses it. """ - def done(): - cb(200, body = "OK") - if self.use_internal_cron: - cb(500, reason = "Running cron internally") + handler.set_status(500, "Running cron internally") else: logger.debug("Starting externally triggered cron") - self.cron(done) + yield self.cron() + handler.set_status(200) + handler.finish() class publication_queue(object): @@ -661,19 +654,16 @@ class publication_queue(object): if self.replace: self.uris[uri] = pdu - def call_pubd(self, cb, eb): - def loop(iterator, rid): + @tornado.gen.coroutine + def call_pubd(self): + for rid in self.repositories: logger.debug("Calling pubd[%r]", self.repositories[rid]) - self.repositories[rid].call_pubd(self.rpkid, iterator, eb, self.msgs[rid], self.handlers) - def done(): - self.clear() - cb() - rpki.async.iterator(self.repositories, loop, done) + yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers) + self.clear() @property def size(self): return sum(len(self.msgs[rid]) for rid in self.repositories) def empty(self): - assert (not self.msgs) == (self.size == 0), "Assertion failure: not self.msgs: %r, self.size %r" % (not self.msgs, self.size) return not self.msgs |