#!/usr/bin/env python # # $Id$ # # Copyright (C) 2014 Dragon Research Labs ("DRL") # # Permission to use, copy, modify, and/or distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND DRL DISCLAIMS ALL WARRANTIES WITH # REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY # AND FITNESS. IN NO EVENT SHALL DRL BE LIABLE FOR ANY SPECIAL, DIRECT, # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM # LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE # OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR # PERFORMANCE OF THIS SOFTWARE. """ Searches authenticated result tree from an rcynic run for ROAs matching specified prefixes. """ import os import sys import base64 import argparse import rpki.POW import rpki.oids def check_dir(s): if os.path.isdir(s): return os.path.abspath(s) else: raise argparse.ArgumentTypeError("%r is not a directory" % s) def filename_to_uri(filename): if not filename.startswith(args.rcynic_dir): raise ValueError return "rsync://" + filename[len(args.rcynic_dir):].lstrip("/") def uri_to_filename(uri): if not uri.startswith("rsync://"): raise ValueError return os.path.join(args.rcynic_dir, uri[len("rsync://"):]) class Prefix(object): """ One prefix parsed from the command line. """ def __init__(self, val): addr, length = val.split("/") length, sep, maxlength = length.partition("-") self.prefix = rpki.POW.IPAddress(addr) self.length = int(length) self.maxlength = int(maxlength) if maxlength else self.length if self.maxlength < self.length or self.length < 0 or self.length > self.prefix.bits: raise ValueError if self.prefix & ((1 << (self.prefix.bits - self.length)) - 1) != 0: raise ValueError def matches(self, roa): return any(self.prefix == prefix and self.length == length and (not args.match_maxlength or self.maxlength == maxlength or (maxlength is None and self.length == self.maxlength)) for prefix, length, maxlength in roa.prefixes) class ROA(rpki.POW.ROA): """ Aspects of a ROA that we care about. """ @classmethod def parse(cls, fn): assert fn.startswith(args.rcynic_dir) self = cls.derReadFile(fn) self.fn = fn self.extractWithoutVerifying() v4, v6 = self.getPrefixes() self.prefixes = (v4 or ()) + (v6 or ()) return self @property def uri(self): return filename_to_uri(self.fn) @property def formatted_prefixes(self): for prefix in self.prefixes: if prefix[2] is None or prefix[1] == prefix[2]: yield "%s/%d" % (prefix[0], prefix[1]) else: yield "%s/%d-%d" % (prefix[0], prefix[1], prefix[2]) def __str__(self): prefixes = " ".join(self.formatted_prefixes) plural = "es" if " " in prefixes else "" if args.show_inception: return "signingTime %s ASN %s prefix%s %s" % (self.signingTime(), self.getASID(), plural, prefixes) else: return "ASN %s prefix%s %s" % (self.getASID(), plural, prefixes) def show(self): print "%s %s" % (self, self.fn if args.show_filenames else self.uri) def show_expiration(self): print self x = self.certs()[0] fn = self.fn uri = self.uri while uri is not None: name = fn if args.show_filenames else uri if args.show_inception: print "notBefore", x.getNotBefore(), "notAfter", x.getNotAfter(), name else: print x.getNotAfter(), name for uri in x.getAIA() or (): if uri.startswith("rsync://"): break else: break fn = uri_to_filename(uri) if not os.path.exists(fn): print "***** MISSING ******", uri break x = rpki.POW.X509.derReadFile(fn) print parser = argparse.ArgumentParser(description = __doc__) parser.add_argument("-a", "--all", action = "store_true", help = "show all ROAs, do no prefix matching at all") parser.add_argument("-m", "--match-maxlength", action = "store_true", help = "pay attention to maxLength values") parser.add_argument("-e", "--show-expiration", action = "store_true", help = "show ROA chain expiration dates") parser.add_argument("-f", "--show-filenames", action = "store_true", help = "show filenames instead of URIs") parser.add_argument("-i", "--show-inception", action = "store_true", help = "show inception dates") parser.add_argument("rcynic_dir", type = check_dir, help = "rcynic authenticated output directory") parser.add_argument("prefixes", type = Prefix, nargs = "*", help = "ROA prefix(es) to match") args = parser.parse_args() # If there's some way to automate this in the parser, I don't know what it is, so just catch it here. if args.all != (not args.prefixes): parser.error("--all and prefix list are mutually exclusive") for root, dirs, files in os.walk(args.rcynic_dir): for fn in files: if fn.endswith(".roa"): roa = ROA.parse(os.path.join(root, fn)) if args.all or any(prefix.matches(roa) for prefix in args.prefixes): if args.show_expiration: roa.show_expiration() else: roa.show()