From 47093d0254fd81857e1199874a255f50f7c8694a Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Tue, 31 Jul 2007 05:45:57 +0000 Subject: Use lxml for encoding too svn path=/scripts/rpki/left_right.py; revision=796 --- scripts/rpki/up_down.py | 129 +++++++++++++++++++++++------------------------- 1 file changed, 62 insertions(+), 67 deletions(-) (limited to 'scripts/rpki/up_down.py') diff --git a/scripts/rpki/up_down.py b/scripts/rpki/up_down.py index fcdd213b..87f7b05c 100644 --- a/scripts/rpki/up_down.py +++ b/scripts/rpki/up_down.py @@ -1,8 +1,12 @@ # $Id$ -import base64, sax_utils, resource_set +import base64, sax_utils, resource_set, lxml.etree -class generic_pdu(object): +xmlns="http://www.apnic.net/specs/rescerts/up-down/" + +nsmap = { None : xmlns } + +class base_elt(object): """ Generic PDU object, just provides some default methods. """ @@ -13,7 +17,18 @@ class generic_pdu(object): def endElement(self, stack, name, text): stack.pop() -class certificate_elt(generic_pdu): + def make_elt(self, name, *attrs): + elt = lxml.etree.Element("{%s}%s" % (xmlns, name), nsmap=nsmap) + for key in attrs: + val = getattr(self, key, None) + if val is not None: + elt.set(key, str(val)) + return elt + + def make_b64elt(self, elt, name): + lxml.etree.SubElement(elt, "{%s}%s" % (xmlns, name), nsmap=nsmap).text = base64.b64encode(getattr(self, name)) + +class certificate_elt(base_elt): """ Up-Down protocol representation of an issued certificate. """ @@ -30,18 +45,12 @@ class certificate_elt(generic_pdu): self.cert = base64.b64decode(text) stack.pop() - def __str__(self): - xml = (' \n" - return xml - -class class_elt(generic_pdu): + def toXML(self): + elt = self.make_elt("certificate", "cert_url", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6") + elt.text = base64.b64encode(self.cert) + return elt + +class class_elt(base_elt): """ Up-Down protocol representation of a resource class. """ @@ -71,31 +80,21 @@ class class_elt(generic_pdu): assert name == "class", "Unexpected name %s, stack %s" % (name, stack) stack.pop() - def __str__(self): - xml = (' \n \n" - return xml - -class list_pdu(generic_pdu): + def toXML(self): + elt = self.make_elt("class", "class_name", "cert_url", "resource_set_as", "resource_set_ipv4", "resource_set_ipv6", "suggested_sia_head") + elt.extend([i.toXML() for i in self.certs]) + self.make_b64elt(elt, "issuer") + return elt + +class list_pdu(base_elt): """ Up-Down protocol "list" PDU. """ - def __str__(self): - return "" + def toXML(self): + return [] -class list_response_pdu(generic_pdu): +class list_response_pdu(base_elt): """ Up-Down protocol "list_response" PDU. """ @@ -110,10 +109,10 @@ class list_response_pdu(generic_pdu): stack.append(klass) klass.startElement(stack, name, attrs) - def __str__(self): - return "".join(map(str, self.classes)) + def toXML(self): + return [i.toXML() for i in self.classes] -class issue_pdu(generic_pdu): +class issue_pdu(base_elt): """ Up-Down protocol "issue" PDU. """ @@ -130,26 +129,21 @@ class issue_pdu(generic_pdu): self.pkcs10 = base64.b64decode(text) stack.pop() - def __str__(self): - xml = (' " + base64.b64encode(self.pkcs10) + "\n" + def toXML(self): + elt = self.make_elt("request", "class_name", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6") + elt.text = base64.b64encode(self.pkcs10) + return [elt] class issue_response_pdu(list_response_pdu): """ Up-Down protocol "issue_response" PDU. """ - def __str__(self): + def toXML(self): assert len(self.classes) == 1 - return list_response_pdu.__str__(self) + return list_response_pdu.toXML(self) -class revoke_pdu(generic_pdu): +class revoke_pdu(base_elt): """ Up-Down protocol "revoke" PDU. """ @@ -158,8 +152,8 @@ class revoke_pdu(generic_pdu): self.class_name = attrs["class_name"] self.ski = attrs["ski"] - def __str__(self): - return (' \n' % (self.class_name, self.ski)) + def toXML(self): + return [self.make_elt("key", "class_name", "ski")] class revoke_response_pdu(revoke_pdu): """ @@ -167,13 +161,15 @@ class revoke_response_pdu(revoke_pdu): """ pass -class error_response_pdu(generic_pdu): +class error_response_pdu(base_elt): """ Up-Down protocol "error_response" PDU. """ - def __str__(self): - return ' %d\n' % self.status + def toXML(self): + elt = self.make_elt("status") + elt.text = str(self.status) + return [elt] def endElement(self, stack, name, text): if name == "status": @@ -187,24 +183,21 @@ class error_response_pdu(generic_pdu): stack.pop() stack[-1].endElement(stack, name, text) -class message_pdu(generic_pdu): +class message_pdu(base_elt): """ Up-Down protocol message wrapper. """ - def __str__(self): - return ('\n' - '\n' - '%s\n' - % (self.sender, self.recipient, self.type, self.payload)) + version = 1 + + def toXML(self): + elt = self.make_elt("message", "version", "sender", "recipient", "type") + elt.extend(self.payload.toXML()) + return elt def startElement(self, stack, name, attrs): assert name == "message", "Unexpected name %s, stack %s" % (name, stack) - self.version = attrs["version"] + assert self.version == int(attrs["version"]) self.sender = attrs["sender"] self.recipient = attrs["recipient"] self.type = attrs["type"] @@ -219,11 +212,13 @@ class message_pdu(generic_pdu): }[attrs["type"]]() stack.append(self.payload) + def __str__(self): + lxml.etree.tostring(self.toXML(), pretty_print=True, encoding="UTF-8") + class sax_handler(sax_utils.handler): """ SAX handler for Up-Down protocol. """ def create_top_level(self, name, attrs): - assert name == "message" and attrs["version"] == "1" return message_pdu() -- cgit v1.2.3