aboutsummaryrefslogtreecommitdiff
path: root/scripts/rpki/up_down.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2007-09-18 21:44:21 +0000
committerRob Austein <sra@hactrn.net>2007-09-18 21:44:21 +0000
commit2c5df0fe0ecf1683bf2935d027411905035042b0 (patch)
tree259f542f3669c0855fe2b9a12e606f7df7bc0888 /scripts/rpki/up_down.py
parentf0d2933a30b05039919a3aecbb53e640d4c9c126 (diff)
Start adding our own exceptions.
svn path=/scripts/rpki/cms.py; revision=988
Diffstat (limited to 'scripts/rpki/up_down.py')
-rw-r--r--scripts/rpki/up_down.py30
1 files changed, 16 insertions, 14 deletions
diff --git a/scripts/rpki/up_down.py b/scripts/rpki/up_down.py
index d1437775..ebff70d0 100644
--- a/scripts/rpki/up_down.py
+++ b/scripts/rpki/up_down.py
@@ -2,7 +2,7 @@
"""RPKI "up-down" protocol."""
-import base64, sax_utils, resource_set, lxml.etree, x509
+import base64, rpki.sax_utils, rpki.resource_set, lxml.etree, x509, rpki.exceptions
xmlns="http://www.apnic.net/specs/rescerts/up-down/"
@@ -55,7 +55,8 @@ class multi_uri(list):
elif isinstance(ini, str):
self[:] = ini.split(",")
for s in self:
- assert s.strip() == s and s.find("://") >= 0, "Bad URI \"%s\"" % s
+ if s.strip() != s or s.find("://") < 0:
+ raise rpki.exceptions.BadURISyntax, "Bad URI \"%s\"" % s
else:
raise TypeError
@@ -76,13 +77,13 @@ class certificate_elt(base_elt):
"""Handle attributes of <certificate/> element."""
assert name == "certificate", "Unexpected name %s, stack %s" % (name, stack)
self.cert_url = multi_uri(attrs["cert_url"])
- self.req_resource_set_as = resource_set.resource_set_as(attrs.get("req_resource_set_as"))
- self.req_resource_set_ipv4 = resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4"))
- self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6"))
+ self.req_resource_set_as = rpki.resource_set.resource_set_as(attrs.get("req_resource_set_as"))
+ self.req_resource_set_ipv4 = rpki.resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4"))
+ self.req_resource_set_ipv6 = rpki.resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6"))
def endElement(self, stack, name, text):
"""Handle text content of a <certificate/> element."""
- assert name == "certificate"
+ assert name == "certificate", "Unexpected name %s, stack %s" % (name, stack)
self.cert = x509.X509(Base64=text)
stack.pop()
@@ -110,9 +111,9 @@ class class_elt(base_elt):
self.class_name = attrs["class_name"]
self.cert_url = multi_uri(attrs["cert_url"])
self.suggested_sia_head = attrs.get("suggested_sia_head")
- self.resource_set_as = resource_set.resource_set_as(attrs["resource_set_as"])
- self.resource_set_ipv4 = resource_set.resource_set_ipv4(attrs["resource_set_ipv4"])
- self.resource_set_ipv6 = resource_set.resource_set_ipv6(attrs["resource_set_ipv6"])
+ self.resource_set_as = rpki.resource_set.resource_set_as(attrs["resource_set_as"])
+ self.resource_set_ipv4 = rpki.resource_set.resource_set_ipv4(attrs["resource_set_ipv4"])
+ self.resource_set_ipv6 = rpki.resource_set.resource_set_ipv6(attrs["resource_set_ipv6"])
def endElement(self, stack, name, text):
"""Handle <class/> elements and their children."""
@@ -161,9 +162,9 @@ class issue_pdu(base_elt):
"""Handle "issue" PDU."""
assert name == "request", "Unexpected name %s, stack %s" % (name, stack)
self.class_name = attrs["class_name"]
- self.req_resource_set_as = resource_set.resource_set_as(attrs.get("req_resource_set_as"))
- self.req_resource_set_ipv4 = resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4"))
- self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6"))
+ self.req_resource_set_as = rpki.resource_set.resource_set_as(attrs.get("req_resource_set_as"))
+ self.req_resource_set_ipv4 = rpki.resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4"))
+ self.req_resource_set_ipv6 = rpki.resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6"))
def endElement(self, stack, name, text):
"""Handle "issue" PDU."""
@@ -219,7 +220,8 @@ class error_response_pdu(base_elt):
"""Handle "error_response" PDU."""
if name == "status":
code = int(text)
- assert code in self.codes
+ if code not in self.codes:
+ raise rpki.exceptions.BadStatusCode, "%s is not a known status code"
self.status = code
elif name == "last_message_processed":
self.last_message_processed = text
@@ -274,7 +276,7 @@ class message_pdu(base_elt):
def __str__(self):
lxml.etree.tostring(self.toXML(), pretty_print=True, encoding="UTF-8")
-class sax_handler(sax_utils.handler):
+class sax_handler(rpki.sax_utils.handler):
"""SAX handler for Up-Down protocol."""
def create_top_level(self, name, attrs):