aboutsummaryrefslogtreecommitdiff
path: root/scripts/rpki/left_right.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2007-07-17 06:26:17 +0000
committerRob Austein <sra@hactrn.net>2007-07-17 06:26:17 +0000
commit397416f3c847b3fb8d47dfd3330ffb25f72049ae (patch)
treebf5c93111be06647935f5470dbe490c9a320d300 /scripts/rpki/left_right.py
parent8c4b91087d4dfe5d9f702d20fd7c666620570779 (diff)
Checkpoint
svn path=/scripts/rpki/left_right.py; revision=763
Diffstat (limited to 'scripts/rpki/left_right.py')
-rw-r--r--scripts/rpki/left_right.py324
1 files changed, 205 insertions, 119 deletions
diff --git a/scripts/rpki/left_right.py b/scripts/rpki/left_right.py
index b3a72d7a..9b926ddd 100644
--- a/scripts/rpki/left_right.py
+++ b/scripts/rpki/left_right.py
@@ -10,131 +10,228 @@ class base_elt(object):
Base type for left-right message elements.
"""
- # This isn't quite right, as_number is only multivalue in some elements.
- # Live with it for the moment, fix after code stablizes.
+ def startElement(self, stack, name, attrs):
+ pass
- multivalue = ("peer_contact", "signing_cert", "extension_preference", "resource_class",
- "as_number", "as_range", "subset_as_number", "subset_as_range",
- "ipv4_prefix", "ipv4_range", "subset_ipv4_prefix", "subset_ipv4_range",
- "ipv6_prefix", "ipv6_range", "subset_ipv6_prefix", "subset_ipv6_range")
+ def endElement(self, stack, name, text):
+ stack.pop()
- b64content = ("peer_ta", "pkcs10_cert_request", "public_key", "signing_cert")
+class extension_preference_elt(base_elt):
+ """
+ Container for extension preferences.
+ """
- pdu_objects = ("self", "child", "parent", "bsc", "repository", "route_origin",
- "list_resources", "report_error")
+ def startElement(self, stack, name, attrs):
+ assert name == "extension_preference"
+ self.name = attrs["name"]
- def store(self, key, val):
- if key not in self.multivalue:
- assert not hasattr(self, key)
- setattr(self, key, val)
- elif hasattr(self, key):
- getattr(self, key).append(val)
- else:
- setattr(self, key, [val])
+ def endElement(self, stack, name, text):
+ self.value = text
+ stack.pop()
+
+class self_elt(base_elt):
+
+ booleans = ("rekey", "reissue", "revoke", "run_now", "publish_world_now")
+
+ rekey = False
+ reissue = False
+ revoke = False
+ run_now = False
+ publish_world_now = False
+
+ def __init__(self):
+ self.prefs = []
def startElement(self, stack, name, attrs):
- if name not in type_map:
- getattr(self, "handle_" + name)(stack, name, attrs)
- elif type(self) is type_map(name):
- sax_utils.snarf_attribute(self, attrs, self.attributes)
+ if name == "extension_preference":
+ pref = extension_preference_elt()
+ self.prefs.append(pref)
+ stack.append(prefs)
+ pref.startElement(stack, name, attrs)
+ elif name in self.booleans:
+ setattr(self, name, True)
else:
- elt = type_map[name]()
- stack.append(elt)
- self.store(name, elt)
- elt.startElement(stack, name, attrs)
+ assert name == "self"
+ self.action = attrs["action"]
+ self.self_id = attrs.get("self_id")
def endElement(self, stack, name, text):
- if name in self.b64content:
- self.store(name, base64.b64decode(text))
- if name in type_map:
+ if name not in self.booleans:
+ assert name == "self"
stack.pop()
- def boolean_handler(stack, name, attrs):
- setattr(self, name, True)
+class bsc_elt(base_elt):
- handle_publish_world_now = boolean_handler
- handle_reissue = boolean_handler
- handle_rekey = boolean_handler
- handle_revoke = boolean_handler
- handle_run_now = boolean_handler
- handle_suppress_publication = boolean_handler
+ generate_keypair = False
- def handle_generate_keypair(stack, name, attrs):
- self.boolean_handler(stack, name, attrs)
- sax_utils.snarf_attribute(self, attrs, ("key_type", "hash_alg", "key_length"))
+ def __init__(self):
+ self.signing_cert = []
- def id_handler(stack, name, attrs):
- sax_utils.snarf_attribute(self, attrs, "id")
+ def startElement(self, stack, name, attrs):
+ if name == "generate_keypair":
+ self.generate_keypair = True
+ self.key_type = attrs["key_type"]
+ self.hash_alg = attrs["hash_alg"]
+ self.key_length = attrs["key_length"]
+ elif not name in ("signing_cert", "public_key", "pkcs10_cert_request"):
+ assert name == "bsc"
+ self.action = attrs["action"]
+ self.self_id = attrs["self_id"]
+ self.bsc_id = attrs.get("bsc_id")
- handle_bsc_link = id_handler
- handle_child_db_id = id_handler
- handle_repository_link = id_handler
- handle_sia_base = id_handler
+ def endElement(self, stack, name, text):
+ if name == "signing_cert":
+ self.signing_cert.append(base64.b64decode(text))
+ elif name == "public_key":
+ self.public_key = base64.b64decode(text)
+ elif name == "pkcs10_cert_request":
+ self.pkcs10_cert_request = base64.b64decode(text)
+ else:
+ assert name == "bsc"
+ stack.pop()
- def handle_peer_contact(stack, name, attrs):
- self.peer_contact = attrs.getValue("uri").encode("ascii")
+class parent_elt(base_elt):
- # Mumble, really should be using resource_set types here, more
- # idiocy due to premature optimization
+ ids = ("bsc_link", "repository_link")
+ uris = ("peer_contact", "sia_base")
+ booleans = ("rekey", "reissue", "revoke")
- # Special case for dumb reasons, fix later
- def handle_as_number(stack, name, attrs):
- asn = long(attrs.getValue("asn"))
- if isinstance(self, route_origin_elt):
- assert not hasattr(self, name)
- self.as_number = asn
- else:
- self.store(name, resource_range_as(asn, asn))
+ rekey = False
+ reissue = False
+ revoke = False
- def handle_subset_as_number(stack, name, attrs):
- asn = long(attrs.getValue("asn"))
- self.store(name, resource_range_as(asn, asn))
+ def startElement(self, stack, name, attrs):
+ if name in self.uris:
+ setattr(self, name, attrs["uri"])
+ elif name in self.ids:
+ setattr(self, name, attrs["id"])
+ elif name in self.booleans:
+ setattr(self, name, True)
+ elif name != "peer_ta":
+ assert name == "parent"
+ self.action = attrs["action"]
+ self.self_id = attrs["self_id"]
+ self.parent_id = attrs.get("parent_id")
- def handle_as_range(stack, name, attrs):
- self.store(name, resource_range_as(long(attrs.getValue("min")),
- long(attrs.getValue("max"))))
+ def endElement(self, stack, name, text):
+ if name == "peer_ta":
+ self.peer_ta = base64.b64decode(text)
+ elif name not in self.booleans + self.ids + self.uris:
+ assert name == "parent"
+ stack.pop()
- handle_subset_as_range = handle_as_range
+class child_elt(base_elt):
- def handle_ipv4_range(stack, name, attrs):
- self.store(name, resource_range_ipv4(ipaddrs.v4addr(attrs.getValue("min")),
- ipaddrs.v4addr(attrs.getValue("max"))))
+ ids = ("bsc_link", "child_db_id")
+ booleans = ("reissue")
- handle_subset_ipv4_range = handle_ipv4_range
+ rekey = False
+ reissue = False
+ revoke = False
- def handle_ipv6_range(stack, name, attrs):
- self.store(name, resource_range_ipv6(ipaddrs.v6addr(attrs.getValue("min")),
- ipaddrs.v6addr(attrs.getValue("max"))))
+ def startElement(self, stack, name, attrs):
+ if name in self.ids:
+ setattr(self, name, attrs["id"])
+ elif name in self.booleans:
+ setattr(self, name, True)
+ elif name != "peer_ta":
+ assert name == "child"
+ self.action = attrs["action"]
+ self.self_id = attrs["self_id"]
+ self.child_id = attrs.get("child_id")
- handle_subset_ipv6_range = handle_ipv6_range
+ def endElement(self, stack, name, text):
+ if name == "peer_ta":
+ self.peer_ta = base64.b64decode(text)
+ elif name not in self.booleans + self.ids:
+ assert name == "child"
+ stack.pop()
- # Haven't written (subset_)?ipv[46]_prefix() handlers yet. Some of the code for
- # that might belong in resource_set.py instead of here.
+class repository_elt(base_elt):
+ def startElement(self, stack, name, attrs):
+ if name == "bsc_link":
+ self.bsc_link = attrs["id"]
+ elif name == "peer_contact":
+ self.peer_contact = attrs["uri"]
+ elif name != "peer_ta":
+ assert name == "repository"
+ self.action = attrs["action"]
+ self.self_id = attrs["self_id"]
+ self.repository_id = attrs.get("repository_id")
-class self_elt(base_elt):
- attributes = ("action", "self_id")
+ def endElement(self, stack, name, text):
+ if name == "peer_ta":
+ self.peer_ta = base64.b64decode(text)
+ elif name not in ("bsc_link", "peer_contact"):
+ assert name == "repository"
+ stack.pop()
-class bsc_elt(base_elt):
- attributes = ("action", "self_id", "bsc_id")
+class route_origin_elt(base_elt):
-class parent_elt(base_elt):
- attributes = ("action", "self_id", "parent_id")
+ suppress_publication = False
+ ipv4 = None
+ ipv6 = None
-class child_elt(base_elt):
- attributes = ("action", "self_id", "child_id")
+ def startElement(self, stack, name, attrs):
+ if name == "suppress_publication":
+ self.suppress_publication = True
+ elif name == "resources":
+ self.asn = attrs["asn"]
+ if "ipv4" in attrs:
+ self.ipv4 = resource_set.resource_set_ipv4(attrs["ipv4"])
+ if "ipv6" in attrs:
+ self.ipv6 = resource_set.resource_set_ipv6(attrs["ipv6"])
+ else:
+ assert name == "route_origin"
+ self.action = attrs["action"]
+ self.self_id = attrs["self_id"]
+ self.route_origin_id = attrs.get("route_origin_id")
-class repository_elt(base_elt):
- attributes = ("action", "self_id", "repository_id")
+ def endElement(self, stack, name, text):
+ if name not in ("suppress_publication", "resources"):
+ assert name == "route_origin"
+ stack.pop()
-class route_origin_elt(base_elt):
- attributes = ("action", "self_id", "route_origin_id")
+class resource_class_elt(base_elt):
+ def startElement(self, stack, name, attrs):
+ assert name == "resource_class"
+ if "as" in attrs:
+ self.as = resource_set.resource_set_as(attrs["as"])
+ if "req_as" in attrs:
+ self.req_as = resource_set.resource_set_as(attrs["req_as"])
+ if "ipv4" in attrs:
+ self.ipv4 = resource_set.resource_set_ipv4(attrs["ipv4"])
+ if "req_ipv4" in attrs:
+ self.req_ipv4 = resource_set.resource_set_ipv4(attrs["req_ipv4"])
+ if "ipv6" in attrs:
+ self.ipv6 = resource_set.resource_set_ipv6(attrs["ipv6"])
+ if "req_ipv6" in attrs:
+ self.req_ipv6 = resource_set.resource_set_ipv6(attrs["req_ipv6"])
+
class list_resources_elt(base_elt):
- attributes = ("self_id", "child_id")
+
+ def __init__(self):
+ self.resources = []
+
+ def startElement(self, stack, name, attrs):
+ if name == "resource_class":
+ rc = resource_class_elt()
+ self.resources.append(rc)
+ stack.append(rc)
+ rc.startElement(stack, name, attrs)
+ else:
+ assert name == "list_resources"
+ self.self_id = attrs["self_id"]
+ self.child_id = attrs.get("child_id")
class report_error_elt(base_elt):
- attributes = ("self_id", "error_code")
+
+ def startElement(self, stack, name, attrs):
+ assert name == "report_error"
+ self.self_id = attrs["self_id"]
+ self.error_code = attrs["error_code"]
class msg(list):
"""
@@ -146,12 +243,20 @@ class msg(list):
def startElement(self, stack, name, attrs):
if name == "msg":
- sax_utils.snarf_attribute(self, attrs, "version", int)
- sax_utils.snarf_attribute(self, attrs, "type")
+ self.version = int(attrs["version"])
+ self.type = attrs["type"]
assert self.version == 1
else:
- assert name in type_map
- elt = type_map[name](self)
+ elt = {
+ "self" : self_elt,
+ "child" : child_elt,
+ "parent" : parent_elt,
+ "repository" : repository_elt,
+ "route_origin" : route_origin_elt,
+ "bsc" : bsc_elt,
+ "list_resources" : list_resources_elt,
+ "report_error" : report_error_elt
+ }[name]()
self.append(elt)
stack.append(elt)
elt.startElement(stack, name, attrs)
@@ -159,39 +264,20 @@ class msg(list):
def endElement(self, stack, name, text):
assert name == "msg"
assert len(stack) == 1
+ stack.pop()
def __str__(self):
return ('<?xml version="1.0" encoding="US-ASCII" ?>\n'
- '<msg xmlns="%s" version="%d" type="%s">\n' \
- % (self.spec_uri, self.version, self.type)
- ) + "".join(map(str,self)) + "</msg>\n"
+ '<msg xmlns="%s" version="%d" type="%s">\n'
+ '%s</msg>\n'
+ % (self.spec_uri, self.version, self.type,
+ "".join(map(str, self))))
class sax_handler(sax_utils.handler):
"""
SAX handler for Left-Right protocol.
"""
- def startElement(self, name, attrs):
- if name == "msg":
- self.stack = [msg()]
- self.stack[-1].startElement(self.stack, name, attrs)
-
- def endElement(self, name):
- self.stack[-1].endElement(self.stack, name, self.get_text())
- if name == "msg":
- assert len(self.stack) == 1
- self.set_obj(self.stack[0])
-
-type_map = {
- "msg" : msg_elt,
- "self" : self_elt,
- "child" : child_elt,
- "parent" : parent_elt,
- "repository" : repository_elt,
- "route_origin" : route_origin_elt,
- "bsc" : bsc_elt,
- "list_resources" : list_resources_elt,
- "report_error" : report_error_elt,
- "extension_preference" : extension_preference_elt,
- "resource_class" : resource_class_elt
-}
+ def create_top_level(self, name, attrs):
+ assert name == "msg" and attrs["version"] == "1"
+ return msg_elt()