# $Id$
#
# Copyright (C) 2015-2016 Parsons Government Services ("PARSONS")
# Portions copyright (C) 2014 Dragon Research Labs ("DRL")
# Portions copyright (C) 2009-2013 Internet Systems Consortium ("ISC")
#
# Permission to use, copy, modify, and distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notices and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND PARSONS, DRL, AND ISC DISCLAIM
# ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# PARSONS, DRL, OR ISC BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""
Database generator for RPKI-RTR server (RFC 6810 et seq.).
"""

import os
import sys
import glob
import socket
import base64
import random
import logging
import subprocess

import rpki.POW
import rpki.oids
import rpki.rtr.pdus
import rpki.rtr.channels
import rpki.rtr.server

from rpki.rtr.channels import Timestamp
from rpki.rcynicdb.iterator import authenticated_objects


class PrefixPDU(rpki.rtr.pdus.PrefixPDU):
    """
    Object representing one prefix.  This corresponds closely to one PDU
    in the rpki-router protocol, so closely that we use lexical ordering
    of the wire format of the PDU as the ordering for this class.

    This is a virtual class, but the .from_text() constructor
    instantiates the correct concrete subclass (IPv4PrefixPDU or
    IPv6PrefixPDU) depending on the syntax of its input text.
    """

    @staticmethod
    def from_text(version, asn, addr):
        """
        Construct a prefix from its text form.

        addr is "address/length" or "address/length-maxlength"; a ":" in
        addr selects the IPv6 subclass, otherwise the IPv4 subclass.
        """

        cls = IPv6PrefixPDU if ":" in addr else IPv4PrefixPDU
        self = cls(version = version)
        self.asn = long(asn)
        address, length = addr.split("/")
        self.prefix = rpki.POW.IPAddress(address)
        if "-" in length:
            self.prefixlen, self.max_prefixlen = tuple(int(i) for i in length.split("-"))
        else:
            self.prefixlen = self.max_prefixlen = int(length)
        self.announce = 1
        self.check()
        return self

    @staticmethod
    def from_roa(version, asn, prefix_tuple):
        """
        Construct a prefix from a ROA.

        prefix_tuple is (address, length, maxlength); a maxlength of None
        means "same as length".
        """

        address, length, maxlength = prefix_tuple
        cls = IPv6PrefixPDU if address.version == 6 else IPv4PrefixPDU
        self = cls(version = version)
        self.asn = asn
        self.prefix = address
        self.prefixlen = length
        self.max_prefixlen = length if maxlength is None else maxlength
        self.announce = 1
        self.check()
        return self


class IPv4PrefixPDU(PrefixPDU):
    """
    IPv4 flavor of a prefix.
    """

    pdu_type = 4
    address_byte_count = 4


class IPv6PrefixPDU(PrefixPDU):
    """
    IPv6 flavor of a prefix.
    """

    pdu_type = 6
    address_byte_count = 16


class RouterKeyPDU(rpki.rtr.pdus.RouterKeyPDU):
    """
    Router Key PDU.
    """

    announce = None
    ski = None
    asn = None
    key = None

    @classmethod
    def from_text(cls, version, asn, gski, key):
        """
        Construct a router key from its text form.

        gski is URL-safe base64 text with its trailing "=" stripped (the
        pad character is put back before decoding); key is ordinary
        base64 text.
        """

        self = cls(version = version)
        self.asn = long(asn)
        self.ski = base64.urlsafe_b64decode(gski + "=")
        self.key = base64.b64decode(key)
        self.announce = 1
        self.check()
        return self

    @classmethod
    def from_certificate(cls, version, asn, ski, key):
        """
        Construct a router key from a certificate.
        """

        self = cls(version = version)
        self.asn = asn
        self.ski = ski
        self.key = key
        self.announce = 1
        self.check()
        return self


class ROA(rpki.POW.ROA):                # pylint: disable=W0232
    """
    Minor additions to rpki.POW.ROA.
    """

    @classmethod
    def derReadFile(cls, fn):           # pylint: disable=E1002
        """
        Read a DER file and immediately extract its content without
        verifying it.
        """

        self = super(ROA, cls).derReadFile(fn)
        self.extractWithoutVerifying()
        return self

    @property
    def prefixes(self):
        """
        Iterate over all prefixes in this ROA, IPv4 entries first, then
        IPv6 entries.
        """

        v4, v6 = self.getPrefixes()     # pylint: disable=E1101
        if v4 is not None:
            for p in v4:
                yield p
        if v6 is not None:
            for p in v6:
                yield p


class X509(rpki.POW.X509):              # pylint: disable=W0232
    """
    Minor additions to rpki.POW.X509.
    """

    @property
    def asns(self):
        """
        Iterate over every individual ASN covered by the ranges in the
        first element of this certificate's RFC 3779 resources.
        """

        resources = self.getRFC3779()   # pylint: disable=E1101
        if resources is not None and resources[0] is not None:
            for min_asn, max_asn in resources[0]:
                for asn in xrange(min_asn, max_asn + 1):
                    yield asn


class PDUSet(list):
    """
    Object representing a set of PDUs, that is, one versioned and
    (theoretically) consistent set of prefixes and router keys extracted
    from rcynic's output.
    """

    def __init__(self, version):
        """
        Create an empty PDU set for the given protocol version.
        """

        assert version in rpki.rtr.pdus.PDU.version_map
        super(PDUSet, self).__init__()
        self.version = version

    @classmethod
    def _load_file(cls, filename, version):
        """
        Low-level method to read PDUSet from a file.
        """

        self = cls(version = version)
        r = rpki.rtr.channels.ReadBuffer()
        # The with block guarantees the file is closed on every exit path,
        # including the return from inside the loop and assertion failures.
        with open(filename, "rb") as f:
            while True:
                p = rpki.rtr.pdus.PDU.read_pdu(r)
                while p is None:
                    b = f.read(r.needed())
                    if not b:
                        # End of file: there must be no partial PDU left over.
                        assert r.available() == 0
                        return self
                    r.put(b)
                    p = r.retry()
                assert p.version == self.version
                self.append(p)

    @staticmethod
    def seq_ge(a, b):
        """
        Serial number comparison in 32-bit modular arithmetic: true when
        (a - b) mod 2**32 is less than 2**31.
        """

        return ((a - b) % (1 << 32)) < (1 << 31)


class AXFRSet(PDUSet):
    """
    Object representing a complete set of PDUs, that is, one versioned
    and (theoretically) consistent set of prefixes and router
    certificates extracted from rcynic's output, all with the announce
    field set.
    """

    class_map = dict(cer = X509, roa = ROA)

    serial = None

    @classmethod
    def parse_rcynic(cls, rcynic_dir, version, scan_roas = None, scan_routercerts = None):
        """
        Parse ROAS and router certificates fetched (and validated!) by
        rcynic to create a new AXFRSet.

        In normal operation, we parse these data directly from whatever
        rcynic is using as a validator this week, but we can, if so
        instructed, use external programs instead, for testing,
        simulation, or to provide a way to inject local data.

        At some point the ability to parse these data from external
        programs may move to a separate constructor function, so that we
        can make this one a bit simpler and faster.
        """

        self = cls(version = version)
        self.serial = rpki.rtr.channels.Timestamp.now()

        # Router keys are only collected for protocol versions whose PDU
        # map includes the router key PDU type.
        include_routercerts = RouterKeyPDU.pdu_type in rpki.rtr.pdus.PDU.version_map[version]

        if scan_roas is None:
            for uri, roa in authenticated_objects(rcynic_dir, uri_suffix = ".roa", class_map = self.class_map):
                roa.extractWithoutVerifying()
                asn = roa.getASID()
                self.extend(PrefixPDU.from_roa(version = version, asn = asn, prefix_tuple = prefix_tuple)
                            for prefix_tuple in roa.prefixes)

        if scan_routercerts is None and include_routercerts:
            for uri, cer in authenticated_objects(rcynic_dir, uri_suffix = ".cer", class_map = self.class_map):
                eku = cer.getEKU()
                if eku is not None and rpki.oids.id_kp_bgpsec_router in eku:
                    ski = cer.getSKI()
                    key = cer.getPublicKey().derWritePublic()
                    self.extend(RouterKeyPDU.from_certificate(version = version, asn = asn, ski = ski, key = key)
                                for asn in cer.asns)

        if scan_roas is not None:
            try:
                p = subprocess.Popen((scan_roas, rcynic_dir), stdout = subprocess.PIPE)
                for line in p.stdout:
                    # Field 0 is ignored, field 1 is the ASN, the rest are prefixes.
                    line = line.split()
                    asn = line[1]
                    self.extend(PrefixPDU.from_text(version = version, asn = asn, addr = addr)
                                for addr in line[2:])
            except OSError as e:
                sys.exit("Could not run %s: %s" % (scan_roas, e))

        if include_routercerts and scan_routercerts is not None:
            try:
                p = subprocess.Popen((scan_routercerts, rcynic_dir), stdout = subprocess.PIPE)
                for line in p.stdout:
                    # First field is the g(SKI), last field is the key,
                    # everything in between is an ASN.
                    line = line.split()
                    gski = line[0]
                    key = line[-1]
                    self.extend(RouterKeyPDU.from_text(version = version, asn = asn, gski = gski, key = key)
                                for asn in line[1:-1])
            except OSError as e:
                sys.exit("Could not run %s: %s" % (scan_routercerts, e))

        # Sort, then walk backwards dropping adjacent duplicates; walking
        # from the end keeps the remaining indices valid while deleting.
        self.sort()
        for i in xrange(len(self) - 2, -1, -1):
            if self[i] == self[i + 1]:
                del self[i + 1]
        return self

    @classmethod
    def load(cls, filename):
        """
        Load an AXFRSet from a file, parse filename to obtain version and
        serial.
        """

        fn1, fn2, fn3 = os.path.basename(filename).split(".")
        assert fn1.isdigit() and fn2 == "ax" and fn3.startswith("v") and fn3[1:].isdigit()
        version = int(fn3[1:])
        self = cls._load_file(filename, version)
        self.serial = rpki.rtr.channels.Timestamp(fn1)
        return self

    def filename(self):
        """
        Generate filename for this AXFRSet.
        """

        return "%d.ax.v%d" % (self.serial, self.version)

    @classmethod
    def load_current(cls, version):
        """
        Load current AXFRSet.  Return None if can't.
        """

        serial = rpki.rtr.server.read_current(version)[0]
        if serial is None:
            return None
        try:
            return cls.load("%d.ax.v%d" % (serial, version))
        except IOError:
            return None

    def save_axfr(self):
        """
        Write AXFRSet to file with magic filename.
        """

        # with closes the file even if serializing a PDU raises.
        with open(self.filename(), "wb") as f:
            for p in self:
                f.write(p.to_pdu())

    def destroy_old_data(self):
        """
        Destroy old data files, presumably because our nonce changed and
        the old serial numbers are no longer valid.
        """

        for i in glob.iglob("*.ix.*.v%d" % self.version):
            os.unlink(i)
        for i in glob.iglob("*.ax.v%d" % self.version):
            if i != self.filename():
                os.unlink(i)

    @staticmethod
    def new_nonce(force_zero_nonce):
        """
        Create and return a new nonce value.
        """

        if force_zero_nonce:
            return 0
        try:
            return int(random.SystemRandom().getrandbits(16))
        except NotImplementedError:
            # No OS randomness source available, fall back to the default PRNG.
            return int(random.getrandbits(16))

    def mark_current(self, force_zero_nonce = False):
        """
        Save current serial number and nonce, creating new nonce if
        necessary.  Creating a new nonce triggers cleanup of old state, as
        the new nonce invalidates all old serial numbers.
        """

        assert self.version in rpki.rtr.pdus.PDU.version_map
        old_serial, nonce = rpki.rtr.server.read_current(self.version)
        if old_serial is None or self.seq_ge(old_serial, self.serial):
            logging.debug("Creating new nonce and deleting stale data")
            nonce = self.new_nonce(force_zero_nonce)
            self.destroy_old_data()
        rpki.rtr.server.write_current(self.serial, nonce, self.version)

    def save_ixfr(self, other):
        """
        Comparing this AXFRSet with an older one and write the resulting
        IXFRSet to file with magic filename.  Since we store PDUSets in
        sorted order, computing the difference is a trivial linear
        comparison.
        """

        old = other
        new = self
        len_old = len(old)
        len_new = len(new)
        i_old = i_new = 0
        # with closes the file even if a comparison or serialization raises.
        with open("%d.ix.%d.v%d" % (self.serial, other.serial, self.version), "wb") as f:
            while i_old < len_old and i_new < len_new:
                if old[i_old] < new[i_new]:
                    # Only in the old set: withdraw it.
                    f.write(old[i_old].to_pdu(announce = 0))
                    i_old += 1
                elif old[i_old] > new[i_new]:
                    # Only in the new set: announce it.
                    f.write(new[i_new].to_pdu(announce = 1))
                    i_new += 1
                else:
                    # Present in both sets: no change to report.
                    i_old += 1
                    i_new += 1
            # Whatever is left over in either set is all withdrawals or
            # all announcements.
            for i in xrange(i_old, len_old):
                f.write(old[i].to_pdu(announce = 0))
            for i in xrange(i_new, len_new):
                f.write(new[i].to_pdu(announce = 1))

    def show(self):
        """
        Print this AXFRSet.
        """

        logging.debug("# AXFR %d (%s) v%d", self.serial, self.serial, self.version)
        for p in self:
            logging.debug(p)


class IXFRSet(PDUSet):
    """
    Object representing an incremental set of PDUs, that is, the
    differences between one versioned and (theoretically) consistent set
    of prefixes and router certificates extracted from rcynic's output
    and another, with the announce fields set or cleared as necessary to
    indicate the changes.
    """

    from_serial = None
    to_serial = None

    @classmethod
    def load(cls, filename):
        """
        Load an IXFRSet from a file, parse filename to obtain version and
        serials.
        """

        fn1, fn2, fn3, fn4 = os.path.basename(filename).split(".")
        assert fn1.isdigit() and fn2 == "ix" and fn3.isdigit() and fn4.startswith("v") and fn4[1:].isdigit()
        version = int(fn4[1:])
        self = cls._load_file(filename, version)
        self.from_serial = rpki.rtr.channels.Timestamp(fn3)
        self.to_serial = rpki.rtr.channels.Timestamp(fn1)
        return self

    def filename(self):
        """
        Generate filename for this IXFRSet.
        """

        return "%d.ix.%d.v%d" % (self.to_serial, self.from_serial, self.version)

    def show(self):
        """
        Print this IXFRSet.
        """

        logging.debug("# IXFR %d (%s) -> %d (%s) v%d",
                      self.from_serial, self.from_serial,
                      self.to_serial, self.to_serial,
                      self.version)
        for p in self:
            logging.debug(p)


def kick_all(serial):
    """
    Kick any existing server processes to wake them up.
    """

    try:
        os.stat(rpki.rtr.server.kickme_dir)
    except OSError:
        logging.debug('# Creating directory "%s"', rpki.rtr.server.kickme_dir)
        os.makedirs(rpki.rtr.server.kickme_dir)

    msg = "Good morning, serial %d is ready" % serial
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        for name in glob.iglob("%s.*" % rpki.rtr.server.kickme_base):
            try:
                logging.debug("# Kicking %s", name)
                sock.sendto(msg, name)
            except socket.error:
                try:
                    logging.exception("# Failed to kick %s, probably dead socket, attempting cleanup", name)
                    os.unlink(name)
                except Exception as e:
                    logging.exception("# Couldn't unlink suspected dead socket %s: %s", name, e)
            except Exception as e:
                logging.warning("# Failed to kick %s and don't understand why: %s", name, e)
    finally:
        # Release the socket even if something outside the per-name
        # handlers (e.g. the glob itself) raises.
        sock.close()


def cronjob_main(args):
    """
    Run this right after running rcynic to wade through the ROAs and
    router certificates that rcynic collects and translate that data
    into the form used in the rpki-router protocol.  Output is an
    updated database containing both full dumps (AXFR) and incremental
    dumps against a specific prior version (IXFR).  After updating the
    database, kicks any active servers, so that they can notify their
    clients that a new version is available.
    """

    if args.rpki_rtr_dir:
        try:
            if not os.path.isdir(args.rpki_rtr_dir):
                os.makedirs(args.rpki_rtr_dir)
            os.chdir(args.rpki_rtr_dir)
        except OSError as e:
            logging.critical(str(e))
            sys.exit(1)

    for version in sorted(rpki.rtr.server.PDU.version_map.iterkeys(), reverse = True):

        logging.debug("# Generating updates for protocol version %d", version)

        # Remember the IXFR files that exist before this run; they are
        # removed below once a new serial has been published.
        old_ixfrs = glob.glob("*.ix.*.v%d" % version)

        current = rpki.rtr.server.read_current(version)[0]
        cutoff = Timestamp.now(-(24 * 60 * 60))
        for f in glob.iglob("*.ax.v%d" % version):
            t = Timestamp(int(f.split(".")[0]))
            if t < cutoff and t != current:
                logging.debug("# Deleting old file %s, timestamp %s", f, t)
                os.unlink(f)

        pdus = AXFRSet.parse_rcynic(args.rcynic_dir, version, args.scan_roas, args.scan_routercerts)
        if pdus == AXFRSet.load_current(version):
            logging.debug("# No change, new serial not needed")
            continue
        pdus.save_axfr()
        for axfr in glob.iglob("*.ax.v%d" % version):
            if axfr != pdus.filename():
                pdus.save_ixfr(AXFRSet.load(axfr))
        pdus.mark_current(args.force_zero_nonce)

        logging.debug("# New serial is %d (%s)", pdus.serial, pdus.serial)

        kick_all(pdus.serial)

        old_ixfrs.sort()
        for ixfr in old_ixfrs:
            try:
                logging.debug("# Deleting old file %s", ixfr)
                os.unlink(ixfr)
            except OSError:
                # Best effort: the file may already be gone (for example
                # removed by mark_current() when the nonce changed).
                pass


def show_main(args):
    """
    Display current rpki-rtr server database in textual form.
    """

    if args.rpki_rtr_dir:
        try:
            os.chdir(args.rpki_rtr_dir)
        except OSError as e:
            sys.exit(e)

    g = glob.glob("*.ax.v*")
    g.sort()
    for f in g:
        AXFRSet.load(f).show()

    g = glob.glob("*.ix.*.v*")
    g.sort()
    for f in g:
        IXFRSet.load(f).show()


def argparse_setup(subparsers):
    """
    Set up argparse stuff for commands in this module.
    """

    subparser = subparsers.add_parser("cronjob", description = cronjob_main.__doc__,
                                      help = "Generate RPKI-RTR database from rcynic output")
    subparser.set_defaults(func = cronjob_main, default_log_destination = "syslog")
    subparser.add_argument("--scan-roas", help = "specify an external scan_roas program")
    subparser.add_argument("--scan-routercerts", help = "specify an external scan_routercerts program")
    subparser.add_argument("--force_zero_nonce", action = "store_true", help = "force nonce value of zero")
    subparser.add_argument("rcynic_dir", nargs = "?", help = "directory containing validated rcynic output tree")
    subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database")

    subparser = subparsers.add_parser("show", description = show_main.__doc__,
                                      help = "Display content of RPKI-RTR database")
    subparser.set_defaults(func = show_main, default_log_destination = "stderr")
    subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database")