#!/usr/bin/env python # # $Id$ # # Copyright (C) 2015-2016 Parsons Government Services ("PARSONS") # Portions copyright (C) 2014 Dragon Research Labs ("DRL") # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notices and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND PARSONS AND DRL DISCLAIM ALL # WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL # PARSONS OR DRL BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ Searches authenticated result tree from an rcynic run for ROAs matching specified prefixes. """ import os import argparse import rpki.POW import rpki.oids import rpki.config def check_dir(s): if os.path.isdir(s): return os.path.abspath(s) else: raise argparse.ArgumentTypeError("%r is not a directory" % s) def filename_to_uri(filename): if not filename.startswith(args.rcynic_dir): raise ValueError return "rsync://" + filename[len(args.rcynic_dir):].lstrip("/") def uri_to_filename(uri): if not uri.startswith("rsync://"): raise ValueError return os.path.join(args.rcynic_dir, uri[len("rsync://"):]) class Prefix(object): """ One prefix parsed from the command line. """ def __init__(self, val): addr, length = val.split("/") length, sep, maxlength = length.partition("-") # pylint: disable=W0612 self.prefix = rpki.POW.IPAddress(addr) self.length = int(length) self.maxlength = int(maxlength) if maxlength else self.length if self.maxlength < self.length or self.length < 0 or self.length > self.prefix.bits: raise ValueError if self.prefix & ((1 << (self.prefix.bits - self.length)) - 1) != 0: raise ValueError def matches(self, roa): # pylint: disable=W0621 return any(self.prefix == prefix and self.length == length and (not args.match_maxlength or self.maxlength == maxlength or (maxlength is None and self.length == self.maxlength)) for prefix, length, maxlength in roa.prefixes) class ROA(rpki.POW.ROA): # pylint: disable=W0232 """ Aspects of a ROA that we care about. """ @classmethod def parse(cls, fn): # pylint: disable=W0621 assert fn.startswith(args.rcynic_dir) self = cls.derReadFile(fn) # pylint: disable=E1101 self.fn = fn self.extractWithoutVerifying() v4, v6 = self.getPrefixes() self.prefixes = (v4 or ()) + (v6 or ()) return self @property def uri(self): return filename_to_uri(self.fn) # pylint: disable=E1101 @property def formatted_prefixes(self): for prefix in self.prefixes: # pylint: disable=E1101 if prefix[2] is None or prefix[1] == prefix[2]: yield "%s/%d" % (prefix[0], prefix[1]) else: yield "%s/%d-%d" % (prefix[0], prefix[1], prefix[2]) def __str__(self): # pylint: disable=E1101 prefixes = " ".join(self.formatted_prefixes) plural = "es" if " " in prefixes else "" if args.show_inception: return "signingTime %s ASN %s prefix%s %s" % (self.signingTime(), self.getASID(), plural, prefixes) else: return "ASN %s prefix%s %s" % (self.getASID(), plural, prefixes) def show(self): # pylint: disable=E1101 print "%s %s" % (self, self.fn if args.show_filenames else self.uri) def show_expiration(self): print self x = self.certs()[0] # pylint: disable=E1101 fn = self.fn # pylint: disable=E1101,W0621 uri = self.uri while uri is not None: name = fn if args.show_filenames else uri if args.show_inception: print "notBefore", x.getNotBefore(), "notAfter", x.getNotAfter(), name else: print x.getNotAfter(), name for uri in x.getAIA() or (): if uri.startswith("rsync://"): break else: break fn = uri_to_filename(uri) if not os.path.exists(fn): print "***** MISSING ******", uri break x = rpki.POW.X509.derReadFile(fn) print cfg = rpki.config.argparser(doc = __doc__) cfg.argparser.add_argument("-a", "--all", action = "store_true", help = "show all ROAs, do no prefix matching at all") cfg.argparser.add_argument("-m", "--match-maxlength", action = "store_true", help = "pay attention to maxLength values") cfg.argparser.add_argument("-e", "--show-expiration", action = "store_true", help = "show ROA chain expiration dates") cfg.argparser.add_argument("-f", "--show-filenames", action = "store_true", help = "show filenames instead of URIs") cfg.argparser.add_argument("-i", "--show-inception", action = "store_true", help = "show inception dates") cfg.argparser.add_argument("rcynic_dir", type = check_dir, help = "rcynic authenticated output directory") cfg.argparser.add_argument("prefixes", type = Prefix, nargs = "*", help = "ROA prefix(es) to match") args = cfg.argparser.parse_args() # If there's some way to automate this in the parser, I don't know what it is, so just catch it here. if args.all != (not args.prefixes): parser.error("--all and prefix list are mutually exclusive") for root, dirs, files in os.walk(args.rcynic_dir): for fn in files: if fn.endswith(".roa"): roa = ROA.parse(os.path.join(root, fn)) if args.all or any(prefix.matches(roa) for prefix in args.prefixes): if args.show_expiration: roa.show_expiration() else: roa.show()