# $Id$
import base64, sax_utils, resource_set
# This is still pretty nasty, feels much too complex for a relatively
# simple task.
class base_elt(object):
    """
    Base type for left-right message elements.

    Provides the default SAX-style callbacks (ignore the start of an
    element, pop the handler stack at the end of one) and a helper for
    rendering optional XML attributes.
    """

    def startElement(self, stack, name, attrs):
        """
        Handle the start of an element.  The base implementation
        ignores it.
        """
        pass

    def endElement(self, stack, name, text):
        """
        Handle the end of an element by popping the top entry off the
        handler stack.
        """
        stack.pop()

    def attr_maybe(self, key):
        """
        Render the attribute named by key as XML attribute text.

        Returns the empty string when the attribute is unset or None,
        otherwise ' key="value"' (note the leading space) so that the
        result can be dropped straight into a start tag.  The value is
        XML-escaped so that '&', '<', '>' and '"' cannot break the
        enclosing tag.
        """
        # Imported here so this class does not depend on a change to the
        # module's import line.
        from xml.sax.saxutils import escape

        val = getattr(self, key, None)
        if val is None:
            return ''
        # '%s' stringifies non-string values the same way the original
        # formatting did, without forcing text through str().
        text = '%s' % (val,)
        return ' %s="%s"' % (key, escape(text, {'"': '&quot;'}))
class extension_preference_elt(base_elt):
    """
    Container for extension preferences.

    Holds the "name" attribute of an <extension_preference/> element
    and its text content with surrounding whitespace removed.
    """

    def startElement(self, stack, name, attrs):
        """
        Record the preference name from the element's attributes.
        """
        assert name == "extension_preference", "Unexpected name %s, stack %s" % (name, stack)
        self.name = attrs["name"]

    def endElement(self, stack, name, text):
        """
        Record the stripped element text as the preference value and
        pop this handler off the stack.
        """
        self.value = text.strip()
        stack.pop()

    def __str__(self):
        """
        Render this preference as an <extension_preference/> element.
        """
        # The format needs one placeholder per argument (name, value);
        # a single '%s' with two arguments raises TypeError.
        return (' <extension_preference name="%s">%s</extension_preference>\n'
                % (self.name, self.value))
class self_elt(base_elt):
    """
    <self/> element.

    Carries an action, an optional self_id, any number of extension
    preferences, and a set of boolean control flags.  Each flag is
    represented in XML by the presence of an empty child element.
    """

    # Names of the flag child elements.  Each has a class-level default
    # of False and is set to True on the instance when the element is seen.
    booleans = ("rekey", "reissue", "revoke", "run_now", "publish_world_now")

    rekey = False
    reissue = False
    revoke = False
    run_now = False
    publish_world_now = False

    def __init__(self):
        # extension_preference_elt objects, in the order they were parsed.
        self.prefs = []

    def startElement(self, stack, name, attrs):
        """
        Dispatch on the element name: hand <extension_preference/> to a
        new child handler, set the flag for a boolean element, or read
        the attributes of <self/> itself.
        """
        if name == "extension_preference":
            pref = extension_preference_elt()
            self.prefs.append(pref)
            # The child handler pops itself off the stack in its endElement().
            stack.append(pref)
            pref.startElement(stack, name, attrs)
        elif name in self.booleans:
            setattr(self, name, True)
        else:
            assert name == "self", "Unexpected name %s, stack %s" % (name, stack)
            self.action = attrs["action"]
            self.self_id = attrs.get("self_id")

    def endElement(self, stack, name, text):
        """
        Pop this handler when </self> is reached; the end of a boolean
        child element is ignored.
        """
        if name not in self.booleans:
            assert name == "self", "Unexpected name %s, stack %s" % (name, stack)
            stack.pop()

    def __str__(self):
        """
        Render this element, its preferences and its set flags as XML text.
        """
        # The opening tag must consume both arguments; a format with no
        # placeholders raises TypeError.
        pieces = [' <self action="%s"%s>\n' % (self.action, self.attr_maybe("self_id"))]
        pieces.extend(str(pref) for pref in self.prefs)
        pieces.extend(' <%s/>\n' % flag for flag in self.booleans if getattr(self, flag))
        pieces.append(' </self>\n')
        return "".join(pieces)
class bsc_elt(base_elt):
    """
    <bsc/> element.

    Carries an action, a self_id, an optional bsc_id, an optional
    key-generation request, and base64-encoded payloads: any number of
    signing certificates, a public key and a PKCS #10 request.
    """

    generate_keypair = False

    def __init__(self):
        # Decoded signing certificates, in the order they were parsed.
        self.signing_cert = []

    def startElement(self, stack, name, attrs):
        """
        Read the attributes of <generate_keypair/> or of <bsc/> itself.
        The payload elements carry only text, handled in endElement().
        """
        if name == "generate_keypair":
            self.generate_keypair = True
            self.key_type = attrs["key_type"]
            self.hash_alg = attrs["hash_alg"]
            self.key_length = attrs["key_length"]
        elif name not in ("signing_cert", "public_key", "pkcs10_cert_request"):
            assert name == "bsc", "Unexpected name %s, stack %s" % (name, stack)
            self.action = attrs["action"]
            self.self_id = attrs["self_id"]
            self.bsc_id = attrs.get("bsc_id")

    def endElement(self, stack, name, text):
        """
        Decode the base64 text of a payload element, or pop this handler
        when </bsc> is reached.
        """
        if name == "signing_cert":
            self.signing_cert.append(base64.b64decode(text))
        elif name == "public_key":
            self.public_key = base64.b64decode(text)
        elif name == "pkcs10_cert_request":
            self.pkcs10_cert_request = base64.b64decode(text)
        elif name != "generate_keypair":
            assert name == "bsc", "Unexpected name %s, stack %s" % (name, stack)
            stack.pop()

    def __str__(self):
        """
        Render this element and whatever payloads it holds as XML text.
        """
        # Every format needs one placeholder per argument, and every
        # payload needs its element wrapper, or the output cannot be
        # parsed back by startElement()/endElement().
        xml = (' <bsc action="%s" self_id="%s"%s>\n'
               % (self.action, self.self_id, self.attr_maybe("bsc_id")))
        if self.generate_keypair:
            # Emitted so a parsed key-generation request is not silently
            # dropped; attr_maybe() tolerates the flag having been set
            # without its attributes.
            xml += (' <generate_keypair%s%s%s/>\n'
                    % (self.attr_maybe("key_type"),
                       self.attr_maybe("hash_alg"),
                       self.attr_maybe("key_length")))
        for cert in self.signing_cert:
            xml += ' <signing_cert>' + base64.b64encode(cert) + '</signing_cert>\n'
        request = getattr(self, "pkcs10_cert_request", None)
        if request is not None:
            xml += ' <pkcs10_cert_request>' + base64.b64encode(request) + '</pkcs10_cert_request>\n'
        key = getattr(self, "public_key", None)
        if key is not None:
            xml += ' <public_key>' + base64.b64encode(key) + '</public_key>\n'
        return xml + ' </bsc>\n'
class parent_elt(base_elt):
    """
    <parent/> element.

    Carries an action, a self_id, an optional parent_id, an optional
    base64-encoded peer_ta, link IDs, contact URIs and boolean control
    flags.
    """

    # Child elements whose value is carried in an "id" attribute.
    ids = ("bsc_link", "repository_link")
    # Child elements whose value is carried in a "uri" attribute.
    uris = ("peer_contact", "sia_base")
    # Child elements whose mere presence sets a flag.
    booleans = ("rekey", "reissue", "revoke")

    rekey = False
    reissue = False
    revoke = False

    def startElement(self, stack, name, attrs):
        """
        Store the value of a URI, ID or flag child element, or read the
        attributes of <parent/> itself.  <peer_ta/> carries only text,
        handled in endElement().
        """
        if name in self.uris:
            setattr(self, name, attrs["uri"])
        elif name in self.ids:
            setattr(self, name, attrs["id"])
        elif name in self.booleans:
            setattr(self, name, True)
        elif name != "peer_ta":
            assert name == "parent", "Unexpected name %s, stack %s" % (name, stack)
            self.action = attrs["action"]
            self.self_id = attrs["self_id"]
            self.parent_id = attrs.get("parent_id")

    def endElement(self, stack, name, text):
        """
        Decode the base64 text of <peer_ta/>, or pop this handler when
        </parent> is reached; the end of any other child is ignored.
        """
        if name == "peer_ta":
            self.peer_ta = base64.b64decode(text)
        elif name not in self.booleans + self.ids + self.uris:
            assert name == "parent", "Unexpected name %s, stack %s" % (name, stack)
            stack.pop()

    def __str__(self):
        """
        Render this element and the children it holds as XML text.
        """
        # Every format needs one placeholder per argument; a format with
        # none raises TypeError as soon as a value is present.
        xml = (' <parent action="%s" self_id="%s"%s>\n'
               % (self.action, self.self_id, self.attr_maybe("parent_id")))
        peer_ta = getattr(self, "peer_ta", None)
        if peer_ta is not None:
            xml += ' <peer_ta>' + base64.b64encode(peer_ta) + '</peer_ta>\n'
        # Tuple order gives the output order: peer_contact, sia_base,
        # then bsc_link, repository_link.
        for tag in self.uris:
            uri = getattr(self, tag, None)
            if uri is not None:
                xml += ' <%s uri="%s"/>\n' % (tag, uri)
        for tag in self.ids:
            ident = getattr(self, tag, None)
            if ident is not None:
                xml += ' <%s id="%s"/>\n' % (tag, ident)
        for flag in self.booleans:
            if getattr(self, flag):
                xml += ' <%s/>\n' % flag
        return xml + ' </parent>\n'
class child_elt(base_elt):
    """
    <child/> element.

    Carries an action, a self_id, an optional child_id, an optional
    base64-encoded peer_ta, link IDs and a boolean control flag.
    """

    # Child elements whose value is carried in an "id" attribute.
    ids = ("bsc_link", "child_db_id")
    # Child elements whose mere presence sets a flag.
    booleans = ("reissue", )

    reissue = False

    def startElement(self, stack, name, attrs):
        """
        Store the value of an ID or flag child element, or read the
        attributes of <child/> itself.  <peer_ta/> carries only text,
        handled in endElement().
        """
        if name in self.ids:
            setattr(self, name, attrs["id"])
        elif name in self.booleans:
            setattr(self, name, True)
        elif name != "peer_ta":
            assert name == "child", "Unexpected name %s, stack %s" % (name, stack)
            self.action = attrs["action"]
            self.self_id = attrs["self_id"]
            self.child_id = attrs.get("child_id")

    def endElement(self, stack, name, text):
        """
        Decode the base64 text of <peer_ta/>, or pop this handler when
        </child> is reached; the end of any other child is ignored.
        """
        if name == "peer_ta":
            self.peer_ta = base64.b64decode(text)
        elif name not in self.booleans + self.ids:
            assert name == "child", "Unexpected name %s, stack %s" % (name, stack)
            stack.pop()

    def __str__(self):
        """
        Render this element and the children it holds as XML text.
        """
        # Every format needs one placeholder per argument; a format with
        # none raises TypeError as soon as a value is present.
        xml = (' <child action="%s" self_id="%s"%s>\n'
               % (self.action, self.self_id, self.attr_maybe("child_id")))
        peer_ta = getattr(self, "peer_ta", None)
        if peer_ta is not None:
            xml += ' <peer_ta>' + base64.b64encode(peer_ta) + '</peer_ta>\n'
        # Tuple order gives the output order: bsc_link, then child_db_id.
        for tag in self.ids:
            ident = getattr(self, tag, None)
            if ident is not None:
                xml += ' <%s id="%s"/>\n' % (tag, ident)
        for flag in self.booleans:
            if getattr(self, flag):
                xml += ' <%s/>\n' % flag
        return xml + ' </child>\n'
class repository_elt(base_elt):
    """
    <repository/> element.

    Carries an action, a self_id, an optional repository_id, an
    optional base64-encoded peer_ta, a contact URI and a BSC link ID.
    """

    def startElement(self, stack, name, attrs):
        """
        Store the value of <bsc_link/> or <peer_contact/>, or read the
        attributes of <repository/> itself.  <peer_ta/> carries only
        text, handled in endElement().
        """
        if name == "bsc_link":
            self.bsc_link = attrs["id"]
        elif name == "peer_contact":
            self.peer_contact = attrs["uri"]
        elif name != "peer_ta":
            assert name == "repository", "Unexpected name %s, stack %s" % (name, stack)
            self.action = attrs["action"]
            self.self_id = attrs["self_id"]
            self.repository_id = attrs.get("repository_id")

    def endElement(self, stack, name, text):
        """
        Decode the base64 text of <peer_ta/>, or pop this handler when
        </repository> is reached; the end of any other child is ignored.
        """
        if name == "peer_ta":
            self.peer_ta = base64.b64decode(text)
        elif name not in ("bsc_link", "peer_contact"):
            assert name == "repository", "Unexpected name %s, stack %s" % (name, stack)
            stack.pop()

    def __str__(self):
        """
        Render this element and the children it holds as XML text.
        """
        # Every format needs one placeholder per argument; a format with
        # none raises TypeError as soon as a value is present.
        xml = (' <repository action="%s" self_id="%s"%s>\n'
               % (self.action, self.self_id, self.attr_maybe("repository_id")))
        peer_ta = getattr(self, "peer_ta", None)
        if peer_ta is not None:
            xml += ' <peer_ta>' + base64.b64encode(peer_ta) + '</peer_ta>\n'
        contact = getattr(self, "peer_contact", None)
        if contact is not None:
            xml += ' <peer_contact uri="%s"/>\n' % contact
        link = getattr(self, "bsc_link", None)
        if link is not None:
            xml += ' <bsc_link id="%s"/>\n' % link
        return xml + ' </repository>\n'
class route_origin_elt(base_elt):
    """
    <route_origin/> element.

    Carries an action, a self_id, an optional route_origin_id, an
    optional <resources/> child (an AS number plus optional IPv4 and
    IPv6 resource sets) and a suppress_publication flag.
    """

    suppress_publication = False
    ipv4 = None
    ipv6 = None

    def startElement(self, stack, name, attrs):
        """
        Set the suppress_publication flag, parse the attributes of
        <resources/>, or read the attributes of <route_origin/> itself.
        """
        if name == "suppress_publication":
            self.suppress_publication = True
        elif name == "resources":
            # int() parses the same values long() did and promotes to an
            # arbitrary-precision integer when needed.
            self.asn = int(attrs["asn"])
            if "ipv4" in attrs:
                self.ipv4 = resource_set.resource_set_ipv4(attrs["ipv4"])
            if "ipv6" in attrs:
                self.ipv6 = resource_set.resource_set_ipv6(attrs["ipv6"])
        else:
            assert name == "route_origin", "Unexpected name %s, stack %s" % (name, stack)
            self.action = attrs["action"]
            self.self_id = attrs["self_id"]
            self.route_origin_id = attrs.get("route_origin_id")

    def endElement(self, stack, name, text):
        """
        Pop this handler when </route_origin> is reached; the end of a
        child element is ignored.
        """
        if name not in ("suppress_publication", "resources"):
            assert name == "route_origin", "Unexpected name %s, stack %s" % (name, stack)
            stack.pop()

    def __str__(self):
        """
        Render this element and the children it holds as XML text.
        """
        # The opening format needs one placeholder per argument; a
        # format with none raises TypeError.
        xml = (' <route_origin action="%s" self_id="%s"%s>\n'
               % (self.action, self.self_id, self.attr_maybe("route_origin_id")))
        asn = getattr(self, "asn", None)
        if asn is not None:
            # NOTE(review): assumes str() of a resource_set object yields
            # the same text its constructor accepts -- confirm against
            # the resource_set module.
            xml += (' <resources asn="%d"%s%s/>\n'
                    % (asn, self.attr_maybe("ipv4"), self.attr_maybe("ipv6")))
        if self.suppress_publication:
            # Emitted so a parsed flag is not silently dropped.
            xml += ' <suppress_publication/>\n'
        return xml + ' </route_origin>\n'
class resource_class_elt(base_elt):
    """
    <resource_class/> element.

    Carries up to six optional resource sets, each parsed from the
    attribute of the same name: as, req_as, ipv4, req_ipv4, ipv6 and
    req_ipv6.  An attribute that was absent leaves the corresponding
    instance attribute unset.
    """

    # Attribute names in the order they are rendered by __str__().
    resource_attrs = ("as", "req_as", "ipv4", "req_ipv4", "ipv6", "req_ipv6")

    def startElement(self, stack, name, attrs):
        """
        Parse whichever resource-set attributes are present.
        """
        assert name == "resource_class", "Unexpected name %s, stack %s" % (name, stack)
        if "as" in attrs:
            # "as" is a reserved word, so "self.as = ..." is a syntax
            # error; setattr() stores the same attribute name.  Read it
            # back with getattr(obj, "as").
            setattr(self, "as", resource_set.resource_set_as(attrs["as"]))
        if "req_as" in attrs:
            self.req_as = resource_set.resource_set_as(attrs["req_as"])
        if "ipv4" in attrs:
            self.ipv4 = resource_set.resource_set_ipv4(attrs["ipv4"])
        if "req_ipv4" in attrs:
            self.req_ipv4 = resource_set.resource_set_ipv4(attrs["req_ipv4"])
        if "ipv6" in attrs:
            self.ipv6 = resource_set.resource_set_ipv6(attrs["ipv6"])
        if "req_ipv6" in attrs:
            self.req_ipv6 = resource_set.resource_set_ipv6(attrs["req_ipv6"])

    def __str__(self):
        """
        Render this element as an empty <resource_class/> tag carrying
        one attribute per resource set that is present.
        """
        # __str__ must return a string; falling off the end returns None
        # and makes str() raise TypeError.
        # NOTE(review): assumes str() of a resource_set object yields the
        # same text its constructor accepts -- confirm against the
        # resource_set module.
        rendered = "".join(self.attr_maybe(key) for key in self.resource_attrs)
        return ' <resource_class' + rendered + '/>\n'
class list_resources_elt(base_elt):
    """
    <list_resources/> element.

    Carries a self_id, an optional child_id, an optional valid_until
    and any number of <resource_class/> children.
    """

    def __init__(self):
        # resource_class_elt objects, in the order they were parsed.
        self.resources = []

    def startElement(self, stack, name, attrs):
        """
        Hand <resource_class/> to a new child handler, or read the
        attributes of <list_resources/> itself.
        """
        if name == "resource_class":
            rc = resource_class_elt()
            self.resources.append(rc)
            # The child handler pops itself off the stack when its own
            # end tag arrives.
            stack.append(rc)
            rc.startElement(stack, name, attrs)
        else:
            assert name == "list_resources", "Unexpected name %s, stack %s" % (name, stack)
            self.self_id = attrs["self_id"]
            self.child_id = attrs.get("child_id")
            self.valid_until = attrs.get("valid_until")

    def __str__(self):
        """
        Render this element and its resource classes as XML text.
        """
        # The opening format needs one placeholder per argument; a
        # format with none raises TypeError.
        pieces = [' <list_resources self_id="%s"%s%s>\n'
                  % (self.self_id, self.attr_maybe("child_id"), self.attr_maybe("valid_until"))]
        pieces.extend(str(rc) for rc in self.resources)
        pieces.append(' </list_resources>\n')
        return "".join(pieces)
class report_error_elt(base_elt):
    """
    <report_error/> element.

    Carries a self_id and an error_code, both required attributes.
    """

    def startElement(self, stack, name, attrs):
        """
        Read the attributes of <report_error/>.
        """
        assert name == "report_error", "Unexpected name %s, stack %s" % (name, stack)
        self.self_id = attrs["self_id"]
        self.error_code = attrs["error_code"]

    def __str__(self):
        """
        Render this element as an empty <report_error/> tag.
        """
        # The format needs one placeholder per argument; a format with
        # none raises TypeError.
        return ' <report_error self_id="%s" error_code="%s"/>\n' % (self.self_id, self.error_code)
class msg(list):
    """
    Left-right PDU.

    A list of the message's child element objects, in document order,
    plus the protocol version and message type read from the <msg/>
    start tag.
    """

    spec_uri = "http://www.hactrn.net/uris/rpki/left-right-spec/"
    version = 1

    def startElement(self, stack, name, attrs):
        """
        Read the attributes of <msg/> itself, or create the handler for
        a child element, append it to this message and push it on the
        handler stack.  An unrecognized child name raises KeyError.
        """
        if name == "msg":
            self.version = int(attrs["version"])
            self.type = attrs["type"]
            assert self.version == 1
        else:
            # Built inside the method so the element classes are looked
            # up only when a child element is actually seen.
            elt = {
                "self": self_elt,
                "child": child_elt,
                "parent": parent_elt,
                "repository": repository_elt,
                "route_origin": route_origin_elt,
                "bsc": bsc_elt,
                "list_resources": list_resources_elt,
                "report_error": report_error_elt,
            }[name]()
            self.append(elt)
            stack.append(elt)
            elt.startElement(stack, name, attrs)

    def endElement(self, stack, name, text):
        """
        Pop this message off the handler stack at </msg>; it must be the
        only entry left.
        """
        assert name == "msg", "Unexpected name %s, stack %s" % (name, stack)
        assert len(stack) == 1
        stack.pop()

    def __str__(self):
        """
        Render the whole message, including every child element, as an
        XML document.
        """
        # The format needs one placeholder per argument (namespace,
        # version, type, rendered children); a single '%s' with four
        # arguments raises TypeError.
        return ('<?xml version="1.0" encoding="US-ASCII" ?>\n'
                '<msg xmlns="%s" version="%d" type="%s">\n'
                '%s</msg>\n'
                % (self.spec_uri, self.version, self.type,
                   "".join(map(str, self))))
class sax_handler(sax_utils.handler):
    """
    SAX handler for Left-Right protocol.

    Supplies the object that represents the outermost element of a
    left-right message.
    """

    def create_top_level(self, name, attrs):
        """
        Return a fresh msg object for the top-level element, which must
        be a <msg/> whose version attribute is "1".
        """
        # The name is checked first, so attrs is only consulted for a
        # <msg/> element.
        is_msg = name == "msg"
        assert is_msg and attrs["version"] == "1"
        top = msg()
        return top