diff options
Diffstat (limited to 'scripts/rpki/up_down.py')
-rw-r--r-- | scripts/rpki/up_down.py | 30 |
1 files changed, 16 insertions, 14 deletions
diff --git a/scripts/rpki/up_down.py b/scripts/rpki/up_down.py index d1437775..ebff70d0 100644 --- a/scripts/rpki/up_down.py +++ b/scripts/rpki/up_down.py @@ -2,7 +2,7 @@ """RPKI "up-down" protocol.""" -import base64, sax_utils, resource_set, lxml.etree, x509 +import base64, rpki.sax_utils, rpki.resource_set, lxml.etree, x509, rpki.exceptions xmlns="http://www.apnic.net/specs/rescerts/up-down/" @@ -55,7 +55,8 @@ class multi_uri(list): elif isinstance(ini, str): self[:] = ini.split(",") for s in self: - assert s.strip() == s and s.find("://") >= 0, "Bad URI \"%s\"" % s + if s.strip() != s or s.find("://") < 0: + raise rpki.exceptions.BadURISyntax, "Bad URI \"%s\"" % s else: raise TypeError @@ -76,13 +77,13 @@ class certificate_elt(base_elt): """Handle attributes of <certificate/> element.""" assert name == "certificate", "Unexpected name %s, stack %s" % (name, stack) self.cert_url = multi_uri(attrs["cert_url"]) - self.req_resource_set_as = resource_set.resource_set_as(attrs.get("req_resource_set_as")) - self.req_resource_set_ipv4 = resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4")) - self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6")) + self.req_resource_set_as = rpki.resource_set.resource_set_as(attrs.get("req_resource_set_as")) + self.req_resource_set_ipv4 = rpki.resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4")) + self.req_resource_set_ipv6 = rpki.resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6")) def endElement(self, stack, name, text): """Handle text content of a <certificate/> element.""" - assert name == "certificate" + assert name == "certificate", "Unexpected name %s, stack %s" % (name, stack) self.cert = x509.X509(Base64=text) stack.pop() @@ -110,9 +111,9 @@ class class_elt(base_elt): self.class_name = attrs["class_name"] self.cert_url = multi_uri(attrs["cert_url"]) self.suggested_sia_head = attrs.get("suggested_sia_head") - self.resource_set_as = resource_set.resource_set_as(attrs["resource_set_as"]) - self.resource_set_ipv4 = resource_set.resource_set_ipv4(attrs["resource_set_ipv4"]) - self.resource_set_ipv6 = resource_set.resource_set_ipv6(attrs["resource_set_ipv6"]) + self.resource_set_as = rpki.resource_set.resource_set_as(attrs["resource_set_as"]) + self.resource_set_ipv4 = rpki.resource_set.resource_set_ipv4(attrs["resource_set_ipv4"]) + self.resource_set_ipv6 = rpki.resource_set.resource_set_ipv6(attrs["resource_set_ipv6"]) def endElement(self, stack, name, text): """Handle <class/> elements and their children.""" @@ -161,9 +162,9 @@ class issue_pdu(base_elt): """Handle "issue" PDU.""" assert name == "request", "Unexpected name %s, stack %s" % (name, stack) self.class_name = attrs["class_name"] - self.req_resource_set_as = resource_set.resource_set_as(attrs.get("req_resource_set_as")) - self.req_resource_set_ipv4 = resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4")) - self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6")) + self.req_resource_set_as = rpki.resource_set.resource_set_as(attrs.get("req_resource_set_as")) + self.req_resource_set_ipv4 = rpki.resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4")) + self.req_resource_set_ipv6 = rpki.resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6")) def endElement(self, stack, name, text): """Handle "issue" PDU.""" @@ -219,7 +220,8 @@ class error_response_pdu(base_elt): """Handle "error_response" PDU.""" if name == "status": code = int(text) - assert code in self.codes + if code not in self.codes: + raise rpki.exceptions.BadStatusCode, "%s is not a known status code" self.status = code elif name == "last_message_processed": self.last_message_processed = text @@ -274,7 +276,7 @@ class message_pdu(base_elt): def __str__(self): lxml.etree.tostring(self.toXML(), pretty_print=True, encoding="UTF-8") -class sax_handler(sax_utils.handler): +class sax_handler(rpki.sax_utils.handler): """SAX handler for Up-Down protocol.""" def create_top_level(self, name, attrs): |