# $Id$ import base64, sax_utils, resource_set class base_elt(object): """ Base type for left-right message elements. """ attributes = () booleans = () def startElement(self, stack, name, attrs): self.read_attrs(attrs) def endElement(self, stack, name, text): stack.pop() def read_attrs(self, attrs): for key in self.attributes: setattr(self, key, attrs.get(key, None)) for key in self.booleans: setattr(self, key, attrs.get(key, False)) def print_attrs(self): xml ="" for key in self.attributes: val = getattr(self, key, None) if val is not None: xml += ' %s="%s"' % (key, val) for key in self.booleans: if getattr(self, key, False): xml += ' %s="yes"' % key return xml class extension_preference_elt(base_elt): """ Container for extension preferences. """ def startElement(self, stack, name, attrs): assert name == "extension_preference", "Unexpected name %s, stack %s" % (name, stack) self.name = attrs["name"] def endElement(self, stack, name, text): self.value = text stack.pop() def __str__(self): return (' %s\n' % (self.name, self.value)) class self_elt(base_elt): attributes = ("action", "type", "self_id") booleans = ("rekey", "reissue", "revoke", "run_now", "publish_world_now") def __init__(self): self.prefs = [] def startElement(self, stack, name, attrs): if name == "extension_preference": pref = extension_preference_elt() self.prefs.append(pref) stack.append(pref) pref.startElement(stack, name, attrs) else: assert name == "self", "Unexpected name %s, stack %s" % (name, stack) self.read_attrs(attrs) def endElement(self, stack, name, text): assert name == "self", "Unexpected name %s, stack %s" % (name, stack) stack.pop() def __str__(self): xml = ' \n' % self.print_attrs() for i in self.prefs: xml += str(i) return xml + ' \n' class bsc_elt(base_elt): attributes = ("action", "type", "self_id", "bsc_id", "key_type", "hash_alg", "key_length") booleans = ("generate_keypair",) pkcs10_cert_request = None public_key = None def __init__(self): self.signing_cert = [] def startElement(self, stack, name, attrs): if not name in ("signing_cert", "public_key", "pkcs10_cert_request"): assert name == "bsc", "Unexpected name %s, stack %s" % (name, stack) self.read_attrs(attrs) def endElement(self, stack, name, text): if name == "signing_cert": self.signing_cert.append(base64.b64decode(text)) elif name == "public_key": self.public_key = base64.b64decode(text) elif name == "pkcs10_cert_request": self.pkcs10_cert_request = base64.b64decode(text) else: assert name == "bsc", "Unexpected name %s, stack %s" % (name, stack) stack.pop() def __str__(self): xml = ' \n' % self.print_attrs() for i in self.signing_cert: xml += ' ' + base64.b64encode(i) + '\n' if self.pkcs10_cert_request: xml += ' ' + base64.b64encode(self.pkcs10_cert_request) + '\n' if self.public_key: xml += ' ' + base64.b64encode(self.public_key) + '\n' return xml + ' \n' class parent_elt(base_elt): attributes = ("action", "type", "self_id", "parent_id", "bsc_link", "repository_link", "peer_contact", "sia_base") booleans = ("rekey", "reissue", "revoke") peer_ta = None def startElement(self, stack, name, attrs): if name != "peer_ta": assert name == "parent", "Unexpected name %s, stack %s" % (name, stack) self.read_attrs(attrs) def endElement(self, stack, name, text): if name == "peer_ta": self.peer_ta = base64.b64decode(text) else: assert name == "parent", "Unexpected name %s, stack %s" % (name, stack) stack.pop() def __str__(self): xml = ' \n' % self.print_attrs() if self.peer_ta: xml += ' ' + base64.b64encode(self.peer_ta) + '\n' return xml + ' \n' class child_elt(base_elt): attributes = ("action", "type", "self_id", "child_id", "bsc_link", "child_db_id") booleans = ("reissue", ) peer_ta = None def startElement(self, stack, name, attrs): if name != "peer_ta": assert name == "child", "Unexpected name %s, stack %s" % (name, stack) self.read_attrs(attrs) def endElement(self, stack, name, text): if name == "peer_ta": self.peer_ta = base64.b64decode(text) else: assert name == "child", "Unexpected name %s, stack %s" % (name, stack) stack.pop() def __str__(self): xml = ' \n' % self.print_attrs() i = getattr(self, "peer_ta", None) if self.peer_ta: xml += ' ' + base64.b64encode(self.peer_ta) + '\n' return xml + ' \n' class repository_elt(base_elt): attributes = ("action", "type", "self_id", "repository_id", "bsc_link", "peer_contact") peer_ta = None def startElement(self, stack, name, attrs): if name != "peer_ta": assert name == "repository", "Unexpected name %s, stack %s" % (name, stack) self.read_attrs(attrs) def endElement(self, stack, name, text): if name == "peer_ta": self.peer_ta = base64.b64decode(text) else: assert name == "repository", "Unexpected name %s, stack %s" % (name, stack) stack.pop() def __str__(self): xml = ' \n' % self.print_attrs() if self.peer_ta: xml += ' ' + base64.b64encode(self.peer_ta) + '\n' return xml + ' \n' class route_origin_elt(base_elt): attributes = ("action", "type", "self_id", "route_origin_id", "asn", "ipv4", "ipv6") booleans = ("suppress_publication",) def startElement(self, stack, name, attrs): assert name == "route_origin", "Unexpected name %s, stack %s" % (name, stack) self.read_attrs(attrs) if self.asn is not None: self.asn = long(self.asn) if self.ipv4 is not None: self.ipv4 = resource_set.resource_set_ipv4(self.ipv4) if self.ipv6 is not None: self.ipv6 = resource_set.resource_set_ipv6(self.ipv4) def endElement(self, stack, name, text): assert name == "route_origin", "Unexpected name %s, stack %s" % (name, stack) stack.pop() def __str__(self): return ' \n' % self.print_attrs() class resource_class_elt(base_elt): attributes = ("as", "req_as", "ipv4", "req_ipv4", "ipv6", "req_ipv6") def startElement(self, stack, name, attrs): assert name == "resource_class", "Unexpected name %s, stack %s" % (name, stack) self.read_attrs(attrs) if self.as is not None: self.as = resource_set.resource_set_as(self.as) if self.req_as is not None: self.req_as = resource_set.resource_set_as(self.req_as) if self.ipv4 is not None: self.ipv4 = resource_set.resource_set_ipv4(self.ipv4) if self.req_ipv4 is not None: self.req_ipv4 = resource_set.resource_set_ipv4(self.req_ipv4) if self.ipv6 is not None: self.ipv6 = resource_set.resource_set_ipv6(self.ipv6) if self.req_ipv6 is not None: self.req_ipv6 = resource_set.resource_set_ipv6(self.req_ipv6) def endElement(self, stack, name, text): assert name == "resource_class", "Unexpected name %s, stack %s" % (name, stack) stack.pop() def __str__(self): return ' \n' % self.print_attrs() class list_resources_elt(base_elt): attributes = ("type", "self_id", "child_id", "valid_until") def __init__(self): self.resources = [] def startElement(self, stack, name, attrs): if name == "resource_class": rc = resource_class_elt() self.resources.append(rc) stack.append(rc) rc.startElement(stack, name, attrs) else: assert name == "list_resources", "Unexpected name %s, stack %s" % (name, stack) self.read_attrs(attrs) def __str__(self): xml = ' \n' % self.print_attrs() for i in self.resources: xml += str(i) return xml + ' \n' class report_error_elt(base_elt): attributes = ("self_id", "error_code") def startElement(self, stack, name, attrs): assert name == "report_error", "Unexpected name %s, stack %s" % (name, stack) self.read_attrs(attrs) def __str__(self): return ' \n' % self.print_attrs() class msg(list): """ Left-right PDU. """ spec_uri = "http://www.hactrn.net/uris/rpki/left-right-spec/" version = 1 def startElement(self, stack, name, attrs): if name == "msg": self.version = int(attrs["version"]) assert self.version == 1 else: elt = { "self" : self_elt, "child" : child_elt, "parent" : parent_elt, "repository" : repository_elt, "route_origin" : route_origin_elt, "bsc" : bsc_elt, "list_resources" : list_resources_elt, "report_error" : report_error_elt }[name]() self.append(elt) stack.append(elt) elt.startElement(stack, name, attrs) def endElement(self, stack, name, text): assert name == "msg", "Unexpected name %s, stack %s" % (name, stack) assert len(stack) == 1 stack.pop() def __str__(self): return ('\n' '\n' '%s\n' % (self.spec_uri, self.version, "".join(map(str, self)))) class sax_handler(sax_utils.handler): """ SAX handler for Left-Right protocol. """ def create_top_level(self, name, attrs): assert name == "msg" and attrs["version"] == "1" return msg()