# $Id$
import base64
import glob
import os
import re
import socket
import xml.sax
def relaxng(xml, rng):
i, o = os.popen4(("xmllint", "--noout", "--relaxng", rng, "-"))
i.write(xml)
i.close()
v = o.read()
o.close()
return v
class rpki_updown_as_set(object):
def __init__(self, s):
self.as_set = []
if s != "":
vec = s.split(",")
for elt in vec:
r = re.match("^([0-9]+)-([0-9]+)$", elt)
if r:
b, e = r.groups()
self.as_set.append((long(b), long(e)))
else:
self.as_set.append((long(elt), ))
self.as_set.sort()
def __str__(self):
vec = []
for elt in self.as_set:
if len(elt) == 1:
vec.append(str(elt[0]))
else:
vec.append(str(elt[0]) + "-" + str(elt[1]))
return ",".join(vec)
class rpki_updown_ip_set(object):
def __init__(self, s):
self.vec = []
if s != "":
vec = s.split(",")
for elt in vec:
r = re.match("^(%s)-(%s)$" % (self.re, self.re), elt)
if r:
b, e = r.groups()
self.vec.append((socket.inet_pton(self.af, b),
socket.inet_pton(self.af, e)))
continue
r = re.match("^(%s)/([0-9]+)$" % (self.re), elt)
if r:
i, p = r.groups()
self.vec.append((socket.inet_pton(self.af, i), int(p)))
continue
self.vec.append((socket.inet_pton(self.af, elt), ))
self.vec.sort()
def __str__(self):
vec = []
for elt in self.vec:
if len(elt) == 1:
vec.append(socket.inet_ntop(self.af, elt[0]))
elif isinstance(elt[1], int):
vec.append(socket.inet_ntop(self.af, elt[0]) + "/" + str(elt[1]))
else:
vec.append(socket.inet_ntop(self.af, elt[0]) + "-" +
socket.inet_ntop(self.af, elt[1]))
return ",".join(vec)
class rpki_updown_ipv4_set(rpki_updown_ip_set):
re = "[0-9.]+"
af = socket.AF_INET
class rpki_updown_ipv6_set(rpki_updown_ip_set):
re = "[0-9:a-fA-F]+"
af = socket.AF_INET6
class rpki_updown_msg(object):
def toXML(self):
return ('\
\n\
\n' \
% (self.sender, self.recipient, self.msg_ref, self.type)
) + self.innerToXML() + "\n"
def innerToXML(self):
return ""
def startElement(self, name, attrs): pass
def endElement(self, name, text): pass
class rpki_updown_cert(object):
def __init__(self, attrs):
self.cert_url = attrs.getValue("cert_url")
self.cert_ski = attrs.getValue("cert_ski")
self.cert_aki = attrs.getValue("cert_aki")
self.cert_serial = attrs.getValue("cert_serial")
self.resource_set_as = rpki_updown_as_set(attrs.getValue("resource_set_as"))
self.resource_set_ipv4 = rpki_updown_ipv4_set(attrs.getValue("resource_set_ipv4"))
self.resource_set_ipv6 = rpki_updown_ipv6_set(attrs.getValue("resource_set_ipv6"))
try:
self.req_resource_set_as = rpki_updown_as_set(attrs.getValue("req_resource_set_as"))
except KeyError:
self.req_resource_set_as = None
try:
self.req_resource_set_ipv4 = rpki_updown_ipv4_set(attrs.getValue("req_resource_set_ipv4"))
except KeyError:
self.req_resource_set_ipv4 = None
try:
self.req_resource_set_ipv6 = rpki_updown_ipv6_set(attrs.getValue("req_resource_set_ipv6"))
except KeyError:
self.req_resource_set_ipv6 = None
self.status = attrs.getValue("status")
def toXML(self):
xml = ('\
' % self.status)
xml += base64.b64encode(self.cert) + "\n"
return xml
class rpki_updown_class(object):
def __init__(self, attrs):
self.class_name = attrs.getValue("class_name")
self.cert_url = attrs.getValue("cert_url")
self.cert_ski = attrs.getValue("cert_ski")
self.resource_set_as = rpki_updown_as_set(attrs.getValue("resource_set_as"))
self.resource_set_ipv4 = rpki_updown_ipv4_set(attrs.getValue("resource_set_ipv4"))
self.resource_set_ipv6 = rpki_updown_ipv6_set(attrs.getValue("resource_set_ipv6"))
try:
self.suggested_sia_head = attrs.getValue("suggested_sia_head")
except KeyError:
self.suggested_sia_head = None
self.certs = []
def toXML(self):
xml = ('\
\n"
for cert in self.certs:
xml += cert.toXML()
xml += " " + base64.b64encode(self.issuer) + "\n \n"
return xml
class rpki_updown_list(rpki_updown_msg):
def __str__(self):
return "RPKI list request"
class rpki_updown_list_response(rpki_updown_msg):
def __init__(self):
self.resource_classes = []
def startElement(self, name, attrs):
if name == "class":
self.resource_classes.append(rpki_updown_class(attrs))
elif name == "certificate":
self.resource_classes[-1].certs.append(rpki_updown_cert(attrs))
def endElement(self, name, text):
if name == "certificate":
self.resource_classes[-1].certs[-1].cert = base64.b64decode(text)
elif name == "issuer":
self.resource_classes[-1].issuer = base64.b64decode(text)
def innerToXML(self):
xml = ""
for c in self.resource_classes:
xml += c.toXML()
return xml
class rpki_updown_issue(rpki_updown_msg):
def startElement(self, name, attrs):
assert name == "request"
self.class_name = attrs.getValue("class_name")
try:
self.req_resource_set_as = rpki_updown_as_set(attrs.getValue("req_resource_set_as"))
except KeyError:
self.req_resource_set_as = None
try:
self.req_resource_set_ipv4 = rpki_updown_ipv4_set(attrs.getValue("req_resource_set_ipv4"))
except KeyError:
self.req_resource_set_ipv4 = None
try:
self.req_resource_set_ipv6 = rpki_updown_ipv6_set(attrs.getValue("req_resource_set_ipv6"))
except KeyError:
self.req_resource_set_ipv6 = None
def endElement(self, name, text):
assert name == "request"
self.pkcs10 = base64.b64decode(text)
def innerToXML(self):
xml = (' " + base64.b64encode(self.pkcs10) + "\n"
class rpki_updown_issue_response(rpki_updown_list_response):
def innerToXML(self):
assert len(self.resource_classes) == 1
return rpki_updown_list_response.innerToXML(self)
class rpki_updown_revoke(rpki_updown_msg):
def __str__(self):
return 'RPKI %s class_name %s ski %s' % (self.type, self.class_name, self.ski)
def startElement(self, name, attrs):
self.class_name = attrs.getValue("class_name")
self.ski = attrs.getValue("ski")
def innerToXML(self):
return (' \n' % (self.class_name, self.ski))
class rpki_updown_revoke_response(rpki_updown_revoke): pass
class rpki_updown_error_response(rpki_updown_msg):
def __str__(self):
return "RPKI error %d" % (self.status)
def innerToXML(self):
return ' %d\n' % self.status
def endElement(self, name, text):
if name == "status":
self.status = int(text)
elif name == "last_message_processed":
self.last_message_processed = text
elif name == "description":
self.description = text
class rpki_updown_sax_handler(xml.sax.handler.ContentHandler):
def __init__(self):
self.text = ""
self.obj = None
def startElementNS(self, name, qname, attrs):
return self.startElement(name[1], attrs)
def endElementNS(self, name, qname):
return self.endElement(name[1])
def startElement(self, name, attrs):
if name == "message":
assert int(attrs.getValue("version")) == 1
type = attrs.getValue("type")
if self.obj == None:
self.obj = {
"list" : rpki_updown_list(),
"list_response" : rpki_updown_list_response(),
"issue" : rpki_updown_issue(),
"issue_response" : rpki_updown_issue_response(),
"revoke" : rpki_updown_revoke(),
"revoke_response" : rpki_updown_revoke_response(),
"error_response" : rpki_updown_error_response()
}[type]
assert self.obj != None
self.obj.type = type
self.obj.sender = attrs.getValue("sender")
self.obj.recipient = attrs.getValue("recipient")
self.obj.msg_ref = int(attrs.getValue("msg_ref"))
else:
assert self.obj != None
self.obj.startElement(name, attrs)
def characters(self, content):
self.text += content
def endElement(self, name):
assert self.obj != None
if name != "message":
self.obj.endElement(name, self.text)
self.text = ""
files = glob.glob("up-down-protocol-samples/*.xml")
files.sort()
for f in files:
# try:
handler = rpki_updown_sax_handler()
# parser = xml.sax.make_parser()
# parser.setContentHandler(handler)
# parser.parse(f)
fh = open(f, "r")
x = fh.read()
fh.close()
xml.sax.parseString(x, handler)
obj = handler.obj
print "-- " + str(obj) + "\n"
x = obj.toXML()
print x
print relaxng(x, "up-down-medium-schema.rng")
# except Exception, err:
# print "? " + str(err) + "\n"