aboutsummaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rw-r--r--scripts/rpki/up_down.py165
1 files changed, 92 insertions, 73 deletions
diff --git a/scripts/rpki/up_down.py b/scripts/rpki/up_down.py
index 37635418..1ace8c30 100644
--- a/scripts/rpki/up_down.py
+++ b/scripts/rpki/up_down.py
@@ -2,30 +2,16 @@
import base64, sax_utils, resource_set
-class msg_pdu(object):
+class generic_pdu(object):
"""
- Base type for all Up-Down PDUs.
+ Generic PDU object, just provides some default methods.
"""
- def __str__(self):
- return ('\
-<?xml version="1.0" encoding="UTF-8"?>\n\
-<message xmlns="http://www.apnic.net/specs/rescerts/up-down/"\n\
- version="1"\n\
- sender="%s"\n\
- recipient="%s"\n\
- type="%s">\n' \
- % (self.sender, self.recipient, self.type)
- ) + self.toXML() + "</message>\n"
-
- def toXML(self):
- return ""
-
- def startElement(self, name, attrs):
+ def startElement(self, stack, name, attrs):
pass
- def endElement(self, name, text):
- pass
+ def endElement(self, stack, name, text):
+ stack.pop()
class cert_elt(object):
"""
@@ -33,10 +19,10 @@ class cert_elt(object):
"""
def __init__(self, attrs):
- sax_utils.snarf_attribute(self, attrs, "cert_url")
- sax_utils.snarf_attribute(self, attrs, "req_resource_set_as", resource_set.resource_set_as)
- sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv4", resource_set.resource_set_ipv4)
- sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv6", resource_set.resource_set_ipv6)
+ self.cert_url = attrs["cert_url"]
+ self.req_resource_set_as = resource_set.resource_set_as(attrs["req_resource_set_as"])
+ self.req_resource_set_ipv4 = resource_set.resource_set_ipv4(attrs["req_resource_set_ipv4"])
+ self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs["req_resource_set_ipv6"])
def __str__(self):
xml = (' <certificate cert_url="%s"' % (self.cert_url))
@@ -55,10 +41,12 @@ class class_elt(object):
"""
def __init__(self, attrs):
- sax_utils.snarf_attribute(self, attrs, ("class_name", "cert_url", "suggested_sia_head"))
- sax_utils.snarf_attribute(self, attrs, "resource_set_as", resource_set.resource_set_as)
- sax_utils.snarf_attribute(self, attrs, "resource_set_ipv4", resource_set.resource_set_ipv4)
- sax_utils.snarf_attribute(self, attrs, "resource_set_ipv6", resource_set.resource_set_ipv6)
+ self.class_name = attrs["class_name"]
+ self.cert_url = attrs["cert_url"]
+ self.suggested_sia_head = attrs.get("suggested_sia_head")
+ self.resource_set_as = resource_set.resource_set_as(attrs["resource_set_as"])
+ self.resource_set_ipv4 = resource_set.resource_set_ipv4(attrs["resource_set_ipv4"])
+ self.resource_set_ipv6 = resource_set.resource_set_ipv6(attrs["resource_set_ipv6"])
self.certs = []
def __str__(self):
@@ -78,13 +66,15 @@ class class_elt(object):
xml += " <issuer>" + base64.b64encode(self.issuer) + "</issuer>\n </class>\n"
return xml
-class list_pdu(msg_pdu):
+class list_pdu(generic_pdu):
"""
Up-Down protocol "list" PDU.
"""
- pass
-class list_response_pdu(msg_pdu):
+ def __str__(self):
+ return ""
+
+class list_response_pdu(generic_pdu):
"""
Up-Down protocol "list_response" PDU.
"""
@@ -92,38 +82,45 @@ class list_response_pdu(msg_pdu):
def __init__(self):
self.classes = []
- def startElement(self, name, attrs):
+ def startElement(self, stack, name, attrs):
if name == "class":
self.classes.append(class_elt(attrs))
elif name == "certificate":
self.classes[-1].certs.append(cert_elt(attrs))
+ elif name != "issuer":
+ assert name == "list_response", "Unexpected name %s, stack %s" % (name, stack)
- def endElement(self, name, text):
+ def endElement(self, stack, name, text):
if name == "certificate":
self.classes[-1].certs[-1].cert = base64.b64decode(text)
elif name == "issuer":
self.classes[-1].issuer = base64.b64decode(text)
+ elif name != "class":
+ assert name == "message", "Unexpected name %s, stack %s" % (name, stack)
+ stack.pop()
+ stack[-1].endElement(stack, name, text)
- def toXML(self):
+ def __str__(self):
return "".join(map(str, self.classes))
-class issue_pdu(msg_pdu):
+class issue_pdu(generic_pdu):
"""
Up-Down protocol "issue" PDU.
"""
- def startElement(self, name, attrs):
- assert name == "request"
- sax_utils.snarf_attribute(self, attrs, "class_name")
- sax_utils.snarf_attribute(self, attrs, "req_resource_set_as", resource_set.resource_set_as)
- sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv4", resource_set.resource_set_ipv4)
- sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv6", resource_set.resource_set_ipv6)
+ def startElement(self, stack, name, attrs):
+ assert name == "request", "Unexpected name %s, stack %s" % (name, stack)
+ self.class_name = attrs["class_name"]
+ self.req_resource_set_as = resource_set.resource_set_as(attrs.get("req_resource_set_as"))
+ self.req_resource_set_ipv4 = resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4"))
+ self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6"))
- def endElement(self, name, text):
- assert name == "request"
+ def endElement(self, stack, name, text):
+ assert name == "request", "Unexpected name %s, stack %s" % (name, stack)
self.pkcs10 = base64.b64decode(text)
+ stack.pop()
- def toXML(self):
+ def __str__(self):
xml = (' <request class_name="%s"' % self.class_name)
if self.req_resource_set_as:
xml += ('\n req_resource_set_as="%s"' % self.req_resource_set_as)
@@ -138,19 +135,20 @@ class issue_response_pdu(list_response_pdu):
Up-Down protocol "issue_response" PDU.
"""
- def toXML(self):
+ def __str__(self):
assert len(self.classes) == 1
- return list_response_pdu.toXML(self)
+ return list_response_pdu.__str__(self)
-class revoke_pdu(msg_pdu):
+class revoke_pdu(generic_pdu):
"""
Up-Down protocol "revoke" PDU.
"""
- def startElement(self, name, attrs):
- sax_utils.snarf_attribute(self, attrs, ("class_name", "ski"))
+ def startElement(self, stack, name, attrs):
+ self.class_name = attrs["class_name"]
+ self.ski = attrs["ski"]
- def toXML(self):
+ def __str__(self):
return (' <key class_name="%s" ski="%s" />\n' % (self.class_name, self.ski))
class revoke_response_pdu(revoke_pdu):
@@ -159,43 +157,64 @@ class revoke_response_pdu(revoke_pdu):
"""
pass
-class error_response_pdu(msg_pdu):
+class error_response_pdu(generic_pdu):
"""
Up-Down protocol "error_response" PDU.
"""
- def toXML(self):
+ def __str__(self):
return ' <status>%d</status>\n' % self.status
- def endElement(self, name, text):
+ def endElement(self, stack, name, text):
if name == "status":
self.status = int(text)
elif name == "last_message_processed":
self.last_message_processed = text
elif name == "description":
self.description = text
+ else:
+ assert name == "message", "Unexpected name %s, stack %s" % (name, stack)
+ stack.pop()
+ stack[-1].endElement(stack, name, text)
+
+class message_pdu(generic_pdu):
+ """
+ Up-Down protocol message wrapper.
+ """
+
+ def __str__(self):
+ return ('\
+<?xml version="1.0" encoding="UTF-8"?>\n\
+<message xmlns="http://www.apnic.net/specs/rescerts/up-down/"\n\
+ version="1"\n\
+ sender="%s"\n\
+ recipient="%s"\n\
+ type="%s">\n' \
+ % (self.sender, self.recipient, self.type)
+ ) + str(self.payload) + "</message>\n"
+
+ def startElement(self, stack, name, attrs):
+ assert name == "message", "Unexpected name %s, stack %s" % (name, stack)
+ self.version = attrs["version"]
+ self.sender = attrs["sender"]
+ self.recipient = attrs["recipient"]
+ self.type = attrs["type"]
+ self.payload = {
+ "list" : list_pdu,
+ "list_response" : list_response_pdu,
+ "issue" : issue_pdu,
+ "issue_response" : issue_response_pdu,
+ "revoke" : revoke_pdu,
+ "revoke_response" : revoke_response_pdu,
+ "error_response" : error_response_pdu
+ }[attrs["type"]]()
+ stack.append(self.payload)
class sax_handler(sax_utils.handler):
"""
- SAX handler for Up-Down protocol. Builds message PDU then
- dispatches to that class's handler for nested data.
- """
-
- def startElement(self, name, attrs):
- if name == "message":
- assert int(attrs.getValue("version")) == 1
- self.set_obj({ "list" : list_pdu(),
- "list_response" : list_response_pdu(),
- "issue" : issue_pdu(),
- "issue_response" : issue_response_pdu(),
- "revoke" : revoke_pdu(),
- "revoke_response" : revoke_response_pdu(),
- "error_response" : error_response_pdu()
- }[attrs.getValue("type").encode("ascii")])
- sax_utils.snarf_attribute(self.obj, attrs, ("sender", "recipient", "type"))
- else:
- self.obj.startElement(name, attrs)
+ SAX handler for Up-Down protocol.
+ """
- def endElement(self, name):
- if name != "message":
- self.obj.endElement(name, self.get_text())
+ def create_top_level(self, name, attrs):
+ assert name == "message" and attrs["version"] == "1"
+ return message_pdu()