diff options
Diffstat (limited to 'scripts/rpki/up_down.py')
-rw-r--r-- | scripts/rpki/up_down.py | 95 |
1 files changed, 55 insertions, 40 deletions
diff --git a/scripts/rpki/up_down.py b/scripts/rpki/up_down.py index b66691c5..84a76d32 100644 --- a/scripts/rpki/up_down.py +++ b/scripts/rpki/up_down.py @@ -7,17 +7,28 @@ xmlns="http://www.apnic.net/specs/rescerts/up-down/" nsmap = { None : xmlns } class base_elt(object): - """ - Generic PDU object, just provides some default methods. + """Generic PDU object. + + Virtual class, just provides some default methods. """ def startElement(self, stack, name, attrs): + """Ignore startElement() if there's no specific handler. + + Some elements have no attributes and we only care about their + text content. + """ pass def endElement(self, stack, name, text): + """Ignore endElement() if there's no specific handler. + + If we don't need to do anything else, just pop the stack. + """ stack.pop() def make_elt(self, name, *attrs): + """Construct a element, copying over a set of attributes.""" elt = lxml.etree.Element("{%s}%s" % (xmlns, name), nsmap=nsmap) for key in attrs: val = getattr(self, key, None) @@ -26,17 +37,17 @@ class base_elt(object): return elt def make_b64elt(self, elt, name, value=None): + """Construct a sub-element with Base64 text content.""" if value is None: value = getattr(self, name, None) if value is not None: lxml.etree.SubElement(elt, "{%s}%s" % (xmlns, name), nsmap=nsmap).text = base64.b64encode(value) class certificate_elt(base_elt): - """ - Up-Down protocol representation of an issued certificate. - """ + """Up-Down protocol representation of an issued certificate.""" def startElement(self, stack, name, attrs): + """Handle attributes of <certificate/> element.""" assert name == "certificate", "Unexpected name %s, stack %s" % (name, stack) self.cert_url = attrs["cert_url"] self.req_resource_set_as = resource_set.resource_set_as(attrs.get("req_resource_set_as")) @@ -44,25 +55,26 @@ class certificate_elt(base_elt): self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6")) def endElement(self, stack, name, text): + """Handle text content of a <certificate/> element.""" assert name == "certificate" self.cert = POW.pkix.Certificate() self.cert.fromString(base64.b64decode(text)) stack.pop() def toXML(self): + """Generate a <certificate/> element.""" elt = self.make_elt("certificate", "cert_url", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6") elt.text = base64.b64encode(self.cert.toString()) return elt class class_elt(base_elt): - """ - Up-Down protocol representation of a resource class. - """ + """Up-Down protocol representation of a resource class.""" def __init__(self): self.certs = [] def startElement(self, stack, name, attrs): + """Handle <class/> elements and their children.""" if name == "certificate": cert = certificate_elt() self.certs.append(cert) @@ -78,6 +90,7 @@ class class_elt(base_elt): self.resource_set_ipv6 = resource_set.resource_set_ipv6(attrs["resource_set_ipv6"]) def endElement(self, stack, name, text): + """Handle <class/> elements and their children.""" if name == "issuer": self.issuer = POW.pkix.Certificate() self.issuer.fromString(base64.b64decode(text)) @@ -86,28 +99,27 @@ class class_elt(base_elt): stack.pop() def toXML(self): + """Generate a <class/> element.""" elt = self.make_elt("class", "class_name", "cert_url", "resource_set_as", "resource_set_ipv4", "resource_set_ipv6", "suggested_sia_head") elt.extend([i.toXML() for i in self.certs]) self.make_b64elt(elt, "issuer", self.issuer.toString()) return elt class list_pdu(base_elt): - """ - Up-Down protocol "list" PDU. - """ + """Up-Down protocol "list" PDU.""" def toXML(self): + """Generate (empty) payload of "list" PDU.""" return [] class list_response_pdu(base_elt): - """ - Up-Down protocol "list_response" PDU. - """ + """Up-Down protocol "list_response" PDU.""" def __init__(self): self.classes = [] def startElement(self, stack, name, attrs): + """Handle "list_response" PDU.""" assert name == "class", "Unexpected name %s, stack %s" % (name, stack) klass = class_elt() self.classes.append(klass) @@ -115,14 +127,14 @@ class list_response_pdu(base_elt): klass.startElement(stack, name, attrs) def toXML(self): + """Generate payload of "list_response" PDU.""" return [i.toXML() for i in self.classes] class issue_pdu(base_elt): - """ - Up-Down protocol "issue" PDU. - """ + """Up-Down protocol "issue" PDU.""" def startElement(self, stack, name, attrs): + """Handle "issue" PDU.""" assert name == "request", "Unexpected name %s, stack %s" % (name, stack) self.class_name = attrs["class_name"] self.req_resource_set_as = resource_set.resource_set_as(attrs.get("req_resource_set_as")) @@ -130,53 +142,46 @@ class issue_pdu(base_elt): self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6")) def endElement(self, stack, name, text): + """Handle "issue" PDU.""" assert name == "request", "Unexpected name %s, stack %s" % (name, stack) self.pkcs10 = base64.b64decode(text) stack.pop() def toXML(self): + """Generate payload of "issue" PDU.""" elt = self.make_elt("request", "class_name", "req_resource_set_as", "req_resource_set_ipv4", "req_resource_set_ipv6") elt.text = base64.b64encode(self.pkcs10) return [elt] class issue_response_pdu(list_response_pdu): - """ - Up-Down protocol "issue_response" PDU. - """ + """Up-Down protocol "issue_response" PDU.""" def toXML(self): + """Generate payload of "issue_response" PDU.""" assert len(self.classes) == 1 return list_response_pdu.toXML(self) class revoke_pdu(base_elt): - """ - Up-Down protocol "revoke" PDU. - """ + """Up-Down protocol "revoke" PDU.""" def startElement(self, stack, name, attrs): + """Handle "revoke" PDU.""" self.class_name = attrs["class_name"] self.ski = attrs["ski"] def toXML(self): + """Generate payload of "revoke" PDU.""" return [self.make_elt("key", "class_name", "ski")] class revoke_response_pdu(revoke_pdu): - """ - Up-Down protocol "revoke_response" PDU. - """ + """Up-Down protocol "revoke_response" PDU.""" pass class error_response_pdu(base_elt): - """ - Up-Down protocol "error_response" PDU. - """ - - def toXML(self): - elt = self.make_elt("status") - elt.text = str(self.status) - return [elt] + """Up-Down protocol "error_response" PDU.""" def endElement(self, stack, name, text): + """Handle "error_response" PDU.""" if name == "status": self.status = int(text) elif name == "last_message_processed": @@ -188,19 +193,30 @@ class error_response_pdu(base_elt): stack.pop() stack[-1].endElement(stack, name, text) + def toXML(self): + """Generate payload of "error_response" PDU.""" + elt = self.make_elt("status") + elt.text = str(self.status) + return [elt] + class message_pdu(base_elt): - """ - Up-Down protocol message wrapper. - """ + """Up-Down protocol message wrapper PDU.""" version = 1 def toXML(self): + """Generate payload of message PDU.""" elt = self.make_elt("message", "version", "sender", "recipient", "type") elt.extend(self.payload.toXML()) return elt def startElement(self, stack, name, attrs): + """Handle message PDU. + + Payload of the <message/> element varies depending on the "type" + attribute, so after some basic checks we have to instantiate the + right class object to handle whatever kind of PDU this is. + """ assert name == "message", "Unexpected name %s, stack %s" % (name, stack) assert self.version == int(attrs["version"]) self.sender = attrs["sender"] @@ -221,9 +237,8 @@ class message_pdu(base_elt): lxml.etree.tostring(self.toXML(), pretty_print=True, encoding="UTF-8") class sax_handler(sax_utils.handler): - """ - SAX handler for Up-Down protocol. - """ + """SAX handler for Up-Down protocol.""" def create_top_level(self, name, attrs): + """Top-level PDU for this protocol is <message/>.""" return message_pdu() |