diff options
Diffstat (limited to 'rpki/rpkid.py')
-rw-r--r-- | rpki/rpkid.py | 1218 |
1 files changed, 609 insertions, 609 deletions
diff --git a/rpki/rpkid.py b/rpki/rpkid.py index da6141ea..c0ddbd58 100644 --- a/rpki/rpkid.py +++ b/rpki/rpkid.py @@ -56,681 +56,681 @@ logger = logging.getLogger(__name__) class main(object): - """ - Main program for rpkid. - """ - - def __init__(self): - - os.environ.update(TZ = "UTC", - DJANGO_SETTINGS_MODULE = "rpki.django_settings.rpkid") - time.tzset() - - self.irdbd_cms_timestamp = None - self.irbe_cms_timestamp = None - - self.task_queue = [] - self.task_event = tornado.locks.Event() - - self.http_client_serialize = weakref.WeakValueDictionary() - - parser = argparse.ArgumentParser(description = __doc__) - parser.add_argument("-c", "--config", - help = "override default location of configuration file") - parser.add_argument("-f", "--foreground", action = "store_true", - help = "do not daemonize") - parser.add_argument("--pidfile", - help = "override default location of pid file") - parser.add_argument("--profile", - help = "enable profiling, saving data to PROFILE") - rpki.log.argparse_setup(parser) - args = parser.parse_args() - - self.profile = args.profile - - rpki.log.init("rpkid", args) - - self.cfg = rpki.config.parser(set_filename = args.config, section = "rpkid") - self.cfg.set_global_flags() - - if not args.foreground: - rpki.daemonize.daemon(pidfile = args.pidfile) - - if self.profile: - import cProfile - prof = cProfile.Profile() - try: - prof.runcall(self.main) - finally: - prof.dump_stats(self.profile) - logger.info("Dumped profile data to %s", self.profile) - else: - self.main() - - def main(self): - - startup_msg = self.cfg.get("startup-message", "") - if startup_msg: - logger.info(startup_msg) - - if self.profile: - logger.info("Running in profile mode with output to %s", self.profile) - - logger.debug("Initializing Django") - import django - django.setup() - - logger.debug("Initializing rpkidb...") - global rpki # pylint: disable=W0602 - import rpki.rpkidb # pylint: disable=W0621 - - logger.debug("Initializing rpkidb...done") - - self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta")) - self.irdb_cert = rpki.x509.X509(Auto_update = self.cfg.get("irdb-cert")) - self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert")) - self.rpkid_cert = rpki.x509.X509(Auto_update = self.cfg.get("rpkid-cert")) - self.rpkid_key = rpki.x509.RSA( Auto_update = self.cfg.get("rpkid-key")) - - self.irdb_url = self.cfg.get("irdb-url") - - self.http_server_host = self.cfg.get("server-host", "") - self.http_server_port = self.cfg.getint("server-port") - - self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True) - - self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10), - self.cfg.getint("initial-delay-max", 120)) - - # Should be much longer in production - self.cron_period = self.cfg.getint("cron-period", 120) - - if self.use_internal_cron: - logger.debug("Scheduling initial cron pass in %s seconds", self.initial_delay) - tornado.ioloop.IOLoop.current().spawn_callback(self.cron_loop) - - logger.debug("Scheduling task loop") - tornado.ioloop.IOLoop.current().spawn_callback(self.task_loop) - - rpkid = self - - class LeftRightHandler(tornado.web.RequestHandler): # pylint: disable=W0223 - @tornado.gen.coroutine - def post(self): - yield rpkid.left_right_handler(self) - - class UpDownHandler(tornado.web.RequestHandler): # pylint: disable=W0223 - @tornado.gen.coroutine - def post(self, tenant_handle, child_handle): # pylint: disable=W0221 - yield rpkid.up_down_handler(self, tenant_handle, child_handle) - - class CronjobHandler(tornado.web.RequestHandler): # pylint: disable=W0223 - @tornado.gen.coroutine - def post(self): - yield rpkid.cronjob_handler(self) - - application = tornado.web.Application(( - (r"/left-right", LeftRightHandler), - (r"/up-down/([-a-zA-Z0-9_]+)/([-a-zA-Z0-9_]+)", UpDownHandler), - (r"/cronjob", CronjobHandler))) - - application.listen( - address = self.http_server_host, - port = self.http_server_port) - - tornado.ioloop.IOLoop.current().start() - - def task_add(self, tasks): """ - Add zero or more tasks to the task queue. + Main program for rpkid. """ - for task in tasks: - if task in self.task_queue: - logger.debug("Task %r already queued", task) - else: - logger.debug("Adding %r to task queue", task) - self.task_queue.append(task) + def __init__(self): - def task_run(self): - """ - Kick the task loop to notice recently added tasks. - """ + os.environ.update(TZ = "UTC", + DJANGO_SETTINGS_MODULE = "rpki.django_settings.rpkid") + time.tzset() - self.task_event.set() + self.irdbd_cms_timestamp = None + self.irbe_cms_timestamp = None - @tornado.gen.coroutine - def task_loop(self): - """ - Asynchronous infinite loop to run background tasks. + self.task_queue = [] + self.task_event = tornado.locks.Event() - This code is a bit finicky, because it's managing a collection of - Future objects which are running independently of the control flow - here, and the wave function doesn't collapse until we do a yield. + self.http_client_serialize = weakref.WeakValueDictionary() - So we keep this brutally simple and don't try to hide too much of - it in the AbstractTask class. For similar reasons, AbstractTask - sets aside a .future instance variable for this method's use. - """ + parser = argparse.ArgumentParser(description = __doc__) + parser.add_argument("-c", "--config", + help = "override default location of configuration file") + parser.add_argument("-f", "--foreground", action = "store_true", + help = "do not daemonize") + parser.add_argument("--pidfile", + help = "override default location of pid file") + parser.add_argument("--profile", + help = "enable profiling, saving data to PROFILE") + rpki.log.argparse_setup(parser) + args = parser.parse_args() - logger.debug("Starting task loop") - task_event_future = None - - while True: - while None in self.task_queue: - self.task_queue.remove(None) - - futures = [] - for task in self.task_queue: - if task.future is None: - task.future = task.start() - futures.append(task.future) - if task_event_future is None: - task_event_future = self.task_event.wait() - futures.append(task_event_future) - iterator = tornado.gen.WaitIterator(*futures) - - while not iterator.done(): - yield iterator.next() - if iterator.current_future is task_event_future: - self.task_event.clear() - task_event_future = None - break - else: - task = self.task_queue[iterator.current_index] - task.future = None - waiting = task.waiting() - if not waiting: - self.task_queue[iterator.current_index] = None - for task in self.task_queue: - if task is not None and not task.runnable.is_set(): - logger.debug("Reenabling task %r", task) - task.runnable.set() - if waiting: - break - - @tornado.gen.coroutine - def cron_loop(self): - """ - Asynchronous infinite loop to drive cron cycle. - """ + self.profile = args.profile - logger.debug("cron_loop(): Starting") - assert self.use_internal_cron - logger.debug("cron_loop(): Startup delay %d seconds", self.initial_delay) - yield tornado.gen.sleep(self.initial_delay) - while True: - logger.debug("cron_loop(): Running") - yield self.cron_run() - logger.debug("cron_loop(): Sleeping %d seconds", self.cron_period) - yield tornado.gen.sleep(self.cron_period) - - @tornado.gen.coroutine - def cron_run(self): - """ - Schedule periodic tasks. - """ + rpki.log.init("rpkid", args) - now = rpki.sundial.now() - logger.debug("Starting cron run") - try: - tenants = rpki.rpkidb.models.Tenant.objects.all() - except: - logger.exception("Error pulling tenants from SQL, maybe SQL server is down?") - else: - tasks = tuple(task for tenant in tenants for task in tenant.cron_tasks(self)) - self.task_add(tasks) - futures = [task.wait() for task in tasks] - self.task_run() - yield futures - logger.info("Finished cron run started at %s", now) - - @tornado.gen.coroutine - def cronjob_handler(self, handler): - """ - External trigger to schedule periodic tasks. Obsolete for - produciton use, but portions of the test framework still use this. - """ + self.cfg = rpki.config.parser(set_filename = args.config, section = "rpkid") + self.cfg.set_global_flags() - if self.use_internal_cron: - handler.set_status(500, "Running cron internally") - else: - logger.debug("Starting externally triggered cron") - yield self.cron() - handler.set_status(200) - handler.finish() + if not args.foreground: + rpki.daemonize.daemon(pidfile = args.pidfile) - @tornado.gen.coroutine - def http_fetch(self, request, serialize_on_full_url = False): - """ - Wrapper around tornado.httpclient.AsyncHTTPClient() which - serializes requests to any particular HTTP server, to avoid - spurious CMS replay errors. - """ + if self.profile: + import cProfile + prof = cProfile.Profile() + try: + prof.runcall(self.main) + finally: + prof.dump_stats(self.profile) + logger.info("Dumped profile data to %s", self.profile) + else: + self.main() - # The current definition of "particular HTTP server" is based only - # on the "netloc" portion of the URL, which could in theory could - # cause deadlocks in a loopback scenario; no such deadlocks have - # shown up in testing, but if such a thing were to occur, it would - # look like an otherwise inexplicable HTTP timeout. The solution, - # should this occur, would be to use the entire URL as the lookup - # key, perhaps only for certain protocols. - # - # The reason for the current scheme is that at least one protocol - # (publication) uses RESTful URLs but has a single service-wide - # CMS replay detection database, which translates to meaning that - # we need to serialize all requests for that service, not just - # requests to a particular URL. - - if serialize_on_full_url: - netlock = request.url - else: - netlock = urlparse.urlparse(request.url).netloc - - try: - lock = self.http_client_serialize[netlock] - except KeyError: - lock = self.http_client_serialize[netlock] = tornado.locks.Lock() - - http_client = tornado.httpclient.AsyncHTTPClient() - - with (yield lock.acquire()): - response = yield http_client.fetch(request) - - raise tornado.gen.Return(response) - - @staticmethod - def _compose_left_right_query(): - """ - Compose top level element of a left-right query to irdbd. - """ + def main(self): - return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, - type = "query", version = rpki.left_right.version) + startup_msg = self.cfg.get("startup-message", "") + if startup_msg: + logger.info(startup_msg) - @tornado.gen.coroutine - def irdb_query(self, q_msg): - """ - Perform an IRDB callback query. - """ + if self.profile: + logger.info("Running in profile mode with output to %s", self.profile) - q_tags = set(q_pdu.tag for q_pdu in q_msg) + logger.debug("Initializing Django") + import django + django.setup() - q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert) + logger.debug("Initializing rpkidb...") + global rpki # pylint: disable=W0602 + import rpki.rpkidb # pylint: disable=W0621 - http_request = tornado.httpclient.HTTPRequest( - url = self.irdb_url, - method = "POST", - body = q_der, - headers = { "Content-Type" : rpki.left_right.content_type }) + logger.debug("Initializing rpkidb...done") - http_response = yield self.http_fetch(http_request) + self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta")) + self.irdb_cert = rpki.x509.X509(Auto_update = self.cfg.get("irdb-cert")) + self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert")) + self.rpkid_cert = rpki.x509.X509(Auto_update = self.cfg.get("rpkid-cert")) + self.rpkid_key = rpki.x509.RSA( Auto_update = self.cfg.get("rpkid-key")) - # Tornado already checked http_response.code for us + self.irdb_url = self.cfg.get("irdb-url") - content_type = http_response.headers.get("Content-Type") + self.http_server_host = self.cfg.get("server-host", "") + self.http_server_port = self.cfg.getint("server-port") + + self.use_internal_cron = self.cfg.getboolean("use-internal-cron", True) + + self.initial_delay = random.randint(self.cfg.getint("initial-delay-min", 10), + self.cfg.getint("initial-delay-max", 120)) + + # Should be much longer in production + self.cron_period = self.cfg.getint("cron-period", 120) + + if self.use_internal_cron: + logger.debug("Scheduling initial cron pass in %s seconds", self.initial_delay) + tornado.ioloop.IOLoop.current().spawn_callback(self.cron_loop) + + logger.debug("Scheduling task loop") + tornado.ioloop.IOLoop.current().spawn_callback(self.task_loop) + + rpkid = self + + class LeftRightHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self): + yield rpkid.left_right_handler(self) + + class UpDownHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self, tenant_handle, child_handle): # pylint: disable=W0221 + yield rpkid.up_down_handler(self, tenant_handle, child_handle) + + class CronjobHandler(tornado.web.RequestHandler): # pylint: disable=W0223 + @tornado.gen.coroutine + def post(self): + yield rpkid.cronjob_handler(self) + + application = tornado.web.Application(( + (r"/left-right", LeftRightHandler), + (r"/up-down/([-a-zA-Z0-9_]+)/([-a-zA-Z0-9_]+)", UpDownHandler), + (r"/cronjob", CronjobHandler))) + + application.listen( + address = self.http_server_host, + port = self.http_server_port) + + tornado.ioloop.IOLoop.current().start() + + def task_add(self, tasks): + """ + Add zero or more tasks to the task queue. + """ + + for task in tasks: + if task in self.task_queue: + logger.debug("Task %r already queued", task) + else: + logger.debug("Adding %r to task queue", task) + self.task_queue.append(task) + + def task_run(self): + """ + Kick the task loop to notice recently added tasks. + """ + + self.task_event.set() + + @tornado.gen.coroutine + def task_loop(self): + """ + Asynchronous infinite loop to run background tasks. + + This code is a bit finicky, because it's managing a collection of + Future objects which are running independently of the control flow + here, and the wave function doesn't collapse until we do a yield. + + So we keep this brutally simple and don't try to hide too much of + it in the AbstractTask class. For similar reasons, AbstractTask + sets aside a .future instance variable for this method's use. + """ + + logger.debug("Starting task loop") + task_event_future = None + + while True: + while None in self.task_queue: + self.task_queue.remove(None) + + futures = [] + for task in self.task_queue: + if task.future is None: + task.future = task.start() + futures.append(task.future) + if task_event_future is None: + task_event_future = self.task_event.wait() + futures.append(task_event_future) + iterator = tornado.gen.WaitIterator(*futures) + + while not iterator.done(): + yield iterator.next() + if iterator.current_future is task_event_future: + self.task_event.clear() + task_event_future = None + break + else: + task = self.task_queue[iterator.current_index] + task.future = None + waiting = task.waiting() + if not waiting: + self.task_queue[iterator.current_index] = None + for task in self.task_queue: + if task is not None and not task.runnable.is_set(): + logger.debug("Reenabling task %r", task) + task.runnable.set() + if waiting: + break + + @tornado.gen.coroutine + def cron_loop(self): + """ + Asynchronous infinite loop to drive cron cycle. + """ + + logger.debug("cron_loop(): Starting") + assert self.use_internal_cron + logger.debug("cron_loop(): Startup delay %d seconds", self.initial_delay) + yield tornado.gen.sleep(self.initial_delay) + while True: + logger.debug("cron_loop(): Running") + yield self.cron_run() + logger.debug("cron_loop(): Sleeping %d seconds", self.cron_period) + yield tornado.gen.sleep(self.cron_period) + + @tornado.gen.coroutine + def cron_run(self): + """ + Schedule periodic tasks. + """ + + now = rpki.sundial.now() + logger.debug("Starting cron run") + try: + tenants = rpki.rpkidb.models.Tenant.objects.all() + except: + logger.exception("Error pulling tenants from SQL, maybe SQL server is down?") + else: + tasks = tuple(task for tenant in tenants for task in tenant.cron_tasks(self)) + self.task_add(tasks) + futures = [task.wait() for task in tasks] + self.task_run() + yield futures + logger.info("Finished cron run started at %s", now) + + @tornado.gen.coroutine + def cronjob_handler(self, handler): + """ + External trigger to schedule periodic tasks. Obsolete for + produciton use, but portions of the test framework still use this. + """ + + if self.use_internal_cron: + handler.set_status(500, "Running cron internally") + else: + logger.debug("Starting externally triggered cron") + yield self.cron() + handler.set_status(200) + handler.finish() + + @tornado.gen.coroutine + def http_fetch(self, request, serialize_on_full_url = False): + """ + Wrapper around tornado.httpclient.AsyncHTTPClient() which + serializes requests to any particular HTTP server, to avoid + spurious CMS replay errors. + """ + + # The current definition of "particular HTTP server" is based only + # on the "netloc" portion of the URL, which could in theory could + # cause deadlocks in a loopback scenario; no such deadlocks have + # shown up in testing, but if such a thing were to occur, it would + # look like an otherwise inexplicable HTTP timeout. The solution, + # should this occur, would be to use the entire URL as the lookup + # key, perhaps only for certain protocols. + # + # The reason for the current scheme is that at least one protocol + # (publication) uses RESTful URLs but has a single service-wide + # CMS replay detection database, which translates to meaning that + # we need to serialize all requests for that service, not just + # requests to a particular URL. + + if serialize_on_full_url: + netlock = request.url + else: + netlock = urlparse.urlparse(request.url).netloc - if content_type not in rpki.left_right.allowed_content_types: - raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.left_right.content_type, content_type)) + try: + lock = self.http_client_serialize[netlock] + except KeyError: + lock = self.http_client_serialize[netlock] = tornado.locks.Lock() - r_der = http_response.body + http_client = tornado.httpclient.AsyncHTTPClient() - r_cms = rpki.left_right.cms_msg(DER = r_der) - r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert)) + with (yield lock.acquire()): + response = yield http_client.fetch(request) - self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url) + raise tornado.gen.Return(response) - #rpki.left_right.check_response(r_msg) + @staticmethod + def _compose_left_right_query(): + """ + Compose top level element of a left-right query to irdbd. + """ - if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg): - raise rpki.exceptions.BadIRDBReply("Unexpected response to IRDB query: %s" % r_cms.pretty_print_content()) + return Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, + type = "query", version = rpki.left_right.version) - raise tornado.gen.Return(r_msg) + @tornado.gen.coroutine + def irdb_query(self, q_msg): + """ + Perform an IRDB callback query. + """ - @tornado.gen.coroutine - def irdb_query_child_resources(self, tenant_handle, child_handle): - """ - Ask IRDB about a child's resources. - """ + q_tags = set(q_pdu.tag for q_pdu in q_msg) - q_msg = self._compose_left_right_query() - SubElement(q_msg, rpki.left_right.tag_list_resources, tenant_handle = tenant_handle, child_handle = child_handle) + q_der = rpki.left_right.cms_msg().wrap(q_msg, self.rpkid_key, self.rpkid_cert) - r_msg = yield self.irdb_query(q_msg) + http_request = tornado.httpclient.HTTPRequest( + url = self.irdb_url, + method = "POST", + body = q_der, + headers = { "Content-Type" : rpki.left_right.content_type }) - if len(r_msg) != 1: - raise rpki.exceptions.BadIRDBReply("Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content()) + http_response = yield self.http_fetch(http_request) - bag = rpki.resource_set.resource_bag( - asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")), - v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")), - v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")), - valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until"))) + # Tornado already checked http_response.code for us - raise tornado.gen.Return(bag) + content_type = http_response.headers.get("Content-Type") - @tornado.gen.coroutine - def irdb_query_roa_requests(self, tenant_handle): - """ - Ask IRDB about self's ROA requests. - """ + if content_type not in rpki.left_right.allowed_content_types: + raise rpki.exceptions.BadContentType("HTTP Content-Type %r, expected %r" % (rpki.left_right.content_type, content_type)) - q_msg = self._compose_left_right_query() - SubElement(q_msg, rpki.left_right.tag_list_roa_requests, tenant_handle = tenant_handle) - r_msg = yield self.irdb_query(q_msg) - raise tornado.gen.Return(r_msg) + r_der = http_response.body - @tornado.gen.coroutine - def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles): - """ - Ask IRDB about self's ghostbuster record requests. - """ + r_cms = rpki.left_right.cms_msg(DER = r_der) + r_msg = r_cms.unwrap((self.bpki_ta, self.irdb_cert)) - q_msg = self._compose_left_right_query() - for parent_handle in parent_handles: - SubElement(q_msg, rpki.left_right.tag_list_ghostbuster_requests, - tenant_handle = tenant_handle, parent_handle = parent_handle) - r_msg = yield self.irdb_query(q_msg) - raise tornado.gen.Return(r_msg) + self.irdbd_cms_timestamp = r_cms.check_replay(self.irdbd_cms_timestamp, self.irdb_url) - @tornado.gen.coroutine - def irdb_query_ee_certificate_requests(self, tenant_handle): - """ - Ask IRDB about self's EE certificate requests. - """ + #rpki.left_right.check_response(r_msg) - q_msg = self._compose_left_right_query() - SubElement(q_msg, rpki.left_right.tag_list_ee_certificate_requests, tenant_handle = tenant_handle) - r_msg = yield self.irdb_query(q_msg) - raise tornado.gen.Return(r_msg) + if r_msg.get("type") != "reply" or not all(r_pdu.tag in q_tags for r_pdu in r_msg): + raise rpki.exceptions.BadIRDBReply("Unexpected response to IRDB query: %s" % r_cms.pretty_print_content()) - @property - def left_right_models(self): - """ - Map element tag to rpkidb model. - """ + raise tornado.gen.Return(r_msg) - try: - return self._left_right_models - except AttributeError: - import rpki.rpkidb.models # pylint: disable=W0621 - self._left_right_models = { - rpki.left_right.tag_tenant : rpki.rpkidb.models.Tenant, - rpki.left_right.tag_bsc : rpki.rpkidb.models.BSC, - rpki.left_right.tag_parent : rpki.rpkidb.models.Parent, - rpki.left_right.tag_child : rpki.rpkidb.models.Child, - rpki.left_right.tag_repository : rpki.rpkidb.models.Repository } - return self._left_right_models - - @property - def left_right_trivial_handlers(self): - """ - Map element tag to bound handler methods for trivial PDU types. - """ + @tornado.gen.coroutine + def irdb_query_child_resources(self, tenant_handle, child_handle): + """ + Ask IRDB about a child's resources. + """ - try: - return self._left_right_trivial_handlers - except AttributeError: - self._left_right_trivial_handlers = { - rpki.left_right.tag_list_published_objects : self.handle_list_published_objects, - rpki.left_right.tag_list_received_resources : self.handle_list_received_resources } - return self._left_right_trivial_handlers + q_msg = self._compose_left_right_query() + SubElement(q_msg, rpki.left_right.tag_list_resources, tenant_handle = tenant_handle, child_handle = child_handle) - def handle_list_published_objects(self, q_pdu, r_msg): - """ - <list_published_objects/> server. - """ + r_msg = yield self.irdb_query(q_msg) - tenant_handle = q_pdu.get("tenant_handle") - msg_tag = q_pdu.get("tag") - - kw = dict(tenant_handle = tenant_handle) - if msg_tag is not None: - kw.update(tag = msg_tag) - - for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle, state = "active"): - SubElement(r_msg, rpki.left_right.tag_list_published_objects, - uri = ca_detail.crl_uri, **kw).text = ca_detail.latest_crl.get_Base64() - SubElement(r_msg, rpki.left_right.tag_list_published_objects, - uri = ca_detail.manifest_uri, **kw).text = ca_detail.latest_manifest.get_Base64() - for c in ca_detail.child_certs.all(): - SubElement(r_msg, rpki.left_right.tag_list_published_objects, - uri = c.uri, child_handle = c.child.child_handle, **kw).text = c.cert.get_Base64() - for r in ca_detail.roas.filter(roa__isnull = False): - SubElement(r_msg, rpki.left_right.tag_list_published_objects, - uri = r.uri, **kw).text = r.roa.get_Base64() - for g in ca_detail.ghostbusters.all(): - SubElement(r_msg, rpki.left_right.tag_list_published_objects, - uri = g.uri, **kw).text = g.ghostbuster.get_Base64() - for c in ca_detail.ee_certificates.all(): - SubElement(r_msg, rpki.left_right.tag_list_published_objects, - uri = c.uri, **kw).text = c.cert.get_Base64() - - def handle_list_received_resources(self, q_pdu, r_msg): - """ - <list_received_resources/> server. - """ + if len(r_msg) != 1: + raise rpki.exceptions.BadIRDBReply("Expected exactly one PDU from IRDB: %s" % r_msg.pretty_print_content()) - logger.debug(".handle_list_received_resources() %s", ElementToString(q_pdu)) - tenant_handle = q_pdu.get("tenant_handle") - msg_tag = q_pdu.get("tag") - for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle, - state = "active", latest_ca_cert__isnull = False): - cert = ca_detail.latest_ca_cert - resources = cert.get_3779resources() - r_pdu = SubElement(r_msg, rpki.left_right.tag_list_received_resources, - tenant_handle = tenant_handle, - parent_handle = ca_detail.ca.parent.parent_handle, - uri = ca_detail.ca_cert_uri, - notBefore = str(cert.getNotBefore()), - notAfter = str(cert.getNotAfter()), - sia_uri = cert.get_sia_directory_uri(), - aia_uri = cert.get_aia_uri(), - asn = str(resources.asn), - ipv4 = str(resources.v4), - ipv6 = str(resources.v6)) - if msg_tag is not None: - r_pdu.set("tag", msg_tag) - - @tornado.gen.coroutine - def left_right_handler(self, handler): - """ - Process one left-right message. - """ + bag = rpki.resource_set.resource_bag( + asn = rpki.resource_set.resource_set_as(r_msg[0].get("asn")), + v4 = rpki.resource_set.resource_set_ipv4(r_msg[0].get("ipv4")), + v6 = rpki.resource_set.resource_set_ipv6(r_msg[0].get("ipv6")), + valid_until = rpki.sundial.datetime.fromXMLtime(r_msg[0].get("valid_until"))) - logger.debug("Entering left_right_handler()") + raise tornado.gen.Return(bag) - content_type = handler.request.headers["Content-Type"] - if content_type not in rpki.left_right.allowed_content_types: - handler.set_status(415, "No handler for Content-Type %s" % content_type) - handler.finish() - return + @tornado.gen.coroutine + def irdb_query_roa_requests(self, tenant_handle): + """ + Ask IRDB about self's ROA requests. + """ - handler.set_header("Content-Type", rpki.left_right.content_type) + q_msg = self._compose_left_right_query() + SubElement(q_msg, rpki.left_right.tag_list_roa_requests, tenant_handle = tenant_handle) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) - try: - q_cms = rpki.left_right.cms_msg(DER = handler.request.body) - q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert)) - r_msg = Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, - type = "reply", version = rpki.left_right.version) - self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, handler.request.path) + @tornado.gen.coroutine + def irdb_query_ghostbuster_requests(self, tenant_handle, parent_handles): + """ + Ask IRDB about self's ghostbuster record requests. + """ - assert q_msg.tag.startswith(rpki.left_right.xmlns) - assert all(q_pdu.tag.startswith(rpki.left_right.xmlns) for q_pdu in q_msg) + q_msg = self._compose_left_right_query() + for parent_handle in parent_handles: + SubElement(q_msg, rpki.left_right.tag_list_ghostbuster_requests, + tenant_handle = tenant_handle, parent_handle = parent_handle) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) - if q_msg.get("version") != rpki.left_right.version: - raise rpki.exceptions.BadQuery("Unrecognized protocol version") + @tornado.gen.coroutine + def irdb_query_ee_certificate_requests(self, tenant_handle): + """ + Ask IRDB about self's EE certificate requests. + """ - if q_msg.get("type") != "query": - raise rpki.exceptions.BadQuery("Message type is not query") + q_msg = self._compose_left_right_query() + SubElement(q_msg, rpki.left_right.tag_list_ee_certificate_requests, tenant_handle = tenant_handle) + r_msg = yield self.irdb_query(q_msg) + raise tornado.gen.Return(r_msg) - for q_pdu in q_msg: + @property + def left_right_models(self): + """ + Map element tag to rpkidb model. + """ try: - action = q_pdu.get("action") - model = self.left_right_models.get(q_pdu.tag) - - if q_pdu.tag in self.left_right_trivial_handlers: - self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg) - - elif action in ("get", "list"): - for obj in model.objects.xml_list(q_pdu): - obj.xml_template.encode(obj, q_pdu, r_msg) - - elif action == "destroy": - obj = model.objects.xml_get_for_delete(q_pdu) - yield obj.xml_pre_delete_hook(self) - obj.delete() - obj.xml_template.acknowledge(obj, q_pdu, r_msg) + return self._left_right_models + except AttributeError: + import rpki.rpkidb.models # pylint: disable=W0621 + self._left_right_models = { + rpki.left_right.tag_tenant : rpki.rpkidb.models.Tenant, + rpki.left_right.tag_bsc : rpki.rpkidb.models.BSC, + rpki.left_right.tag_parent : rpki.rpkidb.models.Parent, + rpki.left_right.tag_child : rpki.rpkidb.models.Child, + rpki.left_right.tag_repository : rpki.rpkidb.models.Repository } + return self._left_right_models + + @property + def left_right_trivial_handlers(self): + """ + Map element tag to bound handler methods for trivial PDU types. + """ - elif action in ("create", "set"): - obj = model.objects.xml_get_or_create(q_pdu) - obj.xml_template.decode(obj, q_pdu) - obj.xml_pre_save_hook(q_pdu) - obj.save() - yield obj.xml_post_save_hook(self, q_pdu) - obj.xml_template.acknowledge(obj, q_pdu, r_msg) + try: + return self._left_right_trivial_handlers + except AttributeError: + self._left_right_trivial_handlers = { + rpki.left_right.tag_list_published_objects : self.handle_list_published_objects, + rpki.left_right.tag_list_received_resources : self.handle_list_received_resources } + return self._left_right_trivial_handlers + + def handle_list_published_objects(self, q_pdu, r_msg): + """ + <list_published_objects/> server. + """ + + tenant_handle = q_pdu.get("tenant_handle") + msg_tag = q_pdu.get("tag") + + kw = dict(tenant_handle = tenant_handle) + if msg_tag is not None: + kw.update(tag = msg_tag) + + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle, state = "active"): + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = ca_detail.crl_uri, **kw).text = ca_detail.latest_crl.get_Base64() + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = ca_detail.manifest_uri, **kw).text = ca_detail.latest_manifest.get_Base64() + for c in ca_detail.child_certs.all(): + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = c.uri, child_handle = c.child.child_handle, **kw).text = c.cert.get_Base64() + for r in ca_detail.roas.filter(roa__isnull = False): + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = r.uri, **kw).text = r.roa.get_Base64() + for g in ca_detail.ghostbusters.all(): + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = g.uri, **kw).text = g.ghostbuster.get_Base64() + for c in ca_detail.ee_certificates.all(): + SubElement(r_msg, rpki.left_right.tag_list_published_objects, + uri = c.uri, **kw).text = c.cert.get_Base64() + + def handle_list_received_resources(self, q_pdu, r_msg): + """ + <list_received_resources/> server. + """ + + logger.debug(".handle_list_received_resources() %s", ElementToString(q_pdu)) + tenant_handle = q_pdu.get("tenant_handle") + msg_tag = q_pdu.get("tag") + for ca_detail in rpki.rpkidb.models.CADetail.objects.filter(ca__parent__tenant__tenant_handle = tenant_handle, + state = "active", latest_ca_cert__isnull = False): + cert = ca_detail.latest_ca_cert + resources = cert.get_3779resources() + r_pdu = SubElement(r_msg, rpki.left_right.tag_list_received_resources, + tenant_handle = tenant_handle, + parent_handle = ca_detail.ca.parent.parent_handle, + uri = ca_detail.ca_cert_uri, + notBefore = str(cert.getNotBefore()), + notAfter = str(cert.getNotAfter()), + sia_uri = cert.get_sia_directory_uri(), + aia_uri = cert.get_aia_uri(), + asn = str(resources.asn), + ipv4 = str(resources.v4), + ipv6 = str(resources.v6)) + if msg_tag is not None: + r_pdu.set("tag", msg_tag) + + @tornado.gen.coroutine + def left_right_handler(self, handler): + """ + Process one left-right message. + """ + + logger.debug("Entering left_right_handler()") + + content_type = handler.request.headers["Content-Type"] + if content_type not in rpki.left_right.allowed_content_types: + handler.set_status(415, "No handler for Content-Type %s" % content_type) + handler.finish() + return + + handler.set_header("Content-Type", rpki.left_right.content_type) - else: - raise rpki.exceptions.BadQuery("Unrecognized action %r" % action) + try: + q_cms = rpki.left_right.cms_msg(DER = handler.request.body) + q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert)) + r_msg = Element(rpki.left_right.tag_msg, nsmap = rpki.left_right.nsmap, + type = "reply", version = rpki.left_right.version) + self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, handler.request.path) + + assert q_msg.tag.startswith(rpki.left_right.xmlns) + assert all(q_pdu.tag.startswith(rpki.left_right.xmlns) for q_pdu in q_msg) + + if q_msg.get("version") != rpki.left_right.version: + raise rpki.exceptions.BadQuery("Unrecognized protocol version") + + if q_msg.get("type") != "query": + raise rpki.exceptions.BadQuery("Message type is not query") + + for q_pdu in q_msg: + + try: + action = q_pdu.get("action") + model = self.left_right_models.get(q_pdu.tag) + + if q_pdu.tag in self.left_right_trivial_handlers: + self.left_right_trivial_handlers[q_pdu.tag](q_pdu, r_msg) + + elif action in ("get", "list"): + for obj in model.objects.xml_list(q_pdu): + obj.xml_template.encode(obj, q_pdu, r_msg) + + elif action == "destroy": + obj = model.objects.xml_get_for_delete(q_pdu) + yield obj.xml_pre_delete_hook(self) + obj.delete() + obj.xml_template.acknowledge(obj, q_pdu, r_msg) + + elif action in ("create", "set"): + obj = model.objects.xml_get_or_create(q_pdu) + obj.xml_template.decode(obj, q_pdu) + obj.xml_pre_save_hook(q_pdu) + obj.save() + yield obj.xml_post_save_hook(self, q_pdu) + obj.xml_template.acknowledge(obj, q_pdu, r_msg) + + else: + raise rpki.exceptions.BadQuery("Unrecognized action %r" % action) + + except Exception, e: + if not isinstance(e, rpki.exceptions.NotFound): + logger.exception("Unhandled exception serving left-right PDU %r", q_pdu) + error_tenant_handle = q_pdu.get("tenant_handle") + error_tag = q_pdu.get("tag") + r_pdu = SubElement(r_msg, rpki.left_right.tag_report_error, error_code = e.__class__.__name__) + r_pdu.text = str(e) + if error_tag is not None: + r_pdu.set("tag", error_tag) + if error_tenant_handle is not None: + r_pdu.set("tenant_handle", error_tenant_handle) + break + + handler.set_status(200) + handler.finish(rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert)) + logger.debug("Normal exit from left_right_handler()") except Exception, e: - if not isinstance(e, rpki.exceptions.NotFound): - logger.exception("Unhandled exception serving left-right PDU %r", q_pdu) - error_tenant_handle = q_pdu.get("tenant_handle") - error_tag = q_pdu.get("tag") - r_pdu = SubElement(r_msg, rpki.left_right.tag_report_error, error_code = e.__class__.__name__) - r_pdu.text = str(e) - if error_tag is not None: - r_pdu.set("tag", error_tag) - if error_tenant_handle is not None: - r_pdu.set("tenant_handle", error_tenant_handle) - break - - handler.set_status(200) - handler.finish(rpki.left_right.cms_msg().wrap(r_msg, self.rpkid_key, self.rpkid_cert)) - logger.debug("Normal exit from left_right_handler()") - - except Exception, e: - logger.exception("Unhandled exception serving left-right request") - handler.set_status(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e)) - handler.finish() - - @tornado.gen.coroutine - def up_down_handler(self, handler, tenant_handle, child_handle): - """ - Process one up-down PDU. - """ + logger.exception("Unhandled exception serving left-right request") + handler.set_status(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e)) + handler.finish() - logger.debug("Entering up_down_handler()") + @tornado.gen.coroutine + def up_down_handler(self, handler, tenant_handle, child_handle): + """ + Process one up-down PDU. + """ - content_type = handler.request.headers["Content-Type"] - if content_type not in rpki.up_down.allowed_content_types: - handler.set_status(415, "No handler for Content-Type %s" % content_type) - handler.finish() - return + logger.debug("Entering up_down_handler()") - try: - child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle) - q_der = handler.request.body - r_der = yield child.serve_up_down(self, q_der) - handler.set_header("Content-Type", rpki.up_down.content_type) - handler.set_status(200) - handler.finish(r_der) + content_type = handler.request.headers["Content-Type"] + if content_type not in rpki.up_down.allowed_content_types: + handler.set_status(415, "No handler for Content-Type %s" % content_type) + handler.finish() + return - except rpki.rpkidb.models.Child.DoesNotExist: - logger.info("Child %r of tenant %r not found", child_handle, tenant_handle) - handler.set_status(400, "Child %r not found" % child_handle) - handler.finish() + try: + child = rpki.rpkidb.models.Child.objects.get(tenant__tenant_handle = tenant_handle, child_handle = child_handle) + q_der = handler.request.body + r_der = yield child.serve_up_down(self, q_der) + handler.set_header("Content-Type", rpki.up_down.content_type) + handler.set_status(200) + handler.finish(r_der) + + except rpki.rpkidb.models.Child.DoesNotExist: + logger.info("Child %r of tenant %r not found", child_handle, tenant_handle) + handler.set_status(400, "Child %r not found" % child_handle) + handler.finish() - except Exception, e: - logger.exception("Unhandled exception processing up-down request") - handler.set_status(400, "Could not process PDU: %s" % e) - handler.finish() + except Exception, e: + logger.exception("Unhandled exception processing up-down request") + handler.set_status(400, "Could not process PDU: %s" % e) + handler.finish() class publication_queue(object): - """ - Utility to simplify publication from within rpkid. - - General idea here is to accumulate a collection of objects to be - published, in one or more repositories, each potentially with its - own completion callback. Eventually we want to publish everything - we've accumulated, at which point we need to iterate over the - collection and do repository.call_pubd() for each repository. - """ - - replace = True - - def __init__(self, rpkid): - self.rpkid = rpkid - self.clear() - - def clear(self): - self.repositories = {} - self.msgs = {} - self.handlers = {} - if self.replace: - self.uris = {} - - def queue(self, uri, repository, handler = None, - old_obj = None, new_obj = None, old_hash = None): - - assert old_obj is not None or new_obj is not None or old_hash is not None - assert old_obj is None or old_hash is None - assert old_obj is None or isinstance(old_obj, rpki.x509.uri_dispatch(uri)) - assert new_obj is None or isinstance(new_obj, rpki.x509.uri_dispatch(uri)) - - logger.debug("Queuing publication action: uri %s, old %r, new %r, hash %s", - uri, old_obj, new_obj, old_hash) - - # id(repository) may need to change to repository.peer_contact_uri - # once we convert from our custom SQL cache to Django ORM. - - rid = id(repository) - if rid not in self.repositories: - self.repositories[rid] = repository - self.msgs[rid] = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap, - type = "query", version = rpki.publication.version) - - if self.replace and uri in self.uris: - logger.debug("Removing publication duplicate %r", self.uris[uri]) - old_pdu = self.uris.pop(uri) - self.msgs[rid].remove(old_pdu) - pdu_hash = old_pdu.get("hash") - elif old_hash is not None: - pdu_hash = old_hash - elif old_obj is None: - pdu_hash = None - else: - pdu_hash = rpki.x509.sha256(old_obj.get_DER()).encode("hex") - - if new_obj is None: - pdu = SubElement(self.msgs[rid], rpki.publication.tag_withdraw, uri = uri, hash = pdu_hash) - else: - pdu = SubElement(self.msgs[rid], rpki.publication.tag_publish, uri = uri) - pdu.text = new_obj.get_Base64() - if pdu_hash is not None: - pdu.set("hash", pdu_hash) - - if handler is not None: - tag = str(id(pdu)) - self.handlers[tag] = handler - pdu.set("tag", tag) - - if self.replace: - self.uris[uri] = pdu - - @tornado.gen.coroutine - def call_pubd(self): - for rid in self.repositories: - logger.debug("Calling pubd[%r]", self.repositories[rid]) - yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers) - self.clear() - - @property - def size(self): - return sum(len(self.msgs[rid]) for rid in self.repositories) - - def empty(self): - return not self.msgs + """ + Utility to simplify publication from within rpkid. + + General idea here is to accumulate a collection of objects to be + published, in one or more repositories, each potentially with its + own completion callback. Eventually we want to publish everything + we've accumulated, at which point we need to iterate over the + collection and do repository.call_pubd() for each repository. + """ + + replace = True + + def __init__(self, rpkid): + self.rpkid = rpkid + self.clear() + + def clear(self): + self.repositories = {} + self.msgs = {} + self.handlers = {} + if self.replace: + self.uris = {} + + def queue(self, uri, repository, handler = None, + old_obj = None, new_obj = None, old_hash = None): + + assert old_obj is not None or new_obj is not None or old_hash is not None + assert old_obj is None or old_hash is None + assert old_obj is None or isinstance(old_obj, rpki.x509.uri_dispatch(uri)) + assert new_obj is None or isinstance(new_obj, rpki.x509.uri_dispatch(uri)) + + logger.debug("Queuing publication action: uri %s, old %r, new %r, hash %s", + uri, old_obj, new_obj, old_hash) + + # id(repository) may need to change to repository.peer_contact_uri + # once we convert from our custom SQL cache to Django ORM. + + rid = id(repository) + if rid not in self.repositories: + self.repositories[rid] = repository + self.msgs[rid] = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap, + type = "query", version = rpki.publication.version) + + if self.replace and uri in self.uris: + logger.debug("Removing publication duplicate %r", self.uris[uri]) + old_pdu = self.uris.pop(uri) + self.msgs[rid].remove(old_pdu) + pdu_hash = old_pdu.get("hash") + elif old_hash is not None: + pdu_hash = old_hash + elif old_obj is None: + pdu_hash = None + else: + pdu_hash = rpki.x509.sha256(old_obj.get_DER()).encode("hex") + + if new_obj is None: + pdu = SubElement(self.msgs[rid], rpki.publication.tag_withdraw, uri = uri, hash = pdu_hash) + else: + pdu = SubElement(self.msgs[rid], rpki.publication.tag_publish, uri = uri) + pdu.text = new_obj.get_Base64() + if pdu_hash is not None: + pdu.set("hash", pdu_hash) + + if handler is not None: + tag = str(id(pdu)) + self.handlers[tag] = handler + pdu.set("tag", tag) + + if self.replace: + self.uris[uri] = pdu + + @tornado.gen.coroutine + def call_pubd(self): + for rid in self.repositories: + logger.debug("Calling pubd[%r]", self.repositories[rid]) + yield self.repositories[rid].call_pubd(self.rpkid, self.msgs[rid], self.handlers) + self.clear() + + @property + def size(self): + return sum(len(self.msgs[rid]) for rid in self.repositories) + + def empty(self): + return not self.msgs |