aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--scripts/rpki/left_right.py9
-rw-r--r--scripts/rpki/up_down.py129
-rwxr-xr-xscripts/xml-parse-test.py16
3 files changed, 75 insertions, 79 deletions
diff --git a/scripts/rpki/left_right.py b/scripts/rpki/left_right.py
index 7dc29e7e..a46c5a26 100644
--- a/scripts/rpki/left_right.py
+++ b/scripts/rpki/left_right.py
@@ -90,8 +90,7 @@ class self_elt(base_elt):
def toXML(self):
elt = self.make_elt("self")
- for i in self.prefs:
- elt.append(i.toXML())
+ elt.extend([i.toXML() for i in self.prefs])
return elt
class bsc_elt(base_elt):
@@ -268,8 +267,7 @@ class list_resources_elt(base_elt):
def toXML(self):
elt = self.make_elt("list_resources")
- for i in self.resources:
- elt.append(i.toXML())
+ elt.extend([i.toXML() for i in self.resources])
return elt
class report_error_elt(base_elt):
@@ -319,8 +317,7 @@ class msg(list):
def toXML(self):
elt = lxml.etree.Element("{%s}msg" % (xmlns), nsmap=nsmap, version=str(self.version))
- for i in self:
- elt.append(i.toXML())
+ elt.extend([i.toXML() for i in self])
return elt
class sax_handler(sax_utils.handler):
diff --git a/scripts/rpki/up_down.py b/scripts/rpki/up_down.py
index fcdd213b..87f7b05c 100644
--- a/scripts/rpki/up_down.py
+++ b/scripts/rpki/up_down.py
@@ -1,8 +1,12 @@
# $Id$
-import base64, sax_utils, resource_set
+import base64, sax_utils, resource_set, lxml.etree
-class generic_pdu(object):
+xmlns="http://www.apnic.net/specs/rescerts/up-down/"
+
+nsmap = { None : xmlns }
+
+class base_elt(object):
"""
Generic PDU object, just provides some default methods.
"""
@@ -13,7 +17,18 @@ class generic_pdu(object):
def endElement(self, stack, name, text):
stack.pop()
-class certificate_elt(generic_pdu):
+ def make_elt(self, name, *attrs):
+ elt = lxml.etree.Element("{%s}%s" % (xmlns, name), nsmap=nsmap)
+ for key in attrs:
+ val = getattr(self, key, None)
+ if val is not None:
+ elt.set(key, str(val))
+ return elt
+
+ def make_b64elt(self, elt, name):
+ lxml.etree.SubElement(elt, "{%s}%s" % (xmlns, name), nsmap=nsmap).text = base64.b64encode(getattr(self, name))
+
+class certificate_elt(base_elt):
"""
Up-Down protocol representation of an issued certificate.
"""
@@ -30,18 +45,12 @@ class certificate_elt(generic_pdu):
self.cert = base64.b64decode(text)
stack.pop()
- def __str__(self):
- xml = (' <certificate cert_url="%s"' % (self.cert_url))
- if self.req_resource_set_as:
- xml += ('\n req_resource_set_as="%s"' % self.req_resource_set_as)
- if self.req_resource_set_ipv4:
- xml += ('\n req_resource_set_ipv4="%s"' % self.req_resource_set_ipv4)
- if self.req_resource_set_ipv6:
- xml += ('\n req_resource_set_ipv6="%s"' % self.req_resource_set_ipv6)
- xml += ">" + base64.b64encode(self.cert) + "</certificate>\n"
- return xml
-
-class class_elt(generic_pdu):
+ def toXML(self):
+ elt = self.make_elt("certificate", "cert_url", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6")
+ elt.text = base64.b64encode(self.cert)
+ return elt
+
+class class_elt(base_elt):
"""
Up-Down protocol representation of a resource class.
"""
@@ -71,31 +80,21 @@ class class_elt(generic_pdu):
assert name == "class", "Unexpected name %s, stack %s" % (name, stack)
stack.pop()
- def __str__(self):
- xml = (' <class class_name="%s"\n'
- ' cert_url="%s"\n'
- ' resource_set_as="%s"\n'
- ' resource_set_ipv4="%s"\n'
- ' resource_set_ipv6="%s"'
- % (self.class_name, self.cert_url, self.resource_set_as,
- self.resource_set_ipv4, self.resource_set_ipv6))
- if self.suggested_sia_head:
- xml += '\n suggested_sia_head="%s"' % (self.suggested_sia_head)
- xml += ">\n"
- for cert in self.certs:
- xml += str(cert)
- xml += " <issuer>" + base64.b64encode(self.issuer) + "</issuer>\n </class>\n"
- return xml
-
-class list_pdu(generic_pdu):
+ def toXML(self):
+ elt = self.make_elt("class", "class_name", "cert_url", "resource_set_as", "resource_set_ipv4", "resource_set_ipv6", "suggested_sia_head")
+ elt.extend([i.toXML() for i in self.certs])
+ self.make_b64elt(elt, "issuer")
+ return elt
+
+class list_pdu(base_elt):
"""
Up-Down protocol "list" PDU.
"""
- def __str__(self):
- return ""
+ def toXML(self):
+ return []
-class list_response_pdu(generic_pdu):
+class list_response_pdu(base_elt):
"""
Up-Down protocol "list_response" PDU.
"""
@@ -110,10 +109,10 @@ class list_response_pdu(generic_pdu):
stack.append(klass)
klass.startElement(stack, name, attrs)
- def __str__(self):
- return "".join(map(str, self.classes))
+ def toXML(self):
+ return [i.toXML() for i in self.classes]
-class issue_pdu(generic_pdu):
+class issue_pdu(base_elt):
"""
Up-Down protocol "issue" PDU.
"""
@@ -130,26 +129,21 @@ class issue_pdu(generic_pdu):
self.pkcs10 = base64.b64decode(text)
stack.pop()
- def __str__(self):
- xml = (' <request class_name="%s"' % self.class_name)
- if self.req_resource_set_as:
- xml += ('\n req_resource_set_as="%s"' % self.req_resource_set_as)
- if self.req_resource_set_ipv4:
- xml += ('\n req_resource_set_ipv4="%s"' % self.req_resource_set_ipv4)
- if self.req_resource_set_ipv6:
- xml += ('\n req_resource_set_ipv6="%s"' % self.req_resource_set_ipv6)
- return xml + ">" + base64.b64encode(self.pkcs10) + "</request>\n"
+ def toXML(self):
+ elt = self.make_elt("request", "class_name", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6")
+ elt.text = base64.b64encode(self.pkcs10)
+ return [elt]
class issue_response_pdu(list_response_pdu):
"""
Up-Down protocol "issue_response" PDU.
"""
- def __str__(self):
+ def toXML(self):
assert len(self.classes) == 1
- return list_response_pdu.__str__(self)
+ return list_response_pdu.toXML(self)
-class revoke_pdu(generic_pdu):
+class revoke_pdu(base_elt):
"""
Up-Down protocol "revoke" PDU.
"""
@@ -158,8 +152,8 @@ class revoke_pdu(generic_pdu):
self.class_name = attrs["class_name"]
self.ski = attrs["ski"]
- def __str__(self):
- return (' <key class_name="%s" ski="%s" />\n' % (self.class_name, self.ski))
+ def toXML(self):
+ return [self.make_elt("key", "class_name", "ski")]
class revoke_response_pdu(revoke_pdu):
"""
@@ -167,13 +161,15 @@ class revoke_response_pdu(revoke_pdu):
"""
pass
-class error_response_pdu(generic_pdu):
+class error_response_pdu(base_elt):
"""
Up-Down protocol "error_response" PDU.
"""
- def __str__(self):
- return ' <status>%d</status>\n' % self.status
+ def toXML(self):
+ elt = self.make_elt("status")
+ elt.text = str(self.status)
+ return [elt]
def endElement(self, stack, name, text):
if name == "status":
@@ -187,24 +183,21 @@ class error_response_pdu(generic_pdu):
stack.pop()
stack[-1].endElement(stack, name, text)
-class message_pdu(generic_pdu):
+class message_pdu(base_elt):
"""
Up-Down protocol message wrapper.
"""
- def __str__(self):
- return ('<?xml version="1.0" encoding="UTF-8"?>\n'
- '<message xmlns="http://www.apnic.net/specs/rescerts/up-down/"\n'
- ' version="1"\n'
- ' sender="%s"\n'
- ' recipient="%s"\n'
- ' type="%s">\n'
- '%s</message>\n'
- % (self.sender, self.recipient, self.type, self.payload))
+ version = 1
+
+ def toXML(self):
+ elt = self.make_elt("message", "version", "sender", "recipient", "type")
+ elt.extend(self.payload.toXML())
+ return elt
def startElement(self, stack, name, attrs):
assert name == "message", "Unexpected name %s, stack %s" % (name, stack)
- self.version = attrs["version"]
+ assert self.version == int(attrs["version"])
self.sender = attrs["sender"]
self.recipient = attrs["recipient"]
self.type = attrs["type"]
@@ -219,11 +212,13 @@ class message_pdu(generic_pdu):
}[attrs["type"]]()
stack.append(self.payload)
+ def __str__(self):
+ lxml.etree.tostring(self.toXML(), pretty_print=True, encoding="UTF-8")
+
class sax_handler(sax_utils.handler):
"""
SAX handler for Up-Down protocol.
"""
def create_top_level(self, name, attrs):
- assert name == "message" and attrs["version"] == "1"
return message_pdu()
diff --git a/scripts/xml-parse-test.py b/scripts/xml-parse-test.py
index d8a954d5..8e830d8d 100755
--- a/scripts/xml-parse-test.py
+++ b/scripts/xml-parse-test.py
@@ -2,7 +2,7 @@
import glob, rpki.up_down, rpki.left_right, xml.sax, lxml.etree, lxml.sax
-def test(fileglob, schema, proto):
+def test(fileglob, schema, proto, encoding):
rng = lxml.etree.RelaxNG(lxml.etree.parse(schema))
files = glob.glob(fileglob)
files.sort()
@@ -12,10 +12,14 @@ def test(fileglob, schema, proto):
et = lxml.etree.parse(f)
rng.assertValid(et)
lxml.sax.saxify(et, handler)
- et = lxml.etree.fromstring(str(handler.result))
- print lxml.etree.tostring(et)
- rng.assertValid(et)
+ et = handler.result.toXML()
+ print lxml.etree.tostring(et, pretty_print=True, encoding=encoding, xml_declaration=True)
+ try:
+ rng.assertValid(et)
+ except lxml.etree.DocumentInvalid:
+ print rng.error_log.last_error
+ raise
-test("up-down-protocol-samples/*.xml", "up-down-medium-schema.rng", rpki.up_down)
+test("up-down-protocol-samples/*.xml", "up-down-medium-schema.rng", rpki.up_down, encoding="utf-8")
-test("left-right-protocol-samples/*.xml", "left-right-schema.rng", rpki.left_right)
+test("left-right-protocol-samples/*.xml", "left-right-schema.rng", rpki.left_right, encoding="us-ascii")