1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
# $Id$
import base64, sax_utils, resource_set
class base_elt(object):
"""
Base type for left-right message elements.
"""
content_attr = False
base64_content = False
multivalue = ()
def __init__(self, up=None):
self.up = up
for key in self.multivalue:
setattr(self, key, [])
def store(self, key, val):
if key in self.multivalue:
getattr(self, key).append(val)
else:
setattr(self, key, val)
def startElement(self, name, attrs):
if name = self.name:
sax_utils.snarf_attribute(self, attrs, self.attributes)
def endElement(self, name, text):
if name = self.name and self.content_attr:
if self.base64_content:
self.store(self.content_attr, base64.b64decode(text))
else:
self.store(self.content_attr, text)
class self_elt(base_elt):
name = "self"
attributes = ("action", "self_id")
class child_elt(base_elt):
name = "child"
attributes = ("action", "self_id", "child_id")
class parent_elt(base_elt):
name = "parent"
attributes = ("action", "self_id", "parent_id")
class route_origin_elt(base_elt):
name = "route_origin"
attributes = ("action", "self_id", "route_origin_id")
class repository_elt(base_elt):
name = "repository"
attributes = ("action", "self_id", "repository_id")
class bsc_elt(base_elt):
name = "biz_signing_context"
attributes = ("action", "self_id", "biz_signing_context_id")
class list_resources_elt(base_elt):
def startElement(self, name, attrs):
sax_utils.snarf_attribute(self, attrs, ("self_id", "child_id"))
class report_error_elt(base_elt):
def startElement(self, name, attrs):
sax_utils.snarf_attribute(self, attrs, ("self_id", "error_code"))
class msg(list):
"""
Left-right PDU.
"""
spec_uri = "http://www.hactrn.net/uris/rpki/left-right-spec/"
version = 1
dispatch = {
"self" : self_elt,
"child" : child_elt,
"parent" : parent_elt,
"repository" : repository_elt,
"route_origin" : route_origin_elt,
"biz_signing_context" : bsc_elt,
"list_resources" : list_resources_elt,
"report_error" : report_error_elt }
def startElement(self, name, attrs):
if name == "msg":
sax_utils.snarf_attribute(self, attrs, "version", int)
sax_utils.snarf_attribute(self, attrs, "type")
assert self.version == 1
else:
if name in self.dispatch:
self.append(self.dispatch[name](self))
self[-1].startElement(name, attrs)
def endElement(self, name, text):
self[-1].endElement(name, text)
def __str__(self):
return ('<?xml version="1.0" encoding="US-ASCII" ?>\n'
'<msg xmlns="%s" version="%d" type="%s">\n' \
% (self.spec_uri, self.version, self.type)
) + "".join(map(str,self)) + "</msg>\n"
class sax_handler(sax_utils.handler):
"""
SAX handler for Left-Right protocol.
"""
def startElement(self, name, attrs):
if name == "msg":
self.set_obj(msg())
self.obj.startElement(name, attrs)
def endElement(self, name):
self.obj.endElement(name, self.get_text())
|