aboutsummaryrefslogtreecommitdiff
path: root/scripts/rpki/up_down.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/rpki/up_down.py')
-rw-r--r--scripts/rpki/up_down.py129
1 files changed, 62 insertions, 67 deletions
diff --git a/scripts/rpki/up_down.py b/scripts/rpki/up_down.py
index fcdd213b..87f7b05c 100644
--- a/scripts/rpki/up_down.py
+++ b/scripts/rpki/up_down.py
@@ -1,8 +1,12 @@
# $Id$
-import base64, sax_utils, resource_set
+import base64, sax_utils, resource_set, lxml.etree
-class generic_pdu(object):
+xmlns="http://www.apnic.net/specs/rescerts/up-down/"
+
+nsmap = { None : xmlns }
+
+class base_elt(object):
"""
Generic PDU object, just provides some default methods.
"""
@@ -13,7 +17,18 @@ class generic_pdu(object):
def endElement(self, stack, name, text):
stack.pop()
-class certificate_elt(generic_pdu):
+ def make_elt(self, name, *attrs):
+ elt = lxml.etree.Element("{%s}%s" % (xmlns, name), nsmap=nsmap)
+ for key in attrs:
+ val = getattr(self, key, None)
+ if val is not None:
+ elt.set(key, str(val))
+ return elt
+
+ def make_b64elt(self, elt, name):
+ lxml.etree.SubElement(elt, "{%s}%s" % (xmlns, name), nsmap=nsmap).text = base64.b64encode(getattr(self, name))
+
+class certificate_elt(base_elt):
"""
Up-Down protocol representation of an issued certificate.
"""
@@ -30,18 +45,12 @@ class certificate_elt(generic_pdu):
self.cert = base64.b64decode(text)
stack.pop()
- def __str__(self):
- xml = (' <certificate cert_url="%s"' % (self.cert_url))
- if self.req_resource_set_as:
- xml += ('\n req_resource_set_as="%s"' % self.req_resource_set_as)
- if self.req_resource_set_ipv4:
- xml += ('\n req_resource_set_ipv4="%s"' % self.req_resource_set_ipv4)
- if self.req_resource_set_ipv6:
- xml += ('\n req_resource_set_ipv6="%s"' % self.req_resource_set_ipv6)
- xml += ">" + base64.b64encode(self.cert) + "</certificate>\n"
- return xml
-
-class class_elt(generic_pdu):
+ def toXML(self):
+ elt = self.make_elt("certificate", "cert_url", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6")
+ elt.text = base64.b64encode(self.cert)
+ return elt
+
+class class_elt(base_elt):
"""
Up-Down protocol representation of a resource class.
"""
@@ -71,31 +80,21 @@ class class_elt(generic_pdu):
assert name == "class", "Unexpected name %s, stack %s" % (name, stack)
stack.pop()
- def __str__(self):
- xml = (' <class class_name="%s"\n'
- ' cert_url="%s"\n'
- ' resource_set_as="%s"\n'
- ' resource_set_ipv4="%s"\n'
- ' resource_set_ipv6="%s"'
- % (self.class_name, self.cert_url, self.resource_set_as,
- self.resource_set_ipv4, self.resource_set_ipv6))
- if self.suggested_sia_head:
- xml += '\n suggested_sia_head="%s"' % (self.suggested_sia_head)
- xml += ">\n"
- for cert in self.certs:
- xml += str(cert)
- xml += " <issuer>" + base64.b64encode(self.issuer) + "</issuer>\n </class>\n"
- return xml
-
-class list_pdu(generic_pdu):
+ def toXML(self):
+ elt = self.make_elt("class", "class_name", "cert_url", "resource_set_as", "resource_set_ipv4", "resource_set_ipv6", "suggested_sia_head")
+ elt.extend([i.toXML() for i in self.certs])
+ self.make_b64elt(elt, "issuer")
+ return elt
+
+class list_pdu(base_elt):
"""
Up-Down protocol "list" PDU.
"""
- def __str__(self):
- return ""
+ def toXML(self):
+ return []
-class list_response_pdu(generic_pdu):
+class list_response_pdu(base_elt):
"""
Up-Down protocol "list_response" PDU.
"""
@@ -110,10 +109,10 @@ class list_response_pdu(generic_pdu):
stack.append(klass)
klass.startElement(stack, name, attrs)
- def __str__(self):
- return "".join(map(str, self.classes))
+ def toXML(self):
+ return [i.toXML() for i in self.classes]
-class issue_pdu(generic_pdu):
+class issue_pdu(base_elt):
"""
Up-Down protocol "issue" PDU.
"""
@@ -130,26 +129,21 @@ class issue_pdu(generic_pdu):
self.pkcs10 = base64.b64decode(text)
stack.pop()
- def __str__(self):
- xml = (' <request class_name="%s"' % self.class_name)
- if self.req_resource_set_as:
- xml += ('\n req_resource_set_as="%s"' % self.req_resource_set_as)
- if self.req_resource_set_ipv4:
- xml += ('\n req_resource_set_ipv4="%s"' % self.req_resource_set_ipv4)
- if self.req_resource_set_ipv6:
- xml += ('\n req_resource_set_ipv6="%s"' % self.req_resource_set_ipv6)
- return xml + ">" + base64.b64encode(self.pkcs10) + "</request>\n"
+ def toXML(self):
+ elt = self.make_elt("request", "class_name", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6")
+ elt.text = base64.b64encode(self.pkcs10)
+ return [elt]
class issue_response_pdu(list_response_pdu):
"""
Up-Down protocol "issue_response" PDU.
"""
- def __str__(self):
+ def toXML(self):
assert len(self.classes) == 1
- return list_response_pdu.__str__(self)
+ return list_response_pdu.toXML(self)
-class revoke_pdu(generic_pdu):
+class revoke_pdu(base_elt):
"""
Up-Down protocol "revoke" PDU.
"""
@@ -158,8 +152,8 @@ class revoke_pdu(generic_pdu):
self.class_name = attrs["class_name"]
self.ski = attrs["ski"]
- def __str__(self):
- return (' <key class_name="%s" ski="%s" />\n' % (self.class_name, self.ski))
+ def toXML(self):
+ return [self.make_elt("key", "class_name", "ski")]
class revoke_response_pdu(revoke_pdu):
"""
@@ -167,13 +161,15 @@ class revoke_response_pdu(revoke_pdu):
"""
pass
-class error_response_pdu(generic_pdu):
+class error_response_pdu(base_elt):
"""
Up-Down protocol "error_response" PDU.
"""
- def __str__(self):
- return ' <status>%d</status>\n' % self.status
+ def toXML(self):
+ elt = self.make_elt("status")
+ elt.text = str(self.status)
+ return [elt]
def endElement(self, stack, name, text):
if name == "status":
@@ -187,24 +183,21 @@ class error_response_pdu(generic_pdu):
stack.pop()
stack[-1].endElement(stack, name, text)
-class message_pdu(generic_pdu):
+class message_pdu(base_elt):
"""
Up-Down protocol message wrapper.
"""
- def __str__(self):
- return ('<?xml version="1.0" encoding="UTF-8"?>\n'
- '<message xmlns="http://www.apnic.net/specs/rescerts/up-down/"\n'
- ' version="1"\n'
- ' sender="%s"\n'
- ' recipient="%s"\n'
- ' type="%s">\n'
- '%s</message>\n'
- % (self.sender, self.recipient, self.type, self.payload))
+ version = 1
+
+ def toXML(self):
+ elt = self.make_elt("message", "version", "sender", "recipient", "type")
+ elt.extend(self.payload.toXML())
+ return elt
def startElement(self, stack, name, attrs):
assert name == "message", "Unexpected name %s, stack %s" % (name, stack)
- self.version = attrs["version"]
+ assert self.version == int(attrs["version"])
self.sender = attrs["sender"]
self.recipient = attrs["recipient"]
self.type = attrs["type"]
@@ -219,11 +212,13 @@ class message_pdu(generic_pdu):
}[attrs["type"]]()
stack.append(self.payload)
+ def __str__(self):
+ lxml.etree.tostring(self.toXML(), pretty_print=True, encoding="UTF-8")
+
class sax_handler(sax_utils.handler):
"""
SAX handler for Up-Down protocol.
"""
def create_top_level(self, name, attrs):
- assert name == "message" and attrs["version"] == "1"
return message_pdu()