# $Id$ import base64, sax_utils, resource_set class base_elt(object): """ Base type for left-right message elements. """ content_attr = False base64_content = False multivalue = () def __init__(self, up=None): self.up = up for key in self.multivalue: setattr(self, key, []) def store(self, key, val): if key in self.multivalue: getattr(self, key).append(val) else: setattr(self, key, val) def startElement(self, name, attrs): if name = self.name: sax_utils.snarf_attribute(self, attrs, self.attributes) def endElement(self, name, text): if name = self.name and self.content_attr: if self.base64_content: self.store(self.content_attr, base64.b64decode(text)) else: self.store(self.content_attr, text) class self_elt(base_elt): name = "self" attributes = ("action", "self_id") class child_elt(base_elt): name = "child" attributes = ("action", "self_id", "child_id") class parent_elt(base_elt): name = "parent" attributes = ("action", "self_id", "parent_id") class route_origin_elt(base_elt): name = "route_origin" attributes = ("action", "self_id", "route_origin_id") class repository_elt(base_elt): name = "repository" attributes = ("action", "self_id", "repository_id") class bsc_elt(base_elt): name = "biz_signing_context" attributes = ("action", "self_id", "biz_signing_context_id") class list_resources_elt(base_elt): def startElement(self, name, attrs): sax_utils.snarf_attribute(self, attrs, ("self_id", "child_id")) class report_error_elt(base_elt): def startElement(self, name, attrs): sax_utils.snarf_attribute(self, attrs, ("self_id", "error_code")) class msg(list): """ Left-right PDU. """ spec_uri = "http://www.hactrn.net/uris/rpki/left-right-spec/" version = 1 dispatch = { "self" : self_elt, "child" : child_elt, "parent" : parent_elt, "repository" : repository_elt, "route_origin" : route_origin_elt, "biz_signing_context" : bsc_elt, "list_resources" : list_resources_elt, "report_error" : report_error_elt } def startElement(self, name, attrs): if name == "msg": sax_utils.snarf_attribute(self, attrs, "version", int) sax_utils.snarf_attribute(self, attrs, "type") assert self.version == 1 else: if name in self.dispatch: self.append(self.dispatch[name](self)) self[-1].startElement(name, attrs) def endElement(self, name, text): self[-1].endElement(name, text) def __str__(self): return ('\n' '\n' \ % (self.spec_uri, self.version, self.type) ) + "".join(map(str,self)) + "\n" class sax_handler(sax_utils.handler): """ SAX handler for Left-Right protocol. """ def startElement(self, name, attrs): if name == "msg": self.set_obj(msg()) self.obj.startElement(name, attrs) def endElement(self, name): self.obj.endElement(name, self.get_text())