diff options
Diffstat (limited to 'scripts/xml-parse-test.py')
-rwxr-xr-x | scripts/xml-parse-test.py | 197 |
1 files changed, 105 insertions, 92 deletions
diff --git a/scripts/xml-parse-test.py b/scripts/xml-parse-test.py index b3fd6942..e3c29480 100755 --- a/scripts/xml-parse-test.py +++ b/scripts/xml-parse-test.py @@ -11,19 +11,63 @@ def relaxng(xml, rng): if v != "- validates\n": raise RuntimeError, "RelaxNG validation failure:\n" + v -class rpki_updown_resource_set(object): +class v4addr(long): + bits = 32 + + def __new__(cls, x): + r = struct.unpack("!I", socket.inet_pton(socket.AF_INET, x)) + return long.__new__(cls, r[0]) + + def __str__(self): + return socket.inet_ntop(socket.AF_INET, struct.pack("!I", long(self))) + +class v6addr(long): + bits = 128 + + def __new__(cls, x): + r = struct.unpack("!QQ", socket.inet_pton(socket.AF_INET6, x)) + return long.__new__(cls, (r[0] << 64) | r[1]) + + def __str__(self): + return socket.inet_ntop(socket.AF_INET6, + struct.pack("!QQ", long(self) >> 64, + long(self) & 0xFFFFFFFFFFFFFFFF)) + +class resource_range(object): + + def __init__(self, min, max): + assert min <= max, "Mis-ordered range: %s before %s" % (str(min), str(max)) + self.min = min + self.max = max + + def __cmp__(self, other): + c = self.min - other.min + if c == 0: + c = self.max - other.max + return c + +class resource_range_as(resource_range): - class range(object): + def __str__(self): + if self.min == self.max: + return str(self.min) + else: + return str(self.min) + "-" + str(self.max) + +class resource_range_addr(resource_range): - def __init__(self, min, max): - self.min = min - self.max = max + def __str__(self): + mask = self.min ^ self.max + prefixlen = self.min.bits + while mask & 1: + prefixlen -= 1 + mask >>= 1 + if mask: + return str(self.min) + "-" + str(self.max) + else: + return str(self.min) + "/" + str(prefixlen) - def __cmp__(self, other): - c = self.min - other.min - if c == 0: - c = self.max - other.max - return c +class resource_set(object): def __init__(self, s): if s == "": @@ -36,72 +80,41 @@ class rpki_updown_resource_set(object): assert self.vec[i].max < self.vec[i+1].min, 'Resource overlap "%s"' % (s) def __str__(self): - vec = map(self.tostr, self.vec) + vec = map(str, self.vec) return ",".join(vec) -class rpki_updown_resource_set_as(rpki_updown_resource_set): +class resource_set_as(resource_set): - def parse(self, elt): - r = re.match("^([0-9]+)-([0-9]+)$", elt) + def parse(self, x): + r = re.match("^([0-9]+)-([0-9]+)$", x) if r: - return self.range(long(r.group(1)), long(r.group(2))) - else: - return self.range(long(elt), long(elt)) - - def tostr(self, elt): - if elt.min == elt.max: - return str(elt.min) + return resource_range_as(long(r.group(1)), long(r.group(2))) else: - return str(elt.min) + "-" + str(elt.max) + return resource_range_as(long(x), long(x)) -class rpki_updown_resource_set_ip(rpki_updown_resource_set): +class resource_set_ip(resource_set): - def parse(self, elt): - r = re.match("^([0-9:.a-fA-F]+)-([0-9:.a-fA-F]+)$", elt) + def parse(self, x): + r = re.match("^([0-9:.a-fA-F]+)-([0-9:.a-fA-F]+)$", x) if r: - return self.range(self.pton(r.group(1)), self.pton(r.group(2))) - r = re.match("^([0-9:.a-fA-F]+)/([0-9]+)$", elt) + return resource_range_addr(self.addr_type(r.group(1)), self.addr_type(r.group(2))) + r = re.match("^([0-9:.a-fA-F]+)/([0-9]+)$", x) if r: - min = self.pton(r.group(1)) + min = self.addr_type(r.group(1)) prefixlen = int(r.group(2)) - mask = (1 << (self.bits - prefixlen)) - 1 - assert (min & mask) == 0, "Resource not in canonical form: %s" % (elt) + mask = (1 << (self.addr_type.bits - prefixlen)) - 1 + assert (min & mask) == 0, "Resource not in canonical form: %s" % (x) max = min | mask - return self.range(min, max) - raise RuntimeError, 'Bad IP resource "%s"' % (elt) - - def tostr(self, elt): - mask = elt.min ^ elt.max - prefixlen = self.bits - while mask & 1: - prefixlen -= 1 - mask >>= 1 - if mask: - return self.ntop(elt.min) + "-" + self.ntop(elt.max) - else: - return self.ntop(elt.min) + "/" + str(prefixlen) - -class rpki_updown_resource_set_ipv4(rpki_updown_resource_set_ip): - bits = 32 - - def pton(self, x): - r = struct.unpack("!I", socket.inet_pton(socket.AF_INET, x)) - return r[0] - - def ntop(self, x): - return socket.inet_ntop(socket.AF_INET, struct.pack("!I", x)) + return resource_range_addr(min, max) + raise RuntimeError, 'Bad IP resource "%s"' % (x) -class rpki_updown_resource_set_ipv6(rpki_updown_resource_set_ip): - bits = 128 - - def pton(self, x): - r = struct.unpack("!QQ", socket.inet_pton(socket.AF_INET6, x)) - return (r[0] << 64) | r[1] +class resource_set_ipv4(resource_set_ip): + addr_type = v4addr - def ntop(self, x): - return socket.inet_ntop(socket.AF_INET6, struct.pack("!QQ", x >> 64, x & 0xFFFFFFFFFFFFFFFF)) +class resource_set_ipv6(resource_set_ip): + addr_type = v6addr -class rpki_updown_msg(object): +class msg(object): def msgToXML(self): return ('\ @@ -123,14 +136,14 @@ class rpki_updown_msg(object): def endElement(self, name, text): pass -class rpki_updown_cert(object): +class cert(object): def __init__(self, attrs): for k in ("cert_url", ): setattr(self, k, attrs.getValue(k).encode("ascii")) - for k,f in (("req_resource_set_as", rpki_updown_resource_set_as), - ("req_resource_set_ipv4", rpki_updown_resource_set_ipv4), - ("req_resource_set_ipv6", rpki_updown_resource_set_ipv6)): + for k,f in (("req_resource_set_as", resource_set_as), + ("req_resource_set_ipv4", resource_set_ipv4), + ("req_resource_set_ipv6", resource_set_ipv6)): try: setattr(self, k, f(attrs.getValue(k).encode("ascii"))) except KeyError: @@ -147,14 +160,14 @@ class rpki_updown_cert(object): xml += ">" + base64.b64encode(self.cert) + "</certificate>\n" return xml -class rpki_updown_class(object): +class resource_class(object): def __init__(self, attrs): for k in ("class_name", "cert_url"): setattr(self, k, attrs.getValue(k).encode("ascii")) - for k,f in (("resource_set_as", rpki_updown_resource_set_as), - ("resource_set_ipv4", rpki_updown_resource_set_ipv4), - ("resource_set_ipv6", rpki_updown_resource_set_ipv6)): + for k,f in (("resource_set_as", resource_set_as), + ("resource_set_ipv4", resource_set_ipv4), + ("resource_set_ipv6", resource_set_ipv6)): setattr(self, k, f(attrs.getValue(k).encode("ascii"))) try: self.suggested_sia_head = attrs.getValue("suggested_sia_head") @@ -179,19 +192,19 @@ class rpki_updown_class(object): xml += " <issuer>" + base64.b64encode(self.issuer) + "</issuer>\n </class>\n" return xml -class rpki_updown_list(rpki_updown_msg): +class list(msg): pass -class rpki_updown_list_response(rpki_updown_msg): +class list_response(msg): def __init__(self): self.resource_classes = [] def startElement(self, name, attrs): if name == "class": - self.resource_classes.append(rpki_updown_class(attrs)) + self.resource_classes.append(resource_class(attrs)) elif name == "certificate": - self.resource_classes[-1].certs.append(rpki_updown_cert(attrs)) + self.resource_classes[-1].certs.append(cert(attrs)) def endElement(self, name, text): if name == "certificate": @@ -205,14 +218,14 @@ class rpki_updown_list_response(rpki_updown_msg): xml += c.toXML() return xml -class rpki_updown_issue(rpki_updown_msg): +class issue(msg): def startElement(self, name, attrs): assert name == "request" self.class_name = attrs.getValue("class_name") - for k,f in (("req_resource_set_as", rpki_updown_resource_set_as), - ("req_resource_set_ipv4", rpki_updown_resource_set_ipv4), - ("req_resource_set_ipv6", rpki_updown_resource_set_ipv6)): + for k,f in (("req_resource_set_as", resource_set_as), + ("req_resource_set_ipv4", resource_set_ipv4), + ("req_resource_set_ipv6", resource_set_ipv6)): try: setattr(self, k, f(attrs.getValue(k).encode("ascii"))) except KeyError: @@ -232,13 +245,13 @@ class rpki_updown_issue(rpki_updown_msg): xml += ('\n req_resource_set_ipv6="%s"' % self.req_resource_set_ipv6) return xml + ">" + base64.b64encode(self.pkcs10) + "</request>\n" -class rpki_updown_issue_response(rpki_updown_list_response): +class issue_response(list_response): def toXML(self): assert len(self.resource_classes) == 1 - return rpki_updown_list_response.toXML(self) + return list_response.toXML(self) -class rpki_updown_revoke(rpki_updown_msg): +class revoke(msg): def startElement(self, name, attrs): self.class_name = attrs.getValue("class_name") @@ -247,10 +260,10 @@ class rpki_updown_revoke(rpki_updown_msg): def toXML(self): return (' <key class_name="%s" ski="%s" />\n' % (self.class_name, self.ski)) -class rpki_updown_revoke_response(rpki_updown_revoke): +class revoke_response(revoke): pass -class rpki_updown_error_response(rpki_updown_msg): +class error_response(msg): def toXML(self): return ' <status>%d</status>\n' % self.status @@ -263,7 +276,7 @@ class rpki_updown_error_response(rpki_updown_msg): elif name == "description": self.description = text -class rpki_updown_sax_handler(xml.sax.handler.ContentHandler): +class sax_handler(xml.sax.handler.ContentHandler): def __init__(self): self.text = "" @@ -280,13 +293,13 @@ class rpki_updown_sax_handler(xml.sax.handler.ContentHandler): assert int(attrs.getValue("version")) == 1 if self.obj == None: self.obj = { - "list" : rpki_updown_list(), - "list_response" : rpki_updown_list_response(), - "issue" : rpki_updown_issue(), - "issue_response" : rpki_updown_issue_response(), - "revoke" : rpki_updown_revoke(), - "revoke_response" : rpki_updown_revoke_response(), - "error_response" : rpki_updown_error_response() + "list" : list(), + "list_response" : list_response(), + "issue" : issue(), + "issue_response" : issue_response(), + "revoke" : revoke(), + "revoke_response" : revoke_response(), + "error_response" : error_response() }[attrs.getValue("type")] assert self.obj != None for k in ("type", "sender", "recipient"): @@ -309,7 +322,7 @@ files.sort() for f in files: # try: - handler = rpki_updown_sax_handler() + handler = sax_handler() # parser = xml.sax.make_parser() # parser.setContentHandler(handler) |