diff options
-rw-r--r-- | scripts/rpki/left_right.py | 9 | ||||
-rw-r--r-- | scripts/rpki/up_down.py | 129 | ||||
-rwxr-xr-x | scripts/xml-parse-test.py | 16 |
3 files changed, 75 insertions, 79 deletions
diff --git a/scripts/rpki/left_right.py b/scripts/rpki/left_right.py index 7dc29e7e..a46c5a26 100644 --- a/scripts/rpki/left_right.py +++ b/scripts/rpki/left_right.py @@ -90,8 +90,7 @@ class self_elt(base_elt): def toXML(self): elt = self.make_elt("self") - for i in self.prefs: - elt.append(i.toXML()) + elt.extend([i.toXML() for i in self.prefs]) return elt class bsc_elt(base_elt): @@ -268,8 +267,7 @@ class list_resources_elt(base_elt): def toXML(self): elt = self.make_elt("list_resources") - for i in self.resources: - elt.append(i.toXML()) + elt.extend([i.toXML() for i in self.resources]) return elt class report_error_elt(base_elt): @@ -319,8 +317,7 @@ class msg(list): def toXML(self): elt = lxml.etree.Element("{%s}msg" % (xmlns), nsmap=nsmap, version=str(self.version)) - for i in self: - elt.append(i.toXML()) + elt.extend([i.toXML() for i in self]) return elt class sax_handler(sax_utils.handler): diff --git a/scripts/rpki/up_down.py b/scripts/rpki/up_down.py index fcdd213b..87f7b05c 100644 --- a/scripts/rpki/up_down.py +++ b/scripts/rpki/up_down.py @@ -1,8 +1,12 @@ # $Id$ -import base64, sax_utils, resource_set +import base64, sax_utils, resource_set, lxml.etree -class generic_pdu(object): +xmlns="http://www.apnic.net/specs/rescerts/up-down/" + +nsmap = { None : xmlns } + +class base_elt(object): """ Generic PDU object, just provides some default methods. """ @@ -13,7 +17,18 @@ class generic_pdu(object): def endElement(self, stack, name, text): stack.pop() -class certificate_elt(generic_pdu): + def make_elt(self, name, *attrs): + elt = lxml.etree.Element("{%s}%s" % (xmlns, name), nsmap=nsmap) + for key in attrs: + val = getattr(self, key, None) + if val is not None: + elt.set(key, str(val)) + return elt + + def make_b64elt(self, elt, name): + lxml.etree.SubElement(elt, "{%s}%s" % (xmlns, name), nsmap=nsmap).text = base64.b64encode(getattr(self, name)) + +class certificate_elt(base_elt): """ Up-Down protocol representation of an issued certificate. """ @@ -30,18 +45,12 @@ class certificate_elt(generic_pdu): self.cert = base64.b64decode(text) stack.pop() - def __str__(self): - xml = (' <certificate cert_url="%s"' % (self.cert_url)) - if self.req_resource_set_as: - xml += ('\n req_resource_set_as="%s"' % self.req_resource_set_as) - if self.req_resource_set_ipv4: - xml += ('\n req_resource_set_ipv4="%s"' % self.req_resource_set_ipv4) - if self.req_resource_set_ipv6: - xml += ('\n req_resource_set_ipv6="%s"' % self.req_resource_set_ipv6) - xml += ">" + base64.b64encode(self.cert) + "</certificate>\n" - return xml - -class class_elt(generic_pdu): + def toXML(self): + elt = self.make_elt("certificate", "cert_url", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6") + elt.text = base64.b64encode(self.cert) + return elt + +class class_elt(base_elt): """ Up-Down protocol representation of a resource class. """ @@ -71,31 +80,21 @@ class class_elt(generic_pdu): assert name == "class", "Unexpected name %s, stack %s" % (name, stack) stack.pop() - def __str__(self): - xml = (' <class class_name="%s"\n' - ' cert_url="%s"\n' - ' resource_set_as="%s"\n' - ' resource_set_ipv4="%s"\n' - ' resource_set_ipv6="%s"' - % (self.class_name, self.cert_url, self.resource_set_as, - self.resource_set_ipv4, self.resource_set_ipv6)) - if self.suggested_sia_head: - xml += '\n suggested_sia_head="%s"' % (self.suggested_sia_head) - xml += ">\n" - for cert in self.certs: - xml += str(cert) - xml += " <issuer>" + base64.b64encode(self.issuer) + "</issuer>\n </class>\n" - return xml - -class list_pdu(generic_pdu): + def toXML(self): + elt = self.make_elt("class", "class_name", "cert_url", "resource_set_as", "resource_set_ipv4", "resource_set_ipv6", "suggested_sia_head") + elt.extend([i.toXML() for i in self.certs]) + self.make_b64elt(elt, "issuer") + return elt + +class list_pdu(base_elt): """ Up-Down protocol "list" PDU. """ - def __str__(self): - return "" + def toXML(self): + return [] -class list_response_pdu(generic_pdu): +class list_response_pdu(base_elt): """ Up-Down protocol "list_response" PDU. """ @@ -110,10 +109,10 @@ class list_response_pdu(generic_pdu): stack.append(klass) klass.startElement(stack, name, attrs) - def __str__(self): - return "".join(map(str, self.classes)) + def toXML(self): + return [i.toXML() for i in self.classes] -class issue_pdu(generic_pdu): +class issue_pdu(base_elt): """ Up-Down protocol "issue" PDU. """ @@ -130,26 +129,21 @@ class issue_pdu(generic_pdu): self.pkcs10 = base64.b64decode(text) stack.pop() - def __str__(self): - xml = (' <request class_name="%s"' % self.class_name) - if self.req_resource_set_as: - xml += ('\n req_resource_set_as="%s"' % self.req_resource_set_as) - if self.req_resource_set_ipv4: - xml += ('\n req_resource_set_ipv4="%s"' % self.req_resource_set_ipv4) - if self.req_resource_set_ipv6: - xml += ('\n req_resource_set_ipv6="%s"' % self.req_resource_set_ipv6) - return xml + ">" + base64.b64encode(self.pkcs10) + "</request>\n" + def toXML(self): + elt = self.make_elt("request", "class_name", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6") + elt.text = base64.b64encode(self.pkcs10) + return [elt] class issue_response_pdu(list_response_pdu): """ Up-Down protocol "issue_response" PDU. """ - def __str__(self): + def toXML(self): assert len(self.classes) == 1 - return list_response_pdu.__str__(self) + return list_response_pdu.toXML(self) -class revoke_pdu(generic_pdu): +class revoke_pdu(base_elt): """ Up-Down protocol "revoke" PDU. """ @@ -158,8 +152,8 @@ class revoke_pdu(generic_pdu): self.class_name = attrs["class_name"] self.ski = attrs["ski"] - def __str__(self): - return (' <key class_name="%s" ski="%s" />\n' % (self.class_name, self.ski)) + def toXML(self): + return [self.make_elt("key", "class_name", "ski")] class revoke_response_pdu(revoke_pdu): """ @@ -167,13 +161,15 @@ class revoke_response_pdu(revoke_pdu): """ pass -class error_response_pdu(generic_pdu): +class error_response_pdu(base_elt): """ Up-Down protocol "error_response" PDU. """ - def __str__(self): - return ' <status>%d</status>\n' % self.status + def toXML(self): + elt = self.make_elt("status") + elt.text = str(self.status) + return [elt] def endElement(self, stack, name, text): if name == "status": @@ -187,24 +183,21 @@ class error_response_pdu(generic_pdu): stack.pop() stack[-1].endElement(stack, name, text) -class message_pdu(generic_pdu): +class message_pdu(base_elt): """ Up-Down protocol message wrapper. """ - def __str__(self): - return ('<?xml version="1.0" encoding="UTF-8"?>\n' - '<message xmlns="http://www.apnic.net/specs/rescerts/up-down/"\n' - ' version="1"\n' - ' sender="%s"\n' - ' recipient="%s"\n' - ' type="%s">\n' - '%s</message>\n' - % (self.sender, self.recipient, self.type, self.payload)) + version = 1 + + def toXML(self): + elt = self.make_elt("message", "version", "sender", "recipient", "type") + elt.extend(self.payload.toXML()) + return elt def startElement(self, stack, name, attrs): assert name == "message", "Unexpected name %s, stack %s" % (name, stack) - self.version = attrs["version"] + assert self.version == int(attrs["version"]) self.sender = attrs["sender"] self.recipient = attrs["recipient"] self.type = attrs["type"] @@ -219,11 +212,13 @@ class message_pdu(generic_pdu): }[attrs["type"]]() stack.append(self.payload) + def __str__(self): + lxml.etree.tostring(self.toXML(), pretty_print=True, encoding="UTF-8") + class sax_handler(sax_utils.handler): """ SAX handler for Up-Down protocol. """ def create_top_level(self, name, attrs): - assert name == "message" and attrs["version"] == "1" return message_pdu() diff --git a/scripts/xml-parse-test.py b/scripts/xml-parse-test.py index d8a954d5..8e830d8d 100755 --- a/scripts/xml-parse-test.py +++ b/scripts/xml-parse-test.py @@ -2,7 +2,7 @@ import glob, rpki.up_down, rpki.left_right, xml.sax, lxml.etree, lxml.sax -def test(fileglob, schema, proto): +def test(fileglob, schema, proto, encoding): rng = lxml.etree.RelaxNG(lxml.etree.parse(schema)) files = glob.glob(fileglob) files.sort() @@ -12,10 +12,14 @@ def test(fileglob, schema, proto): et = lxml.etree.parse(f) rng.assertValid(et) lxml.sax.saxify(et, handler) - et = lxml.etree.fromstring(str(handler.result)) - print lxml.etree.tostring(et) - rng.assertValid(et) + et = handler.result.toXML() + print lxml.etree.tostring(et, pretty_print=True, encoding=encoding, xml_declaration=True) + try: + rng.assertValid(et) + except lxml.etree.DocumentInvalid: + print rng.error_log.last_error + raise -test("up-down-protocol-samples/*.xml", "up-down-medium-schema.rng", rpki.up_down) +test("up-down-protocol-samples/*.xml", "up-down-medium-schema.rng", rpki.up_down, encoding="utf-8") -test("left-right-protocol-samples/*.xml", "left-right-schema.rng", rpki.left_right) +test("left-right-protocol-samples/*.xml", "left-right-schema.rng", rpki.left_right, encoding="us-ascii") |