aboutsummaryrefslogtreecommitdiff
path: root/scripts/xml-parse-test.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/xml-parse-test.py')
-rwxr-xr-xscripts/xml-parse-test.py197
1 files changed, 105 insertions, 92 deletions
diff --git a/scripts/xml-parse-test.py b/scripts/xml-parse-test.py
index b3fd6942..e3c29480 100755
--- a/scripts/xml-parse-test.py
+++ b/scripts/xml-parse-test.py
@@ -11,19 +11,63 @@ def relaxng(xml, rng):
if v != "- validates\n":
raise RuntimeError, "RelaxNG validation failure:\n" + v
-class rpki_updown_resource_set(object):
+class v4addr(long):
+ bits = 32
+
+ def __new__(cls, x):
+ r = struct.unpack("!I", socket.inet_pton(socket.AF_INET, x))
+ return long.__new__(cls, r[0])
+
+ def __str__(self):
+ return socket.inet_ntop(socket.AF_INET, struct.pack("!I", long(self)))
+
+class v6addr(long):
+ bits = 128
+
+ def __new__(cls, x):
+ r = struct.unpack("!QQ", socket.inet_pton(socket.AF_INET6, x))
+ return long.__new__(cls, (r[0] << 64) | r[1])
+
+ def __str__(self):
+ return socket.inet_ntop(socket.AF_INET6,
+ struct.pack("!QQ", long(self) >> 64,
+ long(self) & 0xFFFFFFFFFFFFFFFF))
+
+class resource_range(object):
+
+ def __init__(self, min, max):
+ assert min <= max, "Mis-ordered range: %s before %s" % (str(min), str(max))
+ self.min = min
+ self.max = max
+
+ def __cmp__(self, other):
+ c = self.min - other.min
+ if c == 0:
+ c = self.max - other.max
+ return c
+
+class resource_range_as(resource_range):
- class range(object):
+ def __str__(self):
+ if self.min == self.max:
+ return str(self.min)
+ else:
+ return str(self.min) + "-" + str(self.max)
+
+class resource_range_addr(resource_range):
- def __init__(self, min, max):
- self.min = min
- self.max = max
+ def __str__(self):
+ mask = self.min ^ self.max
+ prefixlen = self.min.bits
+ while mask & 1:
+ prefixlen -= 1
+ mask >>= 1
+ if mask:
+ return str(self.min) + "-" + str(self.max)
+ else:
+ return str(self.min) + "/" + str(prefixlen)
- def __cmp__(self, other):
- c = self.min - other.min
- if c == 0:
- c = self.max - other.max
- return c
+class resource_set(object):
def __init__(self, s):
if s == "":
@@ -36,72 +80,41 @@ class rpki_updown_resource_set(object):
assert self.vec[i].max < self.vec[i+1].min, 'Resource overlap "%s"' % (s)
def __str__(self):
- vec = map(self.tostr, self.vec)
+ vec = map(str, self.vec)
return ",".join(vec)
-class rpki_updown_resource_set_as(rpki_updown_resource_set):
+class resource_set_as(resource_set):
- def parse(self, elt):
- r = re.match("^([0-9]+)-([0-9]+)$", elt)
+ def parse(self, x):
+ r = re.match("^([0-9]+)-([0-9]+)$", x)
if r:
- return self.range(long(r.group(1)), long(r.group(2)))
- else:
- return self.range(long(elt), long(elt))
-
- def tostr(self, elt):
- if elt.min == elt.max:
- return str(elt.min)
+ return resource_range_as(long(r.group(1)), long(r.group(2)))
else:
- return str(elt.min) + "-" + str(elt.max)
+ return resource_range_as(long(x), long(x))
-class rpki_updown_resource_set_ip(rpki_updown_resource_set):
+class resource_set_ip(resource_set):
- def parse(self, elt):
- r = re.match("^([0-9:.a-fA-F]+)-([0-9:.a-fA-F]+)$", elt)
+ def parse(self, x):
+ r = re.match("^([0-9:.a-fA-F]+)-([0-9:.a-fA-F]+)$", x)
if r:
- return self.range(self.pton(r.group(1)), self.pton(r.group(2)))
- r = re.match("^([0-9:.a-fA-F]+)/([0-9]+)$", elt)
+ return resource_range_addr(self.addr_type(r.group(1)), self.addr_type(r.group(2)))
+ r = re.match("^([0-9:.a-fA-F]+)/([0-9]+)$", x)
if r:
- min = self.pton(r.group(1))
+ min = self.addr_type(r.group(1))
prefixlen = int(r.group(2))
- mask = (1 << (self.bits - prefixlen)) - 1
- assert (min & mask) == 0, "Resource not in canonical form: %s" % (elt)
+ mask = (1 << (self.addr_type.bits - prefixlen)) - 1
+ assert (min & mask) == 0, "Resource not in canonical form: %s" % (x)
max = min | mask
- return self.range(min, max)
- raise RuntimeError, 'Bad IP resource "%s"' % (elt)
-
- def tostr(self, elt):
- mask = elt.min ^ elt.max
- prefixlen = self.bits
- while mask & 1:
- prefixlen -= 1
- mask >>= 1
- if mask:
- return self.ntop(elt.min) + "-" + self.ntop(elt.max)
- else:
- return self.ntop(elt.min) + "/" + str(prefixlen)
-
-class rpki_updown_resource_set_ipv4(rpki_updown_resource_set_ip):
- bits = 32
-
- def pton(self, x):
- r = struct.unpack("!I", socket.inet_pton(socket.AF_INET, x))
- return r[0]
-
- def ntop(self, x):
- return socket.inet_ntop(socket.AF_INET, struct.pack("!I", x))
+ return resource_range_addr(min, max)
+ raise RuntimeError, 'Bad IP resource "%s"' % (x)
-class rpki_updown_resource_set_ipv6(rpki_updown_resource_set_ip):
- bits = 128
-
- def pton(self, x):
- r = struct.unpack("!QQ", socket.inet_pton(socket.AF_INET6, x))
- return (r[0] << 64) | r[1]
+class resource_set_ipv4(resource_set_ip):
+ addr_type = v4addr
- def ntop(self, x):
- return socket.inet_ntop(socket.AF_INET6, struct.pack("!QQ", x >> 64, x & 0xFFFFFFFFFFFFFFFF))
+class resource_set_ipv6(resource_set_ip):
+ addr_type = v6addr
-class rpki_updown_msg(object):
+class msg(object):
def msgToXML(self):
return ('\
@@ -123,14 +136,14 @@ class rpki_updown_msg(object):
def endElement(self, name, text):
pass
-class rpki_updown_cert(object):
+class cert(object):
def __init__(self, attrs):
for k in ("cert_url", ):
setattr(self, k, attrs.getValue(k).encode("ascii"))
- for k,f in (("req_resource_set_as", rpki_updown_resource_set_as),
- ("req_resource_set_ipv4", rpki_updown_resource_set_ipv4),
- ("req_resource_set_ipv6", rpki_updown_resource_set_ipv6)):
+ for k,f in (("req_resource_set_as", resource_set_as),
+ ("req_resource_set_ipv4", resource_set_ipv4),
+ ("req_resource_set_ipv6", resource_set_ipv6)):
try:
setattr(self, k, f(attrs.getValue(k).encode("ascii")))
except KeyError:
@@ -147,14 +160,14 @@ class rpki_updown_cert(object):
xml += ">" + base64.b64encode(self.cert) + "</certificate>\n"
return xml
-class rpki_updown_class(object):
+class resource_class(object):
def __init__(self, attrs):
for k in ("class_name", "cert_url"):
setattr(self, k, attrs.getValue(k).encode("ascii"))
- for k,f in (("resource_set_as", rpki_updown_resource_set_as),
- ("resource_set_ipv4", rpki_updown_resource_set_ipv4),
- ("resource_set_ipv6", rpki_updown_resource_set_ipv6)):
+ for k,f in (("resource_set_as", resource_set_as),
+ ("resource_set_ipv4", resource_set_ipv4),
+ ("resource_set_ipv6", resource_set_ipv6)):
setattr(self, k, f(attrs.getValue(k).encode("ascii")))
try:
self.suggested_sia_head = attrs.getValue("suggested_sia_head")
@@ -179,19 +192,19 @@ class rpki_updown_class(object):
xml += " <issuer>" + base64.b64encode(self.issuer) + "</issuer>\n </class>\n"
return xml
-class rpki_updown_list(rpki_updown_msg):
+class list(msg):
pass
-class rpki_updown_list_response(rpki_updown_msg):
+class list_response(msg):
def __init__(self):
self.resource_classes = []
def startElement(self, name, attrs):
if name == "class":
- self.resource_classes.append(rpki_updown_class(attrs))
+ self.resource_classes.append(resource_class(attrs))
elif name == "certificate":
- self.resource_classes[-1].certs.append(rpki_updown_cert(attrs))
+ self.resource_classes[-1].certs.append(cert(attrs))
def endElement(self, name, text):
if name == "certificate":
@@ -205,14 +218,14 @@ class rpki_updown_list_response(rpki_updown_msg):
xml += c.toXML()
return xml
-class rpki_updown_issue(rpki_updown_msg):
+class issue(msg):
def startElement(self, name, attrs):
assert name == "request"
self.class_name = attrs.getValue("class_name")
- for k,f in (("req_resource_set_as", rpki_updown_resource_set_as),
- ("req_resource_set_ipv4", rpki_updown_resource_set_ipv4),
- ("req_resource_set_ipv6", rpki_updown_resource_set_ipv6)):
+ for k,f in (("req_resource_set_as", resource_set_as),
+ ("req_resource_set_ipv4", resource_set_ipv4),
+ ("req_resource_set_ipv6", resource_set_ipv6)):
try:
setattr(self, k, f(attrs.getValue(k).encode("ascii")))
except KeyError:
@@ -232,13 +245,13 @@ class rpki_updown_issue(rpki_updown_msg):
xml += ('\n req_resource_set_ipv6="%s"' % self.req_resource_set_ipv6)
return xml + ">" + base64.b64encode(self.pkcs10) + "</request>\n"
-class rpki_updown_issue_response(rpki_updown_list_response):
+class issue_response(list_response):
def toXML(self):
assert len(self.resource_classes) == 1
- return rpki_updown_list_response.toXML(self)
+ return list_response.toXML(self)
-class rpki_updown_revoke(rpki_updown_msg):
+class revoke(msg):
def startElement(self, name, attrs):
self.class_name = attrs.getValue("class_name")
@@ -247,10 +260,10 @@ class rpki_updown_revoke(rpki_updown_msg):
def toXML(self):
return (' <key class_name="%s" ski="%s" />\n' % (self.class_name, self.ski))
-class rpki_updown_revoke_response(rpki_updown_revoke):
+class revoke_response(revoke):
pass
-class rpki_updown_error_response(rpki_updown_msg):
+class error_response(msg):
def toXML(self):
return ' <status>%d</status>\n' % self.status
@@ -263,7 +276,7 @@ class rpki_updown_error_response(rpki_updown_msg):
elif name == "description":
self.description = text
-class rpki_updown_sax_handler(xml.sax.handler.ContentHandler):
+class sax_handler(xml.sax.handler.ContentHandler):
def __init__(self):
self.text = ""
@@ -280,13 +293,13 @@ class rpki_updown_sax_handler(xml.sax.handler.ContentHandler):
assert int(attrs.getValue("version")) == 1
if self.obj == None:
self.obj = {
- "list" : rpki_updown_list(),
- "list_response" : rpki_updown_list_response(),
- "issue" : rpki_updown_issue(),
- "issue_response" : rpki_updown_issue_response(),
- "revoke" : rpki_updown_revoke(),
- "revoke_response" : rpki_updown_revoke_response(),
- "error_response" : rpki_updown_error_response()
+ "list" : list(),
+ "list_response" : list_response(),
+ "issue" : issue(),
+ "issue_response" : issue_response(),
+ "revoke" : revoke(),
+ "revoke_response" : revoke_response(),
+ "error_response" : error_response()
}[attrs.getValue("type")]
assert self.obj != None
for k in ("type", "sender", "recipient"):
@@ -309,7 +322,7 @@ files.sort()
for f in files:
# try:
- handler = rpki_updown_sax_handler()
+ handler = sax_handler()
# parser = xml.sax.make_parser()
# parser.setContentHandler(handler)