1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
#!/usr/bin/env python
# $Id$
#
# Copyright (C) 2013 Dragon Research Labs ("DRL")
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND DRL DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS. IN NO EVENT SHALL DRL BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
"""
Test tool for prototype RRDP implementation. Eventually some of this
code will likely be refactored into more user-friendly form, but for
the moment this just does whatever insane thing I need to do this week
for testing.
"""
import rpki.relaxng
import rpki.x509
import lxml.etree
import argparse
import os
class Tags(object):
def __init__(self, *tags):
for tag in tags:
setattr(self, tag, rpki.relaxng.rrdp.xmlns + tag)
tags = Tags("notification", "deltas", "delta", "snapshot", "publish", "withdraw")
class main(object):
def __init__(self):
parser = argparse.ArgumentParser(description = __doc__)
parser.add_argument("--rcynic-tree", default = "rcynic-data/unauthenticated",
help = "directory tree in which to write extracted RPKI objects")
parser.add_argument("--serial-filename",
help = "file name in which to store RRDP serial number")
parser.add_argument("rrdp_file", nargs = "+",
help = "RRDP snapshot or deltas file")
self.args = parser.parse_args()
if not os.path.isdir(self.args.rcynic_tree):
os.makedirs(self.args.rcynic_tree)
for rrdp_file in self.args.rrdp_file:
xml = lxml.etree.ElementTree(file = rrdp_file).getroot()
rpki.relaxng.rrdp.assertValid(xml)
getattr(self, xml.tag[len(rpki.relaxng.rrdp.xmlns):])(xml)
@property
def serial_filename(self):
return self.args.serial_filename or os.path.join(self.args.rcynic_tree, "serial")
def get_serial(self):
with open(self.serial_filename, "r") as f:
return f.read().strip()
def set_serial(self, value):
with open(self.serial_filename, "w") as f:
f.write("%s\n" % value)
def notification(self, xml):
print "Notification version %s session %s serial %s" % (
xml.get("version"), xml.get("session_id"), xml.get("serial"))
assert xml[0].tag == tags.snapshot
print " Snapshot URI %s hash %s" % (
xml[0].get("uri"), xml[0].get("hash"))
for i, elt in enumerate(xml.iterchildren(tags.delta)):
print " Delta %3d from %6s to %6s URI %s hash %s" % (
i, elt.get("from"), elt.get("to"), elt.get("uri"), elt.get("hash"))
def uri_to_filename(self, uri):
assert uri.startswith("rsync://")
return os.path.join(self.args.rcynic_tree, uri[len("rsync://"):])
def add_obj(self, uri, obj):
fn = self.uri_to_filename(uri)
dn = os.path.dirname(fn)
if not os.path.isdir(dn):
os.makedirs(dn)
with open(fn, "wb") as f:
f.write(obj)
def del_obj(self, uri, hash):
fn = self.uri_to_filename(uri)
with open(fn, "rb") as f:
if hash != rpki.x509.sha256(f.read()).encode("hex"):
raise RuntimeError("Hash mismatch for URI %s" % uri)
os.unlink(fn)
dn = os.path.dirname(fn)
while True:
try:
os.rmdir(dn)
except OSError:
break
else:
dn = os.path.dirname(dn)
def snapshot(self, xml):
print "Unpacking snapshot version %s session %s serial %6s" % (
xml.get("version"), xml.get("session_id"), xml.get("serial"))
for elt in xml.iterchildren(tags.publish):
print " ", elt.get("uri")
self.add_obj(elt.get("uri"), elt.text.decode("base64"))
self.set_serial(xml.get("serial"))
def deltas(self, xml):
cur = int(self.get_serial())
old = int(xml.get("from"))
new = int(xml.get("to"))
print "Unpacking deltas version %s session %s from %s to %s" % (
xml.get("version"), xml.get("session_id"), old, new)
if cur != old:
raise RuntimeError("Can't apply deltas: current %s old %s new %s" % (cur, old, new))
for i, delta in enumerate(xml.iterchildren(tags.delta)):
serial = int(delta.get("serial"))
print " Delta %3d serial %d" % (i, serial)
if cur != serial - 1:
raise RuntimeError("Can't apply delta: current %s delta serial %s" % (cur, serial))
for j, elt in enumerate(delta.iterchildren(tags.withdraw)):
uri = elt.get("uri")
hash = elt.get("hash")
print " %3d withdraw URI %s hash %s" % (j, uri, hash)
self.del_obj(uri, hash)
for j, elt in enumerate(delta.iterchildren(tags.publish)):
uri = elt.get("uri")
hash = elt.get("hash", None)
print " %3d publish URI %s hash %s" % (j, uri, hash)
if hash is not None:
self.del_obj(uri, hash)
self.add_obj(elt.get("uri"), elt.text.decode("base64"))
cur += 1
self.set_serial(cur)
if __name__ == "__main__":
main()
|