#!/usr/bin/env python
# $Id$
#
# Copyright (C) 2014 Dragon Research Labs ("DRL")
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND DRL DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS. IN NO EVENT SHALL DRL BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.

"""
Test tool for prototype RRDP implementation. Eventually some of this
code will likely be refactored into more user-friendly form, but for
the moment this just does whatever insane thing I need to do this week
for testing.
"""

# NOTE: this is Python 2 code (str "hex"/"base64" codecs). Every print below
# is written as a single parenthesised expression, which the Python 2 print
# statement renders exactly like the unparenthesised form.

import rpki.relaxng
import rpki.x509
import lxml.etree
import argparse
import os


class Tags(object):
    """Namespace-qualified RRDP element names, exposed as attributes."""

    def __init__(self, *tags):
        # Each attribute holds the RRDP namespace prefix followed by the
        # bare tag name, ready to compare against lxml's element.tag.
        for tag in tags:
            setattr(self, tag, rpki.relaxng.rrdp.xmlns + tag)


tags = Tags("notification", "deltas", "delta", "snapshot", "publish", "withdraw")


class main(object):
    """
    Command line driver: parse arguments, then validate and process each
    RRDP file named on the command line, in order.  All work happens in
    the constructor.
    """

    def __init__(self):
        parser = argparse.ArgumentParser(description=__doc__)
        parser.add_argument("--rcynic-tree", default="rcynic-data/unauthenticated",
                            help="directory tree in which to write extracted RPKI objects")
        parser.add_argument("--serial-filename",
                            help="file name in which to store RRDP serial number")
        parser.add_argument("rrdp_file", nargs="+",
                            help="RRDP snapshot or deltas file")
        self.args = parser.parse_args()
        if not os.path.isdir(self.args.rcynic_tree):
            os.makedirs(self.args.rcynic_tree)
        for rrdp_file in self.args.rrdp_file:
            xml = lxml.etree.ElementTree(file=rrdp_file).getroot()
            rpki.relaxng.rrdp.assertValid(xml)
            # Dispatch on the root element's local name: notification(),
            # snapshot() or deltas().
            getattr(self, xml.tag[len(rpki.relaxng.rrdp.xmlns):])(xml)

    @property
    def serial_filename(self):
        """Path of the serial file: --serial-filename, else <rcynic-tree>/serial."""
        return self.args.serial_filename or os.path.join(self.args.rcynic_tree, "serial")

    def get_serial(self):
        """Return the stored serial as a stripped string (raises if the file is missing)."""
        with open(self.serial_filename, "r") as f:
            return f.read().strip()

    def set_serial(self, value):
        """Overwrite the serial file with value followed by a newline."""
        with open(self.serial_filename, "w") as f:
            f.write("%s\n" % value)

    def notification(self, xml):
        """Print a summary of a notification file; nothing is written to disk."""
        print("Notification version %s session %s serial %s" % (
            xml.get("version"), xml.get("session_id"), xml.get("serial")))
        # Sanity check only: the document has already been schema-validated.
        assert xml[0].tag == tags.snapshot
        print(" Snapshot URI %s hash %s" % (
            xml[0].get("uri"), xml[0].get("hash")))
        for i, elt in enumerate(xml.iterchildren(tags.delta)):
            print(" Delta %3d from %6s to %6s URI %s hash %s" % (
                i, elt.get("from"), elt.get("to"), elt.get("uri"), elt.get("hash")))

    def uri_to_filename(self, uri):
        """
        Map an rsync:// URI onto a file name beneath the rcynic tree.

        Raises RuntimeError if the URI is not an rsync URI, or if it would
        resolve to the tree root itself or to anything outside the tree.
        URIs come straight from the RRDP file, so "rsync://../.." or
        "rsync:///absolute/path" must not be allowed to steer add_obj() and
        del_obj() at arbitrary files.  The check is lexical; symlinks inside
        the tree are not resolved.
        """
        scheme = "rsync://"
        if not uri.startswith(scheme):
            raise RuntimeError("Not an rsync URI: %s" % uri)
        fn = os.path.join(self.args.rcynic_tree, uri[len(scheme):])
        # os.path.join(root, "") yields the root with exactly one trailing
        # separator, which also does the right thing when the root is "/".
        root = os.path.join(os.path.abspath(self.args.rcynic_tree), "")
        if not os.path.abspath(fn).startswith(root):
            raise RuntimeError("URI %s escapes directory tree %s" % (uri, self.args.rcynic_tree))
        return fn

    def add_obj(self, uri, obj):
        """Write obj (raw bytes) to the file for uri, creating directories as needed."""
        fn = self.uri_to_filename(uri)
        dn = os.path.dirname(fn)
        if not os.path.isdir(dn):
            os.makedirs(dn)
        with open(fn, "wb") as f:
            f.write(obj)

    def del_obj(self, uri, hash):
        """
        Delete the file for uri after checking its SHA-256 digest.

        hash is the expected digest as a hex string; it is compared
        case-insensitively because the RRDP schema permits upper-case hex
        digits.  Raises RuntimeError on mismatch, leaving the file in place.
        Directories left empty by the deletion are pruned, but never the
        tree root or anything above it.
        """
        fn = self.uri_to_filename(uri)
        with open(fn, "rb") as f:
            if hash.lower() != rpki.x509.sha256(f.read()).encode("hex"):
                raise RuntimeError("Hash mismatch for URI %s" % uri)
        os.unlink(fn)
        root = os.path.abspath(self.args.rcynic_tree)
        dn = os.path.dirname(fn)
        # fn is known to live under root, so walking upwards must reach root;
        # stop there, or earlier at the first directory that is not empty.
        while os.path.abspath(dn) != root:
            try:
                os.rmdir(dn)
            except OSError:
                break
            dn = os.path.dirname(dn)

    def snapshot(self, xml):
        """Write every published object in a snapshot, then record its serial."""
        print("Unpacking snapshot version %s session %s serial %6s" % (
            xml.get("version"), xml.get("session_id"), xml.get("serial")))
        for elt in xml.iterchildren(tags.publish):
            # Same output as the two-argument statement: print " ", uri
            print("%s %s" % (" ", elt.get("uri")))
            self.add_obj(elt.get("uri"), elt.text.decode("base64"))
        self.set_serial(xml.get("serial"))

    def deltas(self, xml):
        """
        Apply a deltas file on top of the current on-disk state.

        The stored serial must equal the file's "from" attribute, and each
        delta must carry the serial immediately following the current one;
        otherwise RuntimeError is raised.  Within a delta, withdraws are
        applied before publishes.  A publish carrying a hash replaces an
        existing object, which is verified and removed first.
        """
        cur = int(self.get_serial())
        old = int(xml.get("from"))
        new = int(xml.get("to"))
        print("Unpacking deltas version %s session %s from %s to %s" % (
            xml.get("version"), xml.get("session_id"), old, new))
        if cur != old:
            raise RuntimeError("Can't apply deltas: current %s old %s new %s" % (cur, old, new))
        for i, delta in enumerate(xml.iterchildren(tags.delta)):
            serial = int(delta.get("serial"))
            print(" Delta %3d serial %d" % (i, serial))
            if cur != serial - 1:
                raise RuntimeError("Can't apply delta: current %s delta serial %s" % (cur, serial))
            for j, elt in enumerate(delta.iterchildren(tags.withdraw)):
                uri = elt.get("uri")
                digest = elt.get("hash")
                print(" %3d withdraw URI %s hash %s" % (j, uri, digest))
                self.del_obj(uri, digest)
            for j, elt in enumerate(delta.iterchildren(tags.publish)):
                uri = elt.get("uri")
                digest = elt.get("hash", None)
                print(" %3d publish URI %s hash %s" % (j, uri, digest))
                if digest is not None:
                    self.del_obj(uri, digest)
                self.add_obj(uri, elt.text.decode("base64"))
            # Record progress after each delta so the stored serial always
            # matches what has actually been applied to the tree.
            cur += 1
            self.set_serial(cur)


if __name__ == "__main__":
    main()