diff options
Diffstat (limited to 'scripts/rpki/left_right.py')
-rw-r--r-- | scripts/rpki/left_right.py | 324 |
1 files changed, 205 insertions, 119 deletions
diff --git a/scripts/rpki/left_right.py b/scripts/rpki/left_right.py index b3a72d7a..9b926ddd 100644 --- a/scripts/rpki/left_right.py +++ b/scripts/rpki/left_right.py @@ -10,131 +10,228 @@ class base_elt(object): Base type for left-right message elements. """ - # This isn't quite right, as_number is only multivalue in some elements. - # Live with it for the moment, fix after code stablizes. + def startElement(self, stack, name, attrs): + pass - multivalue = ("peer_contact", "signing_cert", "extension_preference", "resource_class", - "as_number", "as_range", "subset_as_number", "subset_as_range", - "ipv4_prefix", "ipv4_range", "subset_ipv4_prefix", "subset_ipv4_range", - "ipv6_prefix", "ipv6_range", "subset_ipv6_prefix", "subset_ipv6_range") + def endElement(self, stack, name, text): + stack.pop() - b64content = ("peer_ta", "pkcs10_cert_request", "public_key", "signing_cert") +class extension_preference_elt(base_elt): + """ + Container for extension preferences. + """ - pdu_objects = ("self", "child", "parent", "bsc", "repository", "route_origin", - "list_resources", "report_error") + def startElement(self, stack, name, attrs): + assert name == "extension_preference" + self.name = attrs["name"] - def store(self, key, val): - if key not in self.multivalue: - assert not hasattr(self, key) - setattr(self, key, val) - elif hasattr(self, key): - getattr(self, key).append(val) - else: - setattr(self, key, [val]) + def endElement(self, stack, name, text): + self.value = text + stack.pop() + +class self_elt(base_elt): + + booleans = ("rekey", "reissue", "revoke", "run_now", "publish_world_now") + + rekey = False + reissue = False + revoke = False + run_now = False + publish_world_now = False + + def __init__(self): + self.prefs = [] def startElement(self, stack, name, attrs): - if name not in type_map: - getattr(self, "handle_" + name)(stack, name, attrs) - elif type(self) is type_map(name): - sax_utils.snarf_attribute(self, attrs, self.attributes) + if name == "extension_preference": + pref = extension_preference_elt() + self.prefs.append(pref) + stack.append(prefs) + pref.startElement(stack, name, attrs) + elif name in self.booleans: + setattr(self, name, True) else: - elt = type_map[name]() - stack.append(elt) - self.store(name, elt) - elt.startElement(stack, name, attrs) + assert name == "self" + self.action = attrs["action"] + self.self_id = attrs.get("self_id") def endElement(self, stack, name, text): - if name in self.b64content: - self.store(name, base64.b64decode(text)) - if name in type_map: + if name not in self.booleans: + assert name == "self" stack.pop() - def boolean_handler(stack, name, attrs): - setattr(self, name, True) +class bsc_elt(base_elt): - handle_publish_world_now = boolean_handler - handle_reissue = boolean_handler - handle_rekey = boolean_handler - handle_revoke = boolean_handler - handle_run_now = boolean_handler - handle_suppress_publication = boolean_handler + generate_keypair = False - def handle_generate_keypair(stack, name, attrs): - self.boolean_handler(stack, name, attrs) - sax_utils.snarf_attribute(self, attrs, ("key_type", "hash_alg", "key_length")) + def __init__(self): + self.signing_cert = [] - def id_handler(stack, name, attrs): - sax_utils.snarf_attribute(self, attrs, "id") + def startElement(self, stack, name, attrs): + if name == "generate_keypair": + self.generate_keypair = True + self.key_type = attrs["key_type"] + self.hash_alg = attrs["hash_alg"] + self.key_length = attrs["key_length"] + elif not name in ("signing_cert", "public_key", "pkcs10_cert_request"): + assert name == "bsc" + self.action = attrs["action"] + self.self_id = attrs["self_id"] + self.bsc_id = attrs.get("bsc_id") - handle_bsc_link = id_handler - handle_child_db_id = id_handler - handle_repository_link = id_handler - handle_sia_base = id_handler + def endElement(self, stack, name, text): + if name == "signing_cert": + self.signing_cert.append(base64.b64decode(text)) + elif name == "public_key": + self.public_key = base64.b64decode(text) + elif name == "pkcs10_cert_request": + self.pkcs10_cert_request = base64.b64decode(text) + else: + assert name == "bsc" + stack.pop() - def handle_peer_contact(stack, name, attrs): - self.peer_contact = attrs.getValue("uri").encode("ascii") +class parent_elt(base_elt): - # Mumble, really should be using resource_set types here, more - # idiocy due to premature optimization + ids = ("bsc_link", "repository_link") + uris = ("peer_contact", "sia_base") + booleans = ("rekey", "reissue", "revoke") - # Special case for dumb reasons, fix later - def handle_as_number(stack, name, attrs): - asn = long(attrs.getValue("asn")) - if isinstance(self, route_origin_elt): - assert not hasattr(self, name) - self.as_number = asn - else: - self.store(name, resource_range_as(asn, asn)) + rekey = False + reissue = False + revoke = False - def handle_subset_as_number(stack, name, attrs): - asn = long(attrs.getValue("asn")) - self.store(name, resource_range_as(asn, asn)) + def startElement(self, stack, name, attrs): + if name in self.uris: + setattr(self, name, attrs["uri"]) + elif name in self.ids: + setattr(self, name, attrs["id"]) + elif name in self.booleans: + setattr(self, name, True) + elif name != "peer_ta": + assert name == "parent" + self.action = attrs["action"] + self.self_id = attrs["self_id"] + self.parent_id = attrs.get("parent_id") - def handle_as_range(stack, name, attrs): - self.store(name, resource_range_as(long(attrs.getValue("min")), - long(attrs.getValue("max")))) + def endElement(self, stack, name, text): + if name == "peer_ta": + self.peer_ta = base64.b64decode(text) + elif name not in self.booleans + self.ids + self.uris: + assert name == "parent" + stack.pop() - handle_subset_as_range = handle_as_range +class child_elt(base_elt): - def handle_ipv4_range(stack, name, attrs): - self.store(name, resource_range_ipv4(ipaddrs.v4addr(attrs.getValue("min")), - ipaddrs.v4addr(attrs.getValue("max")))) + ids = ("bsc_link", "child_db_id") + booleans = ("reissue") - handle_subset_ipv4_range = handle_ipv4_range + rekey = False + reissue = False + revoke = False - def handle_ipv6_range(stack, name, attrs): - self.store(name, resource_range_ipv6(ipaddrs.v6addr(attrs.getValue("min")), - ipaddrs.v6addr(attrs.getValue("max")))) + def startElement(self, stack, name, attrs): + if name in self.ids: + setattr(self, name, attrs["id"]) + elif name in self.booleans: + setattr(self, name, True) + elif name != "peer_ta": + assert name == "child" + self.action = attrs["action"] + self.self_id = attrs["self_id"] + self.child_id = attrs.get("child_id") - handle_subset_ipv6_range = handle_ipv6_range + def endElement(self, stack, name, text): + if name == "peer_ta": + self.peer_ta = base64.b64decode(text) + elif name not in self.booleans + self.ids: + assert name == "child" + stack.pop() - # Haven't written (subset_)?ipv[46]_prefix() handlers yet. Some of the code for - # that might belong in resource_set.py instead of here. +class repository_elt(base_elt): + def startElement(self, stack, name, attrs): + if name == "bsc_link": + self.bsc_link = attrs["id"] + elif name == "peer_contact": + self.peer_contact = attrs["uri"] + elif name != "peer_ta": + assert name == "repository" + self.action = attrs["action"] + self.self_id = attrs["self_id"] + self.repository_id = attrs.get("repository_id") -class self_elt(base_elt): - attributes = ("action", "self_id") + def endElement(self, stack, name, text): + if name == "peer_ta": + self.peer_ta = base64.b64decode(text) + elif name not in ("bsc_link", "peer_contact"): + assert name == "repository" + stack.pop() -class bsc_elt(base_elt): - attributes = ("action", "self_id", "bsc_id") +class route_origin_elt(base_elt): -class parent_elt(base_elt): - attributes = ("action", "self_id", "parent_id") + suppress_publication = False + ipv4 = None + ipv6 = None -class child_elt(base_elt): - attributes = ("action", "self_id", "child_id") + def startElement(self, stack, name, attrs): + if name == "suppress_publication": + self.suppress_publication = True + elif name == "resources": + self.asn = attrs["asn"] + if "ipv4" in attrs: + self.ipv4 = resource_set.resource_set_ipv4(attrs["ipv4"]) + if "ipv6" in attrs: + self.ipv6 = resource_set.resource_set_ipv6(attrs["ipv6"]) + else: + assert name == "route_origin" + self.action = attrs["action"] + self.self_id = attrs["self_id"] + self.route_origin_id = attrs.get("route_origin_id") -class repository_elt(base_elt): - attributes = ("action", "self_id", "repository_id") + def endElement(self, stack, name, text): + if name not in ("suppress_publication", "resources"): + assert name == "route_origin" + stack.pop() -class route_origin_elt(base_elt): - attributes = ("action", "self_id", "route_origin_id") +class resource_class_elt(base_elt): + def startElement(self, stack, name, attrs): + assert name == "resource_class" + if "as" in attrs: + self.as = resource_set.resource_set_as(attrs["as"]) + if "req_as" in attrs: + self.req_as = resource_set.resource_set_as(attrs["req_as"]) + if "ipv4" in attrs: + self.ipv4 = resource_set.resource_set_ipv4(attrs["ipv4"]) + if "req_ipv4" in attrs: + self.req_ipv4 = resource_set.resource_set_ipv4(attrs["req_ipv4"]) + if "ipv6" in attrs: + self.ipv6 = resource_set.resource_set_ipv6(attrs["ipv6"]) + if "req_ipv6" in attrs: + self.req_ipv6 = resource_set.resource_set_ipv6(attrs["req_ipv6"]) + class list_resources_elt(base_elt): - attributes = ("self_id", "child_id") + + def __init__(self): + self.resources = [] + + def startElement(self, stack, name, attrs): + if name == "resource_class": + rc = resource_class_elt() + self.resources.append(rc) + stack.append(rc) + rc.startElement(stack, name, attrs) + else: + assert name == "list_resources" + self.self_id = attrs["self_id"] + self.child_id = attrs.get("child_id") class report_error_elt(base_elt): - attributes = ("self_id", "error_code") + + def startElement(self, stack, name, attrs): + assert name == "report_error" + self.self_id = attrs["self_id"] + self.error_code = attrs["error_code"] class msg(list): """ @@ -146,12 +243,20 @@ class msg(list): def startElement(self, stack, name, attrs): if name == "msg": - sax_utils.snarf_attribute(self, attrs, "version", int) - sax_utils.snarf_attribute(self, attrs, "type") + self.version = int(attrs["version"]) + self.type = attrs["type"] assert self.version == 1 else: - assert name in type_map - elt = type_map[name](self) + elt = { + "self" : self_elt, + "child" : child_elt, + "parent" : parent_elt, + "repository" : repository_elt, + "route_origin" : route_origin_elt, + "bsc" : bsc_elt, + "list_resources" : list_resources_elt, + "report_error" : report_error_elt + }[name]() self.append(elt) stack.append(elt) elt.startElement(stack, name, attrs) @@ -159,39 +264,20 @@ class msg(list): def endElement(self, stack, name, text): assert name == "msg" assert len(stack) == 1 + stack.pop() def __str__(self): return ('<?xml version="1.0" encoding="US-ASCII" ?>\n' - '<msg xmlns="%s" version="%d" type="%s">\n' \ - % (self.spec_uri, self.version, self.type) - ) + "".join(map(str,self)) + "</msg>\n" + '<msg xmlns="%s" version="%d" type="%s">\n' + '%s</msg>\n' + % (self.spec_uri, self.version, self.type, + "".join(map(str, self)))) class sax_handler(sax_utils.handler): """ SAX handler for Left-Right protocol. """ - def startElement(self, name, attrs): - if name == "msg": - self.stack = [msg()] - self.stack[-1].startElement(self.stack, name, attrs) - - def endElement(self, name): - self.stack[-1].endElement(self.stack, name, self.get_text()) - if name == "msg": - assert len(self.stack) == 1 - self.set_obj(self.stack[0]) - -type_map = { - "msg" : msg_elt, - "self" : self_elt, - "child" : child_elt, - "parent" : parent_elt, - "repository" : repository_elt, - "route_origin" : route_origin_elt, - "bsc" : bsc_elt, - "list_resources" : list_resources_elt, - "report_error" : report_error_elt, - "extension_preference" : extension_preference_elt, - "resource_class" : resource_class_elt -} + def create_top_level(self, name, attrs): + assert name == "msg" and attrs["version"] == "1" + return msg_elt() |