From 32b7a7b22c11129c2c17d8adc3f6aceac0e6de0b Mon Sep 17 00:00:00 2001 From: Rob Austein Date: Sun, 16 Sep 2007 21:06:15 +0000 Subject: Switch to using APNIC's preferred version of the up-down protocol schema. I still think the folks at APNIC are wrong about allowing bogus error codes to slip past schema checking, but coding around this problem is less work in the long run than maintaining a forked schema would be. Time to bury the hatchet and move on. svn path=/scripts/Makefile; revision=975 --- scripts/rpki/up_down.py | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) (limited to 'scripts/rpki') diff --git a/scripts/rpki/up_down.py b/scripts/rpki/up_down.py index 256ef790..d1437775 100644 --- a/scripts/rpki/up_down.py +++ b/scripts/rpki/up_down.py @@ -45,13 +45,37 @@ class base_elt(object): if value is not None: lxml.etree.SubElement(elt, "{%s}%s" % (xmlns, name), nsmap=nsmap).text = base64.b64encode(value) +class multi_uri(list): + """Container for a set of URIs.""" + + def __init__(self, ini): + """Initialize a set of URIs, which includes basic some syntax checking.""" + if isinstance(ini, (list, tuple)): + self[:] = ini + elif isinstance(ini, str): + self[:] = ini.split(",") + for s in self: + assert s.strip() == s and s.find("://") >= 0, "Bad URI \"%s\"" % s + else: + raise TypeError + + def __str__(self): + return ",".join(self) + + def rsync(self): + """Find first rsync://... URI in self.""" + for s in self: + if s.startswith("rsync://"): + return s + return None + class certificate_elt(base_elt): """Up-Down protocol representation of an issued certificate.""" def startElement(self, stack, name, attrs): """Handle attributes of element.""" assert name == "certificate", "Unexpected name %s, stack %s" % (name, stack) - self.cert_url = attrs["cert_url"] + self.cert_url = multi_uri(attrs["cert_url"]) self.req_resource_set_as = resource_set.resource_set_as(attrs.get("req_resource_set_as")) self.req_resource_set_ipv4 = resource_set.resource_set_ipv4(attrs.get("req_resource_set_ipv4")) self.req_resource_set_ipv6 = resource_set.resource_set_ipv6(attrs.get("req_resource_set_ipv6")) @@ -84,7 +108,7 @@ class class_elt(base_elt): elif name != "issuer": assert name == "class", "Unexpected name %s, stack %s" % (name, stack) self.class_name = attrs["class_name"] - self.cert_url = attrs["cert_url"] + self.cert_url = multi_uri(attrs["cert_url"]) self.suggested_sia_head = attrs.get("suggested_sia_head") self.resource_set_as = resource_set.resource_set_as(attrs["resource_set_as"]) self.resource_set_ipv4 = resource_set.resource_set_ipv4(attrs["resource_set_ipv4"]) @@ -180,10 +204,23 @@ class revoke_response_pdu(revoke_pdu): class error_response_pdu(base_elt): """Up-Down protocol "error_response" PDU.""" + codes = { + 1101 : "Already processing request", + 1102 : "Version number error", + 1103 : "Unrecognised request type", + 1201 : "Request - no such resource class", + 1202 : "Request - no resources allocated in resource class", + 1203 : "Request - badly formed certificate request", + 1301 : "Revoke - no such resource class", + 1302 : "Revoke - no such key", + 2001 : "Internal Server Error - Request not performed" } + def endElement(self, stack, name, text): """Handle "error_response" PDU.""" if name == "status": - self.status = int(text) + code = int(text) + assert code in self.codes + self.status = code elif name == "last_message_processed": self.last_message_processed = text elif name == "description": @@ -195,6 +232,7 @@ class error_response_pdu(base_elt): def toXML(self): """Generate payload of "error_response" PDU.""" + assert self.status in self.codes elt = self.make_elt("status") elt.text = str(self.status) return [elt] -- cgit v1.2.3