diff options
author | Rob Austein <sra@hactrn.net> | 2015-10-26 06:29:00 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-10-26 06:29:00 +0000 |
commit | b46deb1417dc3596e9ac9fe2fe8cc0b7f42457e7 (patch) | |
tree | ca0dc0276d1adc168bc3337ce0564c4ec4957c1b /rpki/pubd.py | |
parent | 397beaf6d9900dc3b3cb612c89ebf1d57b1d16f6 (diff) |
"Any programmer who fails to comply with the standard naming, formatting,
or commenting conventions should be shot. If it so happens that it is
inconvenient to shoot him, then he is to be politely requested to recode
his program in adherence to the above standard."
-- Michael Spier, Digital Equipment Corporation
svn path=/branches/tk705/; revision=6152
Diffstat (limited to 'rpki/pubd.py')
-rw-r--r-- | rpki/pubd.py | 484 |
1 files changed, 242 insertions, 242 deletions
diff --git a/rpki/pubd.py b/rpki/pubd.py index f917c18d..ee258f26 100644 --- a/rpki/pubd.py +++ b/rpki/pubd.py @@ -45,252 +45,252 @@ logger = logging.getLogger(__name__) class main(object): - """ - Main program for pubd. - """ - - def __init__(self): - - os.environ.update(TZ = "UTC", - DJANGO_SETTINGS_MODULE = "rpki.django_settings.pubd") - time.tzset() - - self.irbe_cms_timestamp = None - - parser = argparse.ArgumentParser(description = __doc__) - parser.add_argument("-c", "--config", - help = "override default location of configuration file") - parser.add_argument("-f", "--foreground", action = "store_true", - help = "do not daemonize") - parser.add_argument("--pidfile", - help = "override default location of pid file") - parser.add_argument("--profile", - help = "enable profiling, saving data to PROFILE") - rpki.log.argparse_setup(parser) - args = parser.parse_args() - - self.profile = args.profile - - rpki.log.init("pubd", args) - - self.cfg = rpki.config.parser(set_filename = args.config, section = "pubd") - self.cfg.set_global_flags() - - if not args.foreground: - rpki.daemonize.daemon(pidfile = args.pidfile) - - if self.profile: - import cProfile - prof = cProfile.Profile() - try: - prof.runcall(self.main) - finally: - prof.dump_stats(self.profile) - logger.info("Dumped profile data to %s", self.profile) - else: - self.main() - - def main(self): - - if self.profile: - logger.info("Running in profile mode with output to %s", self.profile) - - import django - django.setup() - - global rpki # pylint: disable=W0602 - import rpki.pubdb # pylint: disable=W0621 - - self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta")) - self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert")) - self.pubd_cert = rpki.x509.X509(Auto_update = self.cfg.get("pubd-cert")) - self.pubd_key = rpki.x509.RSA( Auto_update = self.cfg.get("pubd-key")) - self.pubd_crl = rpki.x509.CRL( Auto_update = self.cfg.get("pubd-crl")) - - self.http_server_host = self.cfg.get("server-host", "") - self.http_server_port = self.cfg.getint("server-port") - - self.publication_base = self.cfg.get("publication-base", "publication/") - - self.rrdp_uri_base = self.cfg.get("rrdp-uri-base", - "http://%s/rrdp/" % socket.getfqdn()) - self.rrdp_expiration_interval = rpki.sundial.timedelta.parse(self.cfg.get("rrdp-expiration-interval", "6h")) - self.rrdp_publication_base = self.cfg.get("rrdp-publication-base", - "rrdp-publication/") - - try: - self.session = rpki.pubdb.models.Session.objects.get() - except rpki.pubdb.models.Session.DoesNotExist: - self.session = rpki.pubdb.models.Session.objects.create(uuid = str(uuid.uuid4()), serial = 0) - - rpki.http_simple.server( - host = self.http_server_host, - port = self.http_server_port, - handlers = (("/control", self.control_handler), - ("/client/", self.client_handler))) - - - def control_handler(self, request, q_der): - """ - Process one PDU from the IRBE. """ - - from django.db import transaction, connection - - try: - connection.cursor() # Reconnect to mysqld if necessary - q_cms = rpki.publication_control.cms_msg(DER = q_der) - q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert)) - self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, "control") - if q_msg.get("type") != "query": - raise rpki.exceptions.BadQuery("Message type is %s, expected query" % q_msg.get("type")) - r_msg = Element(rpki.publication_control.tag_msg, nsmap = rpki.publication_control.nsmap, - type = "reply", version = rpki.publication_control.version) - - try: - q_pdu = None - with transaction.atomic(): - - for q_pdu in q_msg: - if q_pdu.tag != rpki.publication_control.tag_client: - raise rpki.exceptions.BadQuery("PDU is %s, expected client" % q_pdu.tag) - client_handle = q_pdu.get("client_handle") - action = q_pdu.get("action") - if client_handle is None: - logger.info("Control %s request", action) - else: - logger.info("Control %s request for %s", action, client_handle) - - if action in ("get", "list"): - if action == "get": - clients = rpki.pubdb.models.Client.objects.get(client_handle = client_handle), - else: - clients = rpki.pubdb.models.Client.objects.all() - for client in clients: - r_pdu = SubElement(r_msg, q_pdu.tag, action = action, - client_handle = client.client_handle, base_uri = client.base_uri) - if q_pdu.get("tag"): - r_pdu.set("tag", q_pdu.get("tag")) - SubElement(r_pdu, rpki.publication_control.tag_bpki_cert).text = client.bpki_cert.get_Base64() - if client.bpki_glue is not None: - SubElement(r_pdu, rpki.publication_control.tag_bpki_glue).text = client.bpki_glue.get_Base64() - - if action in ("create", "set"): - if action == "create": - client = rpki.pubdb.models.Client(client_handle = client_handle) - else: - client = rpki.pubdb.models.Client.objects.get(client_handle = client_handle) - if q_pdu.get("base_uri"): - client.base_uri = q_pdu.get("base_uri") - bpki_cert = q_pdu.find(rpki.publication_control.tag_bpki_cert) - if bpki_cert is not None: - client.bpki_cert = rpki.x509.X509(Base64 = bpki_cert.text) - bpki_glue = q_pdu.find(rpki.publication_control.tag_bpki_glue) - if bpki_glue is not None: - client.bpki_glue = rpki.x509.X509(Base64 = bpki_glue.text) - if q_pdu.get("clear_replay_protection") == "yes": - client.last_cms_timestamp = None - client.save() - logger.debug("Stored client_handle %s, base_uri %s, bpki_cert %r, bpki_glue %r, last_cms_timestamp %s", - client.client_handle, client.base_uri, client.bpki_cert, client.bpki_glue, - client.last_cms_timestamp) - r_pdu = SubElement(r_msg, q_pdu.tag, action = action, client_handle = client_handle) - if q_pdu.get("tag"): - r_pdu.set("tag", q_pdu.get("tag")) - - if action == "destroy": - rpki.pubdb.models.Client.objects.filter(client_handle = client_handle).delete() - r_pdu = SubElement(r_msg, q_pdu.tag, action = action, client_handle = client_handle) - if q_pdu.get("tag"): - r_pdu.set("tag", q_pdu.get("tag")) - - except Exception, e: - logger.exception("Exception processing PDU %r", q_pdu) - r_pdu = SubElement(r_msg, rpki.publication_control.tag_report_error, error_code = e.__class__.__name__) - r_pdu.text = str(e) - if q_pdu.get("tag") is not None: - r_pdu.set("tag", q_pdu.get("tag")) - - request.send_cms_response(rpki.publication_control.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert)) - - except Exception, e: - logger.exception("Unhandled exception processing control query, path %r", request.path) - request.send_error(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e)) - - - client_url_regexp = re.compile("/client/([-A-Z0-9_/]+)$", re.I) - - def client_handler(self, request, q_der): - """ - Process one PDU from a client. + Main program for pubd. """ - from django.db import transaction, connection - - try: - connection.cursor() # Reconnect to mysqld if necessary - match = self.client_url_regexp.search(request.path) - if match is None: - raise rpki.exceptions.BadContactURL("Bad path: %s" % request.path) - client = rpki.pubdb.models.Client.objects.get(client_handle = match.group(1)) - q_cms = rpki.publication.cms_msg(DER = q_der) - q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue)) - client.last_cms_timestamp = q_cms.check_replay(client.last_cms_timestamp, client.client_handle) - client.save() - if q_msg.get("type") != "query": - raise rpki.exceptions.BadQuery("Message type is %s, expected query" % q_msg.get("type")) - r_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap, - type = "reply", version = rpki.publication.version) - delta = None - try: - with transaction.atomic(): - for q_pdu in q_msg: - if q_pdu.get("uri"): - logger.info("Client %s request for %s", q_pdu.tag, q_pdu.get("uri")) - else: - logger.info("Client %s request", q_pdu.tag) + def __init__(self): - if q_pdu.tag == rpki.publication.tag_list: - for obj in client.publishedobject_set.all(): - r_pdu = SubElement(r_msg, q_pdu.tag, uri = obj.uri, hash = obj.hash) + os.environ.update(TZ = "UTC", + DJANGO_SETTINGS_MODULE = "rpki.django_settings.pubd") + time.tzset() + + self.irbe_cms_timestamp = None + + parser = argparse.ArgumentParser(description = __doc__) + parser.add_argument("-c", "--config", + help = "override default location of configuration file") + parser.add_argument("-f", "--foreground", action = "store_true", + help = "do not daemonize") + parser.add_argument("--pidfile", + help = "override default location of pid file") + parser.add_argument("--profile", + help = "enable profiling, saving data to PROFILE") + rpki.log.argparse_setup(parser) + args = parser.parse_args() + + self.profile = args.profile + + rpki.log.init("pubd", args) + + self.cfg = rpki.config.parser(set_filename = args.config, section = "pubd") + self.cfg.set_global_flags() + + if not args.foreground: + rpki.daemonize.daemon(pidfile = args.pidfile) + + if self.profile: + import cProfile + prof = cProfile.Profile() + try: + prof.runcall(self.main) + finally: + prof.dump_stats(self.profile) + logger.info("Dumped profile data to %s", self.profile) + else: + self.main() + + def main(self): + + if self.profile: + logger.info("Running in profile mode with output to %s", self.profile) + + import django + django.setup() + + global rpki # pylint: disable=W0602 + import rpki.pubdb # pylint: disable=W0621 + + self.bpki_ta = rpki.x509.X509(Auto_update = self.cfg.get("bpki-ta")) + self.irbe_cert = rpki.x509.X509(Auto_update = self.cfg.get("irbe-cert")) + self.pubd_cert = rpki.x509.X509(Auto_update = self.cfg.get("pubd-cert")) + self.pubd_key = rpki.x509.RSA( Auto_update = self.cfg.get("pubd-key")) + self.pubd_crl = rpki.x509.CRL( Auto_update = self.cfg.get("pubd-crl")) + + self.http_server_host = self.cfg.get("server-host", "") + self.http_server_port = self.cfg.getint("server-port") + + self.publication_base = self.cfg.get("publication-base", "publication/") + + self.rrdp_uri_base = self.cfg.get("rrdp-uri-base", + "http://%s/rrdp/" % socket.getfqdn()) + self.rrdp_expiration_interval = rpki.sundial.timedelta.parse(self.cfg.get("rrdp-expiration-interval", "6h")) + self.rrdp_publication_base = self.cfg.get("rrdp-publication-base", + "rrdp-publication/") + + try: + self.session = rpki.pubdb.models.Session.objects.get() + except rpki.pubdb.models.Session.DoesNotExist: + self.session = rpki.pubdb.models.Session.objects.create(uuid = str(uuid.uuid4()), serial = 0) + + rpki.http_simple.server( + host = self.http_server_host, + port = self.http_server_port, + handlers = (("/control", self.control_handler), + ("/client/", self.client_handler))) + + + def control_handler(self, request, q_der): + """ + Process one PDU from the IRBE. + """ + + from django.db import transaction, connection + + try: + connection.cursor() # Reconnect to mysqld if necessary + q_cms = rpki.publication_control.cms_msg(DER = q_der) + q_msg = q_cms.unwrap((self.bpki_ta, self.irbe_cert)) + self.irbe_cms_timestamp = q_cms.check_replay(self.irbe_cms_timestamp, "control") + if q_msg.get("type") != "query": + raise rpki.exceptions.BadQuery("Message type is %s, expected query" % q_msg.get("type")) + r_msg = Element(rpki.publication_control.tag_msg, nsmap = rpki.publication_control.nsmap, + type = "reply", version = rpki.publication_control.version) + + try: + q_pdu = None + with transaction.atomic(): + + for q_pdu in q_msg: + if q_pdu.tag != rpki.publication_control.tag_client: + raise rpki.exceptions.BadQuery("PDU is %s, expected client" % q_pdu.tag) + client_handle = q_pdu.get("client_handle") + action = q_pdu.get("action") + if client_handle is None: + logger.info("Control %s request", action) + else: + logger.info("Control %s request for %s", action, client_handle) + + if action in ("get", "list"): + if action == "get": + clients = rpki.pubdb.models.Client.objects.get(client_handle = client_handle), + else: + clients = rpki.pubdb.models.Client.objects.all() + for client in clients: + r_pdu = SubElement(r_msg, q_pdu.tag, action = action, + client_handle = client.client_handle, base_uri = client.base_uri) + if q_pdu.get("tag"): + r_pdu.set("tag", q_pdu.get("tag")) + SubElement(r_pdu, rpki.publication_control.tag_bpki_cert).text = client.bpki_cert.get_Base64() + if client.bpki_glue is not None: + SubElement(r_pdu, rpki.publication_control.tag_bpki_glue).text = client.bpki_glue.get_Base64() + + if action in ("create", "set"): + if action == "create": + client = rpki.pubdb.models.Client(client_handle = client_handle) + else: + client = rpki.pubdb.models.Client.objects.get(client_handle = client_handle) + if q_pdu.get("base_uri"): + client.base_uri = q_pdu.get("base_uri") + bpki_cert = q_pdu.find(rpki.publication_control.tag_bpki_cert) + if bpki_cert is not None: + client.bpki_cert = rpki.x509.X509(Base64 = bpki_cert.text) + bpki_glue = q_pdu.find(rpki.publication_control.tag_bpki_glue) + if bpki_glue is not None: + client.bpki_glue = rpki.x509.X509(Base64 = bpki_glue.text) + if q_pdu.get("clear_replay_protection") == "yes": + client.last_cms_timestamp = None + client.save() + logger.debug("Stored client_handle %s, base_uri %s, bpki_cert %r, bpki_glue %r, last_cms_timestamp %s", + client.client_handle, client.base_uri, client.bpki_cert, client.bpki_glue, + client.last_cms_timestamp) + r_pdu = SubElement(r_msg, q_pdu.tag, action = action, client_handle = client_handle) + if q_pdu.get("tag"): + r_pdu.set("tag", q_pdu.get("tag")) + + if action == "destroy": + rpki.pubdb.models.Client.objects.filter(client_handle = client_handle).delete() + r_pdu = SubElement(r_msg, q_pdu.tag, action = action, client_handle = client_handle) + if q_pdu.get("tag"): + r_pdu.set("tag", q_pdu.get("tag")) + + except Exception, e: + logger.exception("Exception processing PDU %r", q_pdu) + r_pdu = SubElement(r_msg, rpki.publication_control.tag_report_error, error_code = e.__class__.__name__) + r_pdu.text = str(e) if q_pdu.get("tag") is not None: - r_pdu.set("tag", q_pdu.get("tag")) + r_pdu.set("tag", q_pdu.get("tag")) + + request.send_cms_response(rpki.publication_control.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert)) + + except Exception, e: + logger.exception("Unhandled exception processing control query, path %r", request.path) + request.send_error(500, "Unhandled exception %s: %s" % (e.__class__.__name__, e)) + + + client_url_regexp = re.compile("/client/([-A-Z0-9_/]+)$", re.I) + + def client_handler(self, request, q_der): + """ + Process one PDU from a client. + """ + + from django.db import transaction, connection + + try: + connection.cursor() # Reconnect to mysqld if necessary + match = self.client_url_regexp.search(request.path) + if match is None: + raise rpki.exceptions.BadContactURL("Bad path: %s" % request.path) + client = rpki.pubdb.models.Client.objects.get(client_handle = match.group(1)) + q_cms = rpki.publication.cms_msg(DER = q_der) + q_msg = q_cms.unwrap((self.bpki_ta, client.bpki_cert, client.bpki_glue)) + client.last_cms_timestamp = q_cms.check_replay(client.last_cms_timestamp, client.client_handle) + client.save() + if q_msg.get("type") != "query": + raise rpki.exceptions.BadQuery("Message type is %s, expected query" % q_msg.get("type")) + r_msg = Element(rpki.publication.tag_msg, nsmap = rpki.publication.nsmap, + type = "reply", version = rpki.publication.version) + delta = None + try: + with transaction.atomic(): + for q_pdu in q_msg: + if q_pdu.get("uri"): + logger.info("Client %s request for %s", q_pdu.tag, q_pdu.get("uri")) + else: + logger.info("Client %s request", q_pdu.tag) + + if q_pdu.tag == rpki.publication.tag_list: + for obj in client.publishedobject_set.all(): + r_pdu = SubElement(r_msg, q_pdu.tag, uri = obj.uri, hash = obj.hash) + if q_pdu.get("tag") is not None: + r_pdu.set("tag", q_pdu.get("tag")) + + else: + assert q_pdu.tag in (rpki.publication.tag_publish, rpki.publication.tag_withdraw) + if delta is None: + delta = self.session.new_delta(rpki.sundial.now() + self.rrdp_expiration_interval) + client.check_allowed_uri(q_pdu.get("uri")) + if q_pdu.tag == rpki.publication.tag_publish: + der = q_pdu.text.decode("base64") + logger.info("Publishing %s", rpki.x509.uri_dispatch(q_pdu.get("uri"))(DER = der).tracking_data(q_pdu.get("uri"))) + delta.publish(client, der, q_pdu.get("uri"), q_pdu.get("hash")) + else: + logger.info("Withdrawing %s", q_pdu.get("uri")) + delta.withdraw(client, q_pdu.get("uri"), q_pdu.get("hash")) + r_pdu = SubElement(r_msg, q_pdu.tag, uri = q_pdu.get("uri")) + if q_pdu.get("tag") is not None: + r_pdu.set("tag", q_pdu.get("tag")) + + if delta is not None: + delta.activate() + self.session.generate_snapshot() + self.session.expire_deltas() + + except Exception, e: + logger.exception("Exception processing PDU %r", q_pdu) + r_pdu = SubElement(r_msg, rpki.publication.tag_report_error, error_code = e.__class__.__name__) + r_pdu.text = str(e) + if q_pdu.get("tag") is not None: + r_pdu.set("tag", q_pdu.get("tag")) else: - assert q_pdu.tag in (rpki.publication.tag_publish, rpki.publication.tag_withdraw) - if delta is None: - delta = self.session.new_delta(rpki.sundial.now() + self.rrdp_expiration_interval) - client.check_allowed_uri(q_pdu.get("uri")) - if q_pdu.tag == rpki.publication.tag_publish: - der = q_pdu.text.decode("base64") - logger.info("Publishing %s", rpki.x509.uri_dispatch(q_pdu.get("uri"))(DER = der).tracking_data(q_pdu.get("uri"))) - delta.publish(client, der, q_pdu.get("uri"), q_pdu.get("hash")) - else: - logger.info("Withdrawing %s", q_pdu.get("uri")) - delta.withdraw(client, q_pdu.get("uri"), q_pdu.get("hash")) - r_pdu = SubElement(r_msg, q_pdu.tag, uri = q_pdu.get("uri")) - if q_pdu.get("tag") is not None: - r_pdu.set("tag", q_pdu.get("tag")) - - if delta is not None: - delta.activate() - self.session.generate_snapshot() - self.session.expire_deltas() - - except Exception, e: - logger.exception("Exception processing PDU %r", q_pdu) - r_pdu = SubElement(r_msg, rpki.publication.tag_report_error, error_code = e.__class__.__name__) - r_pdu.text = str(e) - if q_pdu.get("tag") is not None: - r_pdu.set("tag", q_pdu.get("tag")) - - else: - if delta is not None: - self.session.synchronize_rrdp_files(self.rrdp_publication_base, self.rrdp_uri_base) - delta.update_rsync_files(self.publication_base) - - request.send_cms_response(rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl)) - - except Exception, e: - logger.exception("Unhandled exception processing client query, path %r", request.path) - request.send_error(500, "Could not process PDU: %s" % e) + if delta is not None: + self.session.synchronize_rrdp_files(self.rrdp_publication_base, self.rrdp_uri_base) + delta.update_rsync_files(self.publication_base) + + request.send_cms_response(rpki.publication.cms_msg().wrap(r_msg, self.pubd_key, self.pubd_cert, self.pubd_crl)) + + except Exception, e: + logger.exception("Unhandled exception processing client query, path %r", request.path) + request.send_error(500, "Could not process PDU: %s" % e) |