# $Id$
import base64
import glob
import xml.sax
import xml.sax.saxutils
class rpki_updown_msg(object):
    """Base class for the up-down protocol messages in this file.

    ``toXML`` reads the ``sender``, ``recipient``, ``msg_ref`` and ``type``
    attributes, which this class does not set itself; in this file the SAX
    handler assigns them while processing the ``message`` element.
    Subclasses override ``innerToXML`` and the element hooks to deal with
    their own payload.
    """

    def toXML(self):
        """Return the complete message serialized as an XML string.

        The envelope carries the attributes that the parser in this file
        reads back from a ``message`` element (``version``, ``sender``,
        ``recipient``, ``msg_ref``, ``type``); the payload comes from
        ``innerToXML``.  ``msg_ref`` must be an integer.
        """
        # NOTE(review): the envelope markup was reconstructed from the
        # attribute names the parser reads -- confirm the declaration line
        # and whether the protocol also requires an xmlns attribute.
        quote = xml.sax.saxutils.quoteattr  # escapes the value, adds quotes
        envelope = (
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<message version="1" sender=%s recipient=%s'
            ' msg_ref="%d" type=%s>\n'
            % (quote(self.sender), quote(self.recipient),
               self.msg_ref, quote(self.type))
        )
        return envelope + self.innerToXML() + '</message>\n'

    def innerToXML(self):
        """Return the XML for the message payload; empty by default."""
        return ""

    def startElement(self, name, attrs):
        """Hook called for the start of a payload element; no-op here."""

    def endElement(self, name, text):
        """Hook called for the end of a payload element; no-op here."""
class rpki_updown_cert(object):
    """One ``certificate`` element, built from its SAX attributes.

    The ``cert`` attribute (the decoded element body) is not set by this
    class; in this file the list-response message assigns it when the
    element ends.
    """

    # Attribute names in the order toXML() writes them.
    _xml_attributes = (
        'cert_url', 'cert_ski', 'cert_aki', 'cert_serial',
        'resource_set_as', 'resource_set_ipv4', 'resource_set_ipv6',
        'req_resource_set_as', 'req_resource_set_ipv4',
        'req_resource_set_ipv6', 'status',
    )

    def __init__(self, attrs):
        """Copy the element's attributes onto the instance.

        attrs -- SAX attributes object of the ``certificate`` element.

        Raises KeyError when a mandatory attribute is missing.  The three
        ``req_resource_set_*`` attributes are optional and become None when
        absent.
        """
        self.cert_url = attrs.getValue('cert_url')
        self.cert_ski = attrs.getValue('cert_ski')
        self.cert_aki = attrs.getValue('cert_aki')
        self.cert_serial = attrs.getValue('cert_serial')
        self.resource_set_as = attrs.getValue('resource_set_as')
        self.resource_set_ipv4 = attrs.getValue('resource_set_ipv4')
        self.resource_set_ipv6 = attrs.getValue('resource_set_ipv6')
        # get() yields None for an absent attribute, replacing the repeated
        # try/except KeyError blocks.
        self.req_resource_set_as = attrs.get('req_resource_set_as')
        self.req_resource_set_ipv4 = attrs.get('req_resource_set_ipv4')
        self.req_resource_set_ipv6 = attrs.get('req_resource_set_ipv6')
        self.status = attrs.getValue('status')

    @staticmethod
    def _b64_text(data):
        """Return *data* base64-encoded as a native string."""
        encoded = base64.b64encode(data)
        # Python 3 returns bytes here, which cannot be concatenated with the
        # str markup; Python 2 already returns str and is left untouched.
        if not isinstance(encoded, str):
            encoded = encoded.decode('ascii')
        return encoded

    def toXML(self):
        """Return this certificate as a ``certificate`` XML element.

        Every attribute read by ``__init__`` is written back (optional ones
        only when set), so the output can be parsed by this class again.
        Requires ``self.cert`` to hold the raw certificate bytes.
        """
        # NOTE(review): markup reconstructed from the attribute names read
        # in __init__ -- confirm layout against the protocol specification.
        quote = xml.sax.saxutils.quoteattr
        rendered = []
        for attr_name in self._xml_attributes:
            value = getattr(self, attr_name)
            if value is not None:
                rendered.append(' %s=%s' % (attr_name, quote(value)))
        opening = '    <certificate%s>\n' % ''.join(rendered)
        return opening + self._b64_text(self.cert) + '</certificate>\n'
class rpki_updown_class(object):
    """One resource ``class`` element and the certificates listed in it.

    ``certs`` collects the certificate objects appended by the enclosing
    message.  The ``issuer`` attribute (decoded issuer certificate) is not
    set here; in this file the list-response message assigns it when the
    ``issuer`` element ends.
    """

    # Attribute names in the order toXML() writes them.
    _xml_attributes = (
        'class_name', 'cert_url', 'cert_ski',
        'resource_set_as', 'resource_set_ipv4', 'resource_set_ipv6',
        'suggested_sia_head',
    )

    def __init__(self, attrs):
        """Copy the element's attributes onto the instance.

        attrs -- SAX attributes object of the ``class`` element.

        Raises KeyError when a mandatory attribute is missing;
        ``suggested_sia_head`` is optional and becomes None when absent.
        """
        self.class_name = attrs.getValue('class_name')
        self.cert_url = attrs.getValue('cert_url')
        self.cert_ski = attrs.getValue('cert_ski')
        self.resource_set_as = attrs.getValue('resource_set_as')
        self.resource_set_ipv4 = attrs.getValue('resource_set_ipv4')
        self.resource_set_ipv6 = attrs.getValue('resource_set_ipv6')
        # get() yields None for an absent attribute.
        self.suggested_sia_head = attrs.get('suggested_sia_head')
        self.certs = []

    @staticmethod
    def _b64_text(data):
        """Return *data* base64-encoded as a native string."""
        encoded = base64.b64encode(data)
        # Python 3 returns bytes here, which cannot be concatenated with the
        # str markup; Python 2 already returns str and is left untouched.
        if not isinstance(encoded, str):
            encoded = encoded.decode('ascii')
        return encoded

    def toXML(self):
        """Return this class, its certificates and its issuer as XML.

        Requires ``self.issuer`` to hold the raw issuer certificate bytes.
        """
        # NOTE(review): markup reconstructed from the attribute and element
        # names the parsing code reads -- confirm against the protocol spec.
        quote = xml.sax.saxutils.quoteattr
        rendered = []
        for attr_name in self._xml_attributes:
            value = getattr(self, attr_name)
            if value is not None:
                rendered.append(' %s=%s' % (attr_name, quote(value)))
        pieces = ['  <class%s>\n' % ''.join(rendered)]
        pieces.extend(cert.toXML() for cert in self.certs)
        pieces.append('    <issuer>%s</issuer>\n  </class>\n'
                      % self._b64_text(self.issuer))
        return ''.join(pieces)
class rpki_updown_list(rpki_updown_msg):
    """List request message; it adds nothing but a display label."""

    def __str__(self):
        """Return the fixed human-readable label for this message."""
        return "RPKI list request"
class rpki_updown_list_response(rpki_updown_msg):
    """List response message: a sequence of resource classes.

    ``resource_classes`` holds one rpki_updown_class per ``class`` element,
    in document order; certificates and the issuer are attached to the most
    recently opened class.
    """

    def __init__(self):
        """Start with no resource classes."""
        self.resource_classes = []

    def startElement(self, name, attrs):
        """Open a new class, or add a certificate to the current class."""
        if name == 'certificate':
            current_class = self.resource_classes[-1]
            current_class.certs.append(rpki_updown_cert(attrs))
        elif name == 'class':
            self.resource_classes.append(rpki_updown_class(attrs))

    def endElement(self, name, text):
        """Store the base64-decoded body of a certificate or issuer."""
        if name not in ('certificate', 'issuer'):
            return
        # Decode before looking up the target, as the assignment form did.
        decoded = base64.b64decode(text)
        current_class = self.resource_classes[-1]
        if name == 'certificate':
            current_class.certs[-1].cert = decoded
        else:
            current_class.issuer = decoded

    def innerToXML(self):
        """Return the XML of every resource class, concatenated in order."""
        return ''.join(
            resource_class.toXML() for resource_class in self.resource_classes
        )
class rpki_updown_issue(rpki_updown_msg):
    """Issue request message: one ``request`` element with a PKCS#10 body."""

    def startElement(self, name, attrs):
        """Record the attributes of the ``request`` element.

        ``class_name`` is mandatory (KeyError when missing); the three
        ``req_resource_set_*`` attributes are optional and become None when
        absent.
        """
        assert name == 'request'
        self.class_name = attrs.getValue('class_name')
        # get() yields None for an absent attribute, replacing the repeated
        # try/except KeyError blocks.
        self.req_resource_set_as = attrs.get('req_resource_set_as')
        self.req_resource_set_ipv4 = attrs.get('req_resource_set_ipv4')
        self.req_resource_set_ipv6 = attrs.get('req_resource_set_ipv6')

    def endElement(self, name, text):
        """Store the base64-decoded element body as ``pkcs10``."""
        assert name == 'request'
        self.pkcs10 = base64.b64decode(text)

    def innerToXML(self):
        """Return the ``request`` element for this message.

        Writes back the attributes stored by ``startElement`` (optional
        ones only when set) and the base64-encoded ``pkcs10`` body.
        """
        # NOTE(review): the original body of this method was lost; this
        # markup is reconstructed from the names startElement/endElement
        # read -- confirm against the protocol specification.
        quote = xml.sax.saxutils.quoteattr
        rendered = []
        for attr_name in ('class_name', 'req_resource_set_as',
                          'req_resource_set_ipv4', 'req_resource_set_ipv6'):
            value = getattr(self, attr_name)
            if value is not None:
                rendered.append(' %s=%s' % (attr_name, quote(value)))
        encoded = base64.b64encode(self.pkcs10)
        # Python 3 returns bytes here; Python 2 already returns str.
        if not isinstance(encoded, str):
            encoded = encoded.decode('ascii')
        return '  <request%s>\n%s</request>\n' % (''.join(rendered), encoded)
class rpki_updown_issue_response(rpki_updown_list_response):
    """Issue response message: a list response with at most one class."""

    def innerToXML(self):
        """Return the payload XML produced by the list-response serializer.

        Asserts that no more than one resource class is present.
        """
        assert len(self.resource_classes) < 2
        # The result must be returned: toXML() concatenates it, and the
        # implicit None of the previous version made that fail.
        return rpki_updown_list_response.innerToXML(self)
class rpki_updown_revoke(rpki_updown_msg):
    """Revoke request message identifying a key by class name and SKI."""

    def __str__(self):
        """Return a one-line summary with the message type, class and SKI."""
        return 'RPKI %s class_name %s ski %s' % (
            self.type, self.class_name, self.ski)

    def startElement(self, name, attrs):
        """Record ``class_name`` and ``ski`` from the payload element.

        The element name is not checked; KeyError is raised when either
        attribute is missing.
        """
        self.class_name = attrs.getValue('class_name')
        self.ski = attrs.getValue('ski')

    def innerToXML(self):
        """Return the payload element carrying ``class_name`` and ``ski``."""
        # NOTE(review): the element name "key" follows the published
        # up-down protocol; the parser here accepts any element name, so
        # confirm it matches the protocol draft this code targets.
        quote = xml.sax.saxutils.quoteattr
        return '  <key class_name=%s ski=%s />\n' % (
            quote(self.class_name), quote(self.ski))
class rpki_updown_revoke_response(rpki_updown_revoke):
    """Revoke response message; all behavior is inherited unchanged."""
class rpki_updown_error_response(rpki_updown_msg):
    """Error response message with a numeric status and optional details."""

    def __str__(self):
        """Return a one-line summary containing the numeric status."""
        return 'RPKI error %d' % (self.status)

    def innerToXML(self):
        """Return the ``status`` element for this error.

        Only the status is serialized; ``last_message_processed`` and
        ``description`` are parsed but not written back.
        """
        # The element tags are required for endElement() to be able to
        # read the value back; a bare number is not parseable payload.
        return '<status>%d</status>\n' % self.status

    def endElement(self, name, text):
        """Store the body of a ``status``, ``last_message_processed`` or
        ``description`` element; ``status`` is converted to int."""
        if name == 'status':
            self.status = int(text)
        elif name == 'last_message_processed':
            self.last_message_processed = text
        elif name == 'description':
            self.description = text
class rpki_updown_sax_handler(xml.sax.handler.ContentHandler):
    """SAX content handler that builds one up-down message object.

    After parsing, ``obj`` holds the message object selected by the
    ``type`` attribute of the ``message`` element, or None when no such
    element was seen.  ``text`` accumulates character data for the element
    currently being read.
    """

    def __init__(self):
        """Initialize the base handler and start with no message."""
        xml.sax.handler.ContentHandler.__init__(self)
        self.text = ''
        self.obj = None

    def startElementNS(self, name, qname, attrs):
        """Namespace-aware entry point; dispatches on the local name."""
        return self.startElement(name[1], attrs)

    def endElementNS(self, name, qname):
        """Namespace-aware entry point; dispatches on the local name."""
        return self.endElement(name[1])

    def startElement(self, name, attrs):
        """Create the message object on ``message``; forward other elements.

        Raises KeyError for an unknown message type or a missing envelope
        attribute, and AssertionError for an unsupported version or a
        payload element seen before any ``message`` element.
        """
        # Text seen before this start tag (normally inter-element
        # whitespace) belongs to the parent, so it must not leak into this
        # element's body.  Mixed content is not supported.
        self.text = ''
        if name == 'message':
            assert int(attrs.getValue('version')) == 1
            msg_type = attrs.getValue('type')
            if self.obj is None:
                # Map to classes and instantiate only the one requested,
                # instead of building all seven objects for every message.
                message_classes = {
                    'list': rpki_updown_list,
                    'list_response': rpki_updown_list_response,
                    'issue': rpki_updown_issue,
                    'issue_response': rpki_updown_issue_response,
                    'revoke': rpki_updown_revoke,
                    'revoke_response': rpki_updown_revoke_response,
                    'error_response': rpki_updown_error_response,
                }
                self.obj = message_classes[msg_type]()
            self.obj.type = msg_type
            self.obj.sender = attrs.getValue('sender')
            self.obj.recipient = attrs.getValue('recipient')
            self.obj.msg_ref = int(attrs.getValue('msg_ref'))
        else:
            assert self.obj is not None
            self.obj.startElement(name, attrs)

    def characters(self, content):
        """Accumulate character data; SAX may deliver it in several chunks."""
        self.text += content

    def endElement(self, name):
        """Hand the accumulated text to the message, then reset it.

        The closing ``message`` tag is not forwarded to the message object.
        """
        assert self.obj is not None
        if name != 'message':
            self.obj.endElement(name, self.text)
        self.text = ''
# Parse each sample message, then print its summary line followed by its
# re-serialized XML.  A failure on one file is reported and does not stop
# the run.
files = sorted(glob.glob("up-down-protocol-samples/*.xml"))
for f in files:
    try:
        parser = xml.sax.make_parser()
        handler = rpki_updown_sax_handler()
        parser.setContentHandler(handler)
        parser.parse(f)
        obj = handler.obj
        # A single parenthesized argument prints the same text as the
        # statement form.
        print("-- " + str(obj) + "\n")
        print(obj.toXML())
    except Exception as err:
        print("? " + str(err) + "\n")