# $Id$ import base64, sax_utils, resource_set class generic_pdu(object): """ Generic PDU object, just provides some default methods. """ def startElement(self, stack, name, attrs): pass def endElement(self, stack, name, text): stack.pop() class cert_elt(object): """ Up-Down protocol representation of an issued certificate. """ def __init__(self, attrs): self.cert_url = attrs["cert_url"] self.req_resource_set_as = resource_set.resource_set_as(attrs["req_resource_set_as"]) self.req_resource_set_ipv4 = resource_set.resource_set_ipv4(attrs["req_resource_set_ipv4"]) self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs["req_resource_set_ipv6"]) def __str__(self): xml = (' \n" return xml class class_elt(object): """ Up-Down protocol representation of a resource class. """ def __init__(self, attrs): self.class_name = attrs["class_name"] self.cert_url = attrs["cert_url"] self.suggested_sia_head = attrs.get("suggested_sia_head") self.resource_set_as = resource_set.resource_set_as(attrs["resource_set_as"]) self.resource_set_ipv4 = resource_set.resource_set_ipv4(attrs["resource_set_ipv4"]) self.resource_set_ipv6 = resource_set.resource_set_ipv6(attrs["resource_set_ipv6"]) self.certs = [] def __str__(self): xml = ('\ \n \n" return xml class list_pdu(generic_pdu): """ Up-Down protocol "list" PDU. """ def __str__(self): return "" class list_response_pdu(generic_pdu): """ Up-Down protocol "list_response" PDU. """ def __init__(self): self.classes = [] def startElement(self, stack, name, attrs): if name == "class": self.classes.append(class_elt(attrs)) elif name == "certificate": self.classes[-1].certs.append(cert_elt(attrs)) elif name != "issuer": assert name == "list_response", "Unexpected name %s, stack %s" % (name, stack) def endElement(self, stack, name, text): if name == "certificate": self.classes[-1].certs[-1].cert = base64.b64decode(text) elif name == "issuer": self.classes[-1].issuer = base64.b64decode(text) elif name != "class": assert name == "message", "Unexpected name %s, stack %s" % (name, stack) stack.pop() stack[-1].endElement(stack, name, text) def __str__(self): return "".join(map(str, self.classes)) class issue_pdu(generic_pdu): """ Up-Down protocol "issue" PDU. """ def startElement(self, stack, name, attrs): assert name == "request", "Unexpected name %s, stack %s" % (name, stack) self.class_name = attrs["class_name"] self.req_resource_set_as = resource_set.resource_set_as(attrs.get("req_resource_set_as")) self.req_resource_set_ipv4 = resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4")) self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6")) def endElement(self, stack, name, text): assert name == "request", "Unexpected name %s, stack %s" % (name, stack) self.pkcs10 = base64.b64decode(text) stack.pop() def __str__(self): xml = (' " + base64.b64encode(self.pkcs10) + "\n" class issue_response_pdu(list_response_pdu): """ Up-Down protocol "issue_response" PDU. """ def __str__(self): assert len(self.classes) == 1 return list_response_pdu.__str__(self) class revoke_pdu(generic_pdu): """ Up-Down protocol "revoke" PDU. """ def startElement(self, stack, name, attrs): self.class_name = attrs["class_name"] self.ski = attrs["ski"] def __str__(self): return (' \n' % (self.class_name, self.ski)) class revoke_response_pdu(revoke_pdu): """ Up-Down protocol "revoke_response" PDU. """ pass class error_response_pdu(generic_pdu): """ Up-Down protocol "error_response" PDU. """ def __str__(self): return ' %d\n' % self.status def endElement(self, stack, name, text): if name == "status": self.status = int(text) elif name == "last_message_processed": self.last_message_processed = text elif name == "description": self.description = text else: assert name == "message", "Unexpected name %s, stack %s" % (name, stack) stack.pop() stack[-1].endElement(stack, name, text) class message_pdu(generic_pdu): """ Up-Down protocol message wrapper. """ def __str__(self): return ('\ \n\ \n' \ % (self.sender, self.recipient, self.type) ) + str(self.payload) + "\n" def startElement(self, stack, name, attrs): assert name == "message", "Unexpected name %s, stack %s" % (name, stack) self.version = attrs["version"] self.sender = attrs["sender"] self.recipient = attrs["recipient"] self.type = attrs["type"] self.payload = { "list" : list_pdu, "list_response" : list_response_pdu, "issue" : issue_pdu, "issue_response" : issue_response_pdu, "revoke" : revoke_pdu, "revoke_response" : revoke_response_pdu, "error_response" : error_response_pdu }[attrs["type"]]() stack.append(self.payload) class sax_handler(sax_utils.handler): """ SAX handler for Up-Down protocol. """ def create_top_level(self, name, attrs): assert name == "message" and attrs["version"] == "1" return message_pdu()