# $Id$ import base64, sax_utils, resource_set class msg_pdu(object): """ Base type for all Up-Down PDUs. """ def __str__(self): return ('\ \n\ \n' \ % (self.sender, self.recipient, self.type) ) + self.toXML() + "\n" def toXML(self): return "" def startElement(self, name, attrs): pass def endElement(self, name, text): pass class cert_elt(object): """ Up-Down protocol representation of an issued certificate. """ def __init__(self, attrs): sax_utils.snarf_attribute(self, attrs, "cert_url") sax_utils.snarf_attribute(self, attrs, "req_resource_set_as", resource_set.resource_set_as) sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv4", resource_set.resource_set_ipv4) sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv6", resource_set.resource_set_ipv6) def __str__(self): xml = (' \n" return xml class class_elt(object): """ Up-Down protocol representation of a resource class. """ def __init__(self, attrs): sax_utils.snarf_attribute(self, attrs, ("class_name", "cert_url", "suggested_sia_head")) sax_utils.snarf_attribute(self, attrs, "resource_set_as", resource_set.resource_set_as) sax_utils.snarf_attribute(self, attrs, "resource_set_ipv4", resource_set.resource_set_ipv4) sax_utils.snarf_attribute(self, attrs, "resource_set_ipv6", resource_set.resource_set_ipv6) self.certs = [] def __str__(self): xml = ('\ \n \n" return xml class list_pdu(msg_pdu): """ Up-Down protocol "list" PDU. """ pass class list_response_pdu(msg_pdu): """ Up-Down protocol "list_response" PDU. """ def __init__(self): self.classes = [] def startElement(self, name, attrs): if name == "class": self.classes.append(class_elt(attrs)) elif name == "certificate": self.classes[-1].certs.append(cert_elt(attrs)) def endElement(self, name, text): if name == "certificate": self.classes[-1].certs[-1].cert = base64.b64decode(text) elif name == "issuer": self.classes[-1].issuer = base64.b64decode(text) def toXML(self): return "".join(map(str, self.classes)) class issue_pdu(msg_pdu): """ Up-Down protocol "issue" PDU. """ def startElement(self, name, attrs): assert name == "request" sax_utils.snarf_attribute(self, attrs, "class_name") sax_utils.snarf_attribute(self, attrs, "req_resource_set_as", resource_set.resource_set_as) sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv4", resource_set.resource_set_ipv4) sax_utils.snarf_attribute(self, attrs, "req_resource_set_ipv6", resource_set.resource_set_ipv6) def endElement(self, name, text): assert name == "request" self.pkcs10 = base64.b64decode(text) def toXML(self): xml = (' " + base64.b64encode(self.pkcs10) + "\n" class issue_response_pdu(list_response_pdu): """ Up-Down protocol "issue_response" PDU. """ def toXML(self): assert len(self.classes) == 1 return list_response_pdu.toXML(self) class revoke_pdu(msg_pdu): """ Up-Down protocol "revoke" PDU. """ def startElement(self, name, attrs): sax_utils.snarf_attribute(self, attrs, ("class_name", "ski")) def toXML(self): return (' \n' % (self.class_name, self.ski)) class revoke_response_pdu(revoke_pdu): """ Up-Down protocol "revoke_response" PDU. """ pass class error_response_pdu(msg_pdu): """ Up-Down protocol "error_response" PDU. """ def toXML(self): return ' %d\n' % self.status def endElement(self, name, text): if name == "status": self.status = int(text) elif name == "last_message_processed": self.last_message_processed = text elif name == "description": self.description = text class sax_handler(sax_utils.handler): """ SAX handler for Up-Down protocol. Builds message PDU then dispatches to that class's handler for nested data. """ def startElement(self, name, attrs): if name == "message": assert int(attrs.getValue("version")) == 1 self.set_obj({ "list" : list_pdu(), "list_response" : list_response_pdu(), "issue" : issue_pdu(), "issue_response" : issue_response_pdu(), "revoke" : revoke_pdu(), "revoke_response" : revoke_response_pdu(), "error_response" : error_response_pdu() }[attrs.getValue("type").encode("ascii")]) sax_utils.snarf_attribute(self.obj, attrs, ("sender", "recipient", "type")) else: self.obj.startElement(name, attrs) def endElement(self, name): if name != "message": self.obj.endElement(name, self.get_text())