diff options
Diffstat (limited to 'scripts')
-rw-r--r-- | scripts/rpki/up_down.py | 165 |
1 files changed, 92 insertions, 73 deletions
diff --git a/scripts/rpki/up_down.py b/scripts/rpki/up_down.py index 37635418..1ace8c30 100644 --- a/scripts/rpki/up_down.py +++ b/scripts/rpki/up_down.py @@ -2,30 +2,16 @@ import base64, sax_utils, resource_set -class msg_pdu(object): +class generic_pdu(object): """ - Base type for all Up-Down PDUs. + Generic PDU object, just provides some default methods. """ - def __str__(self): - return ('\ -<?xml version="1.0" encoding="UTF-8"?>\n\ -<message xmlns="http://www.apnic.net/specs/rescerts/up-down/"\n\ - version="1"\n\ - sender="%s"\n\ - recipient="%s"\n\ - type="%s">\n' \ - % (self.sender, self.recipient, self.type) - ) + self.toXML() + "</message>\n" - - def toXML(self): - return "" - - def startElement(self, name, attrs): + def startElement(self, stack, name, attrs): pass - def endElement(self, name, text): - pass + def endElement(self, stack, name, text): + stack.pop() class cert_elt(object): """ @@ -33,10 +19,10 @@ class cert_elt(object): """ def __init__(self, attrs): - sax_utils.snarf_attribute(self, attrs, "cert_url") - sax_utils.snarf_attribute(self, attrs, "req_resource_set_as", resource_set.resource_set_as) - sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv4", resource_set.resource_set_ipv4) - sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv6", resource_set.resource_set_ipv6) + self.cert_url = attrs["cert_url"] + self.req_resource_set_as = resource_set.resource_set_as(attrs["req_resource_set_as"]) + self.req_resource_set_ipv4 = resource_set.resource_set_ipv4(attrs["req_resource_set_ipv4"]) + self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs["req_resource_set_ipv6"]) def __str__(self): xml = (' <certificate cert_url="%s"' % (self.cert_url)) @@ -55,10 +41,12 @@ class class_elt(object): """ def __init__(self, attrs): - sax_utils.snarf_attribute(self, attrs, ("class_name", "cert_url", "suggested_sia_head")) - sax_utils.snarf_attribute(self, attrs, "resource_set_as", resource_set.resource_set_as) - sax_utils.snarf_attribute(self, attrs, "resource_set_ipv4", resource_set.resource_set_ipv4) - sax_utils.snarf_attribute(self, attrs, "resource_set_ipv6", resource_set.resource_set_ipv6) + self.class_name = attrs["class_name"] + self.cert_url = attrs["cert_url"] + self.suggested_sia_head = attrs.get("suggested_sia_head") + self.resource_set_as = resource_set.resource_set_as(attrs["resource_set_as"]) + self.resource_set_ipv4 = resource_set.resource_set_ipv4(attrs["resource_set_ipv4"]) + self.resource_set_ipv6 = resource_set.resource_set_ipv6(attrs["resource_set_ipv6"]) self.certs = [] def __str__(self): @@ -78,13 +66,15 @@ class class_elt(object): xml += " <issuer>" + base64.b64encode(self.issuer) + "</issuer>\n </class>\n" return xml -class list_pdu(msg_pdu): +class list_pdu(generic_pdu): """ Up-Down protocol "list" PDU. """ - pass -class list_response_pdu(msg_pdu): + def __str__(self): + return "" + +class list_response_pdu(generic_pdu): """ Up-Down protocol "list_response" PDU. """ @@ -92,38 +82,45 @@ class list_response_pdu(msg_pdu): def __init__(self): self.classes = [] - def startElement(self, name, attrs): + def startElement(self, stack, name, attrs): if name == "class": self.classes.append(class_elt(attrs)) elif name == "certificate": self.classes[-1].certs.append(cert_elt(attrs)) + elif name != "issuer": + assert name == "list_response", "Unexpected name %s, stack %s" % (name, stack) - def endElement(self, name, text): + def endElement(self, stack, name, text): if name == "certificate": self.classes[-1].certs[-1].cert = base64.b64decode(text) elif name == "issuer": self.classes[-1].issuer = base64.b64decode(text) + elif name != "class": + assert name == "message", "Unexpected name %s, stack %s" % (name, stack) + stack.pop() + stack[-1].endElement(stack, name, text) - def toXML(self): + def __str__(self): return "".join(map(str, self.classes)) -class issue_pdu(msg_pdu): +class issue_pdu(generic_pdu): """ Up-Down protocol "issue" PDU. """ - def startElement(self, name, attrs): - assert name == "request" - sax_utils.snarf_attribute(self, attrs, "class_name") - sax_utils.snarf_attribute(self, attrs, "req_resource_set_as", resource_set.resource_set_as) - sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv4", resource_set.resource_set_ipv4) - sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv6", resource_set.resource_set_ipv6) + def startElement(self, stack, name, attrs): + assert name == "request", "Unexpected name %s, stack %s" % (name, stack) + self.class_name = attrs["class_name"] + self.req_resource_set_as = resource_set.resource_set_as(attrs.get("req_resource_set_as")) + self.req_resource_set_ipv4 = resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4")) + self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6")) - def endElement(self, name, text): - assert name == "request" + def endElement(self, stack, name, text): + assert name == "request", "Unexpected name %s, stack %s" % (name, stack) self.pkcs10 = base64.b64decode(text) + stack.pop() - def toXML(self): + def __str__(self): xml = (' <request class_name="%s"' % self.class_name) if self.req_resource_set_as: xml += ('\n req_resource_set_as="%s"' % self.req_resource_set_as) @@ -138,19 +135,20 @@ class issue_response_pdu(list_response_pdu): Up-Down protocol "issue_response" PDU. """ - def toXML(self): + def __str__(self): assert len(self.classes) == 1 - return list_response_pdu.toXML(self) + return list_response_pdu.__str__(self) -class revoke_pdu(msg_pdu): +class revoke_pdu(generic_pdu): """ Up-Down protocol "revoke" PDU. """ - def startElement(self, name, attrs): - sax_utils.snarf_attribute(self, attrs, ("class_name", "ski")) + def startElement(self, stack, name, attrs): + self.class_name = attrs["class_name"] + self.ski = attrs["ski"] - def toXML(self): + def __str__(self): return (' <key class_name="%s" ski="%s" />\n' % (self.class_name, self.ski)) class revoke_response_pdu(revoke_pdu): @@ -159,43 +157,64 @@ class revoke_response_pdu(revoke_pdu): """ pass -class error_response_pdu(msg_pdu): +class error_response_pdu(generic_pdu): """ Up-Down protocol "error_response" PDU. """ - def toXML(self): + def __str__(self): return ' <status>%d</status>\n' % self.status - def endElement(self, name, text): + def endElement(self, stack, name, text): if name == "status": self.status = int(text) elif name == "last_message_processed": self.last_message_processed = text elif name == "description": self.description = text + else: + assert name == "message", "Unexpected name %s, stack %s" % (name, stack) + stack.pop() + stack[-1].endElement(stack, name, text) + +class message_pdu(generic_pdu): + """ + Up-Down protocol message wrapper. + """ + + def __str__(self): + return ('\ +<?xml version="1.0" encoding="UTF-8"?>\n\ +<message xmlns="http://www.apnic.net/specs/rescerts/up-down/"\n\ + version="1"\n\ + sender="%s"\n\ + recipient="%s"\n\ + type="%s">\n' \ + % (self.sender, self.recipient, self.type) + ) + str(self.payload) + "</message>\n" + + def startElement(self, stack, name, attrs): + assert name == "message", "Unexpected name %s, stack %s" % (name, stack) + self.version = attrs["version"] + self.sender = attrs["sender"] + self.recipient = attrs["recipient"] + self.type = attrs["type"] + self.payload = { + "list" : list_pdu, + "list_response" : list_response_pdu, + "issue" : issue_pdu, + "issue_response" : issue_response_pdu, + "revoke" : revoke_pdu, + "revoke_response" : revoke_response_pdu, + "error_response" : error_response_pdu + }[attrs["type"]]() + stack.append(self.payload) class sax_handler(sax_utils.handler): """ - SAX handler for Up-Down protocol. Builds message PDU then - dispatches to that class's handler for nested data. - """ - - def startElement(self, name, attrs): - if name == "message": - assert int(attrs.getValue("version")) == 1 - self.set_obj({ "list" : list_pdu(), - "list_response" : list_response_pdu(), - "issue" : issue_pdu(), - "issue_response" : issue_response_pdu(), - "revoke" : revoke_pdu(), - "revoke_response" : revoke_response_pdu(), - "error_response" : error_response_pdu() - }[attrs.getValue("type").encode("ascii")]) - sax_utils.snarf_attribute(self.obj, attrs, ("sender", "recipient", "type")) - else: - self.obj.startElement(name, attrs) + SAX handler for Up-Down protocol. + """ - def endElement(self, name): - if name != "message": - self.obj.endElement(name, self.get_text()) + def create_top_level(self, name, attrs): + assert name == "message" and attrs["version"] == "1" + return message_pdu() |