aboutsummaryrefslogtreecommitdiff
path: root/rpki/rtr/generator.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rtr/generator.py')
-rw-r--r--rpki/rtr/generator.py575
1 files changed, 575 insertions, 0 deletions
diff --git a/rpki/rtr/generator.py b/rpki/rtr/generator.py
new file mode 100644
index 00000000..26e25b6e
--- /dev/null
+++ b/rpki/rtr/generator.py
@@ -0,0 +1,575 @@
+# $Id$
+#
+# Copyright (C) 2014 Dragon Research Labs ("DRL")
+# Portions copyright (C) 2009-2013 Internet Systems Consortium ("ISC")
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notices and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND DRL AND ISC DISCLAIM ALL
+# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL OR
+# ISC BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
+# DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA
+# OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
+# TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+# PERFORMANCE OF THIS SOFTWARE.
+
+"""
+Database generator for RPKI-RTR server (RFC 6810 et sequalia).
+"""
+
+import os
+import sys
+import glob
+import socket
+import base64
+import random
+import logging
+import subprocess
+import rpki.POW
+import rpki.oids
+import rpki.rtr.pdus
+import rpki.rtr.channels
+import rpki.rtr.server
+
+from rpki.rtr.channels import Timestamp
+
+class PrefixPDU(rpki.rtr.pdus.PrefixPDU):
+ """
+ Object representing one prefix. This corresponds closely to one PDU
+ in the rpki-router protocol, so closely that we use lexical ordering
+ of the wire format of the PDU as the ordering for this class.
+
+ This is a virtual class, but the .from_text() constructor
+ instantiates the correct concrete subclass (IPv4PrefixPDU or
+ IPv6PrefixPDU) depending on the syntax of its input text.
+ """
+
+ @staticmethod
+ def from_text(version, asn, addr):
+ """
+ Construct a prefix from its text form.
+ """
+
+ cls = IPv6PrefixPDU if ":" in addr else IPv4PrefixPDU
+ self = cls(version = version)
+ self.asn = long(asn)
+ p, l = addr.split("/")
+ self.prefix = rpki.POW.IPAddress(p)
+ if "-" in l:
+ self.prefixlen, self.max_prefixlen = tuple(int(i) for i in l.split("-"))
+ else:
+ self.prefixlen = self.max_prefixlen = int(l)
+ self.announce = 1
+ self.check()
+ return self
+
+ @staticmethod
+ def from_roa(version, asn, prefix_tuple):
+ """
+ Construct a prefix from a ROA.
+ """
+
+ address, length, maxlength = prefix_tuple
+ cls = IPv6PrefixPDU if address.version == 6 else IPv4PrefixPDU
+ self = cls(version = version)
+ self.asn = asn
+ self.prefix = address
+ self.prefixlen = length
+ self.max_prefixlen = length if maxlength is None else maxlength
+ self.announce = 1
+ self.check()
+ return self
+
+
+class IPv4PrefixPDU(PrefixPDU):
+ """
+ IPv4 flavor of a prefix.
+ """
+
+ pdu_type = 4
+ address_byte_count = 4
+
+class IPv6PrefixPDU(PrefixPDU):
+ """
+ IPv6 flavor of a prefix.
+ """
+
+ pdu_type = 6
+ address_byte_count = 16
+
+class RouterKeyPDU(rpki.rtr.pdus.RouterKeyPDU):
+ """
+ Router Key PDU.
+ """
+
+ @classmethod
+ def from_text(cls, version, asn, gski, key):
+ """
+ Construct a router key from its text form.
+ """
+
+ self = cls(version = version)
+ self.asn = long(asn)
+ self.ski = base64.urlsafe_b64decode(gski + "=")
+ self.key = base64.b64decode(key)
+ self.announce = 1
+ self.check()
+ return self
+
+ @classmethod
+ def from_certificate(cls, version, asn, ski, key):
+ """
+ Construct a router key from a certificate.
+ """
+
+ self = cls(version = version)
+ self.asn = asn
+ self.ski = ski
+ self.key = key
+ self.announce = 1
+ self.check()
+ return self
+
+
+class ROA(rpki.POW.ROA): # pylint: disable=W0232
+ """
+ Minor additions to rpki.POW.ROA.
+ """
+
+ @classmethod
+ def derReadFile(cls, fn): # pylint: disable=E1002
+ self = super(ROA, cls).derReadFile(fn)
+ self.extractWithoutVerifying()
+ return self
+
+ @property
+ def prefixes(self):
+ v4, v6 = self.getPrefixes()
+ if v4 is not None:
+ for p in v4:
+ yield p
+ if v6 is not None:
+ for p in v6:
+ yield p
+
+class X509(rpki.POW.X509): # pylint: disable=W0232
+ """
+ Minor additions to rpki.POW.X509.
+ """
+
+ @property
+ def asns(self):
+ resources = self.getRFC3779()
+ if resources is not None and resources[0] is not None:
+ for min_asn, max_asn in resources[0]:
+ for asn in xrange(min_asn, max_asn + 1):
+ yield asn
+
+
+class PDUSet(list):
+ """
+ Object representing a set of PDUs, that is, one versioned and
+ (theoretically) consistant set of prefixes and router keys extracted
+ from rcynic's output.
+ """
+
+ def __init__(self, version):
+ assert version in rpki.rtr.pdus.PDU.version_map
+ super(PDUSet, self).__init__()
+ self.version = version
+
+ @classmethod
+ def _load_file(cls, filename, version):
+ """
+ Low-level method to read PDUSet from a file.
+ """
+
+ self = cls(version = version)
+ f = open(filename, "rb")
+ r = rpki.rtr.channels.ReadBuffer()
+ while True:
+ p = rpki.rtr.pdus.PDU.read_pdu(r)
+ while p is None:
+ b = f.read(r.needed())
+ if b == "":
+ assert r.available() == 0
+ return self
+ r.put(b)
+ p = r.retry()
+ assert p.version == self.version
+ self.append(p)
+
+ @staticmethod
+ def seq_ge(a, b):
+ return ((a - b) % (1 << 32)) < (1 << 31)
+
+
+class AXFRSet(PDUSet):
+ """
+ Object representing a complete set of PDUs, that is, one versioned
+ and (theoretically) consistant set of prefixes and router
+ certificates extracted from rcynic's output, all with the announce
+ field set.
+ """
+
+ @classmethod
+ def parse_rcynic(cls, rcynic_dir, version, scan_roas = None, scan_routercerts = None):
+ """
+ Parse ROAS and router certificates fetched (and validated!) by
+ rcynic to create a new AXFRSet.
+
+ In normal operation, we use os.walk() and the rpki.POW library to
+ parse these data directly, but we can, if so instructed, use
+ external programs instead, for testing, simulation, or to provide
+ a way to inject local data.
+
+ At some point the ability to parse these data from external
+ programs may move to a separate constructor function, so that we
+ can make this one a bit simpler and faster.
+ """
+
+ self = cls(version = version)
+ self.serial = rpki.rtr.channels.Timestamp.now()
+
+ include_routercerts = RouterKeyPDU.pdu_type in rpki.rtr.pdus.PDU.version_map[version]
+
+ if scan_roas is None or (scan_routercerts is None and include_routercerts):
+ for root, dirs, files in os.walk(rcynic_dir): # pylint: disable=W0612
+ for fn in files:
+ if scan_roas is None and fn.endswith(".roa"):
+ roa = ROA.derReadFile(os.path.join(root, fn))
+ asn = roa.getASID()
+ self.extend(PrefixPDU.from_roa(version = version, asn = asn, prefix_tuple = prefix_tuple)
+ for prefix_tuple in roa.prefixes)
+ if include_routercerts and scan_routercerts is None and fn.endswith(".cer"):
+ x = X509.derReadFile(os.path.join(root, fn))
+ eku = x.getEKU()
+ if eku is not None and rpki.oids.id_kp_bgpsec_router in eku:
+ ski = x.getSKI()
+ key = x.getPublicKey().derWritePublic()
+ self.extend(RouterKeyPDU.from_certificate(version = version, asn = asn, ski = ski, key = key)
+ for asn in x.asns)
+
+ if scan_roas is not None:
+ try:
+ p = subprocess.Popen((scan_roas, rcynic_dir), stdout = subprocess.PIPE)
+ for line in p.stdout:
+ line = line.split()
+ asn = line[1]
+ self.extend(PrefixPDU.from_text(version = version, asn = asn, addr = addr)
+ for addr in line[2:])
+ except OSError, e:
+ sys.exit("Could not run %s: %s" % (scan_roas, e))
+
+ if include_routercerts and scan_routercerts is not None:
+ try:
+ p = subprocess.Popen((scan_routercerts, rcynic_dir), stdout = subprocess.PIPE)
+ for line in p.stdout:
+ line = line.split()
+ gski = line[0]
+ key = line[-1]
+ self.extend(RouterKeyPDU.from_text(version = version, asn = asn, gski = gski, key = key)
+ for asn in line[1:-1])
+ except OSError, e:
+ sys.exit("Could not run %s: %s" % (scan_routercerts, e))
+
+ self.sort()
+ for i in xrange(len(self) - 2, -1, -1):
+ if self[i] == self[i + 1]:
+ del self[i + 1]
+ return self
+
+ @classmethod
+ def load(cls, filename):
+ """
+ Load an AXFRSet from a file, parse filename to obtain version and serial.
+ """
+
+ fn1, fn2, fn3 = os.path.basename(filename).split(".")
+ assert fn1.isdigit() and fn2 == "ax" and fn3.startswith("v") and fn3[1:].isdigit()
+ version = int(fn3[1:])
+ self = cls._load_file(filename, version)
+ self.serial = rpki.rtr.channels.Timestamp(fn1)
+ return self
+
+ def filename(self):
+ """
+ Generate filename for this AXFRSet.
+ """
+
+ return "%d.ax.v%d" % (self.serial, self.version)
+
+ @classmethod
+ def load_current(cls, version):
+ """
+ Load current AXFRSet. Return None if can't.
+ """
+
+ serial = rpki.rtr.server.read_current(version)[0]
+ if serial is None:
+ return None
+ try:
+ return cls.load("%d.ax.v%d" % (serial, version))
+ except IOError:
+ return None
+
+ def save_axfr(self):
+ """
+ Write AXFRSet to file with magic filename.
+ """
+
+ f = open(self.filename(), "wb")
+ for p in self:
+ f.write(p.to_pdu())
+ f.close()
+
+ def destroy_old_data(self):
+ """
+ Destroy old data files, presumably because our nonce changed and
+ the old serial numbers are no longer valid.
+ """
+
+ for i in glob.iglob("*.ix.*.v%d" % self.version):
+ os.unlink(i)
+ for i in glob.iglob("*.ax.v%d" % self.version):
+ if i != self.filename():
+ os.unlink(i)
+
+ @staticmethod
+ def new_nonce(force_zero_nonce):
+ """
+ Create and return a new nonce value.
+ """
+
+ if force_zero_nonce:
+ return 0
+ try:
+ return int(random.SystemRandom().getrandbits(16))
+ except NotImplementedError:
+ return int(random.getrandbits(16))
+
+ def mark_current(self, force_zero_nonce = False):
+ """
+ Save current serial number and nonce, creating new nonce if
+ necessary. Creating a new nonce triggers cleanup of old state, as
+ the new nonce invalidates all old serial numbers.
+ """
+
+ assert self.version in rpki.rtr.pdus.PDU.version_map
+ old_serial, nonce = rpki.rtr.server.read_current(self.version)
+ if old_serial is None or self.seq_ge(old_serial, self.serial):
+ logging.debug("Creating new nonce and deleting stale data")
+ nonce = self.new_nonce(force_zero_nonce)
+ self.destroy_old_data()
+ rpki.rtr.server.write_current(self.serial, nonce, self.version)
+
+ def save_ixfr(self, other):
+ """
+ Comparing this AXFRSet with an older one and write the resulting
+ IXFRSet to file with magic filename. Since we store PDUSets
+ in sorted order, computing the difference is a trivial linear
+ comparison.
+ """
+
+ f = open("%d.ix.%d.v%d" % (self.serial, other.serial, self.version), "wb")
+ old = other
+ new = self
+ len_old = len(old)
+ len_new = len(new)
+ i_old = i_new = 0
+ while i_old < len_old and i_new < len_new:
+ if old[i_old] < new[i_new]:
+ f.write(old[i_old].to_pdu(announce = 0))
+ i_old += 1
+ elif old[i_old] > new[i_new]:
+ f.write(new[i_new].to_pdu(announce = 1))
+ i_new += 1
+ else:
+ i_old += 1
+ i_new += 1
+ for i in xrange(i_old, len_old):
+ f.write(old[i].to_pdu(announce = 0))
+ for i in xrange(i_new, len_new):
+ f.write(new[i].to_pdu(announce = 1))
+ f.close()
+
+ def show(self):
+ """
+ Print this AXFRSet.
+ """
+
+ logging.debug("# AXFR %d (%s) v%d", self.serial, self.serial, self.version)
+ for p in self:
+ logging.debug(p)
+
+
+class IXFRSet(PDUSet):
+ """
+ Object representing an incremental set of PDUs, that is, the
+ differences between one versioned and (theoretically) consistant set
+ of prefixes and router certificates extracted from rcynic's output
+ and another, with the announce fields set or cleared as necessary to
+ indicate the changes.
+ """
+
+ @classmethod
+ def load(cls, filename):
+ """
+ Load an IXFRSet from a file, parse filename to obtain version and serials.
+ """
+
+ fn1, fn2, fn3, fn4 = os.path.basename(filename).split(".")
+ assert fn1.isdigit() and fn2 == "ix" and fn3.isdigit() and fn4.startswith("v") and fn4[1:].isdigit()
+ version = int(fn4[1:])
+ self = cls._load_file(filename, version)
+ self.from_serial = rpki.rtr.channels.Timestamp(fn3)
+ self.to_serial = rpki.rtr.channels.Timestamp(fn1)
+ return self
+
+ def filename(self):
+ """
+ Generate filename for this IXFRSet.
+ """
+
+ return "%d.ix.%d.v%d" % (self.to_serial, self.from_serial, self.version)
+
+ def show(self):
+ """
+ Print this IXFRSet.
+ """
+
+ logging.debug("# IXFR %d (%s) -> %d (%s) v%d",
+ self.from_serial, self.from_serial,
+ self.to_serial, self.to_serial,
+ self.version)
+ for p in self:
+ logging.debug(p)
+
+
+def kick_all(serial):
+ """
+ Kick any existing server processes to wake them up.
+ """
+
+ try:
+ os.stat(rpki.rtr.server.kickme_dir)
+ except OSError:
+ logging.debug('# Creating directory "%s"', rpki.rtr.server.kickme_dir)
+ os.makedirs(rpki.rtr.server.kickme_dir)
+
+ msg = "Good morning, serial %d is ready" % serial
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ for name in glob.iglob("%s.*" % rpki.rtr.server.kickme_base):
+ try:
+ logging.debug("# Kicking %s", name)
+ sock.sendto(msg, name)
+ except socket.error:
+ try:
+ logging.exception("# Failed to kick %s, probably dead socket, attempting cleanup", name)
+ os.unlink(name)
+ except Exception, e:
+ logging.exception("# Couldn't unlink suspected dead socket %s: %s", name, e)
+ except Exception, e:
+ logging.warning("# Failed to kick %s and don't understand why: %s", name, e)
+ sock.close()
+
+
+def cronjob_main(args):
+ """
+ Run this right after running rcynic to wade through the ROAs and
+ router certificates that rcynic collects and translate that data
+ into the form used in the rpki-router protocol. Output is an
+ updated database containing both full dumps (AXFR) and incremental
+ dumps against a specific prior version (IXFR). After updating the
+ database, kicks any active servers, so that they can notify their
+ clients that a new version is available.
+ """
+
+ if args.rpki_rtr_dir:
+ try:
+ if not os.path.isdir(args.rpki_rtr_dir):
+ os.makedirs(args.rpki_rtr_dir)
+ os.chdir(args.rpki_rtr_dir)
+ except OSError, e:
+ logging.critical(str(e))
+ sys.exit(1)
+
+ for version in sorted(rpki.rtr.server.PDU.version_map.iterkeys(), reverse = True):
+
+ logging.debug("# Generating updates for protocol version %d", version)
+
+ old_ixfrs = glob.glob("*.ix.*.v%d" % version)
+
+ current = rpki.rtr.server.read_current(version)[0]
+ cutoff = Timestamp.now(-(24 * 60 * 60))
+ for f in glob.iglob("*.ax.v%d" % version):
+ t = Timestamp(int(f.split(".")[0]))
+ if t < cutoff and t != current:
+ logging.debug("# Deleting old file %s, timestamp %s", f, t)
+ os.unlink(f)
+
+ pdus = rpki.rtr.generator.AXFRSet.parse_rcynic(args.rcynic_dir, version, args.scan_roas, args.scan_routercerts)
+ if pdus == rpki.rtr.generator.AXFRSet.load_current(version):
+ logging.debug("# No change, new serial not needed")
+ continue
+ pdus.save_axfr()
+ for axfr in glob.iglob("*.ax.v%d" % version):
+ if axfr != pdus.filename():
+ pdus.save_ixfr(rpki.rtr.generator.AXFRSet.load(axfr))
+ pdus.mark_current(args.force_zero_nonce)
+
+ logging.debug("# New serial is %d (%s)", pdus.serial, pdus.serial)
+
+ rpki.rtr.generator.kick_all(pdus.serial)
+
+ old_ixfrs.sort()
+ for ixfr in old_ixfrs:
+ try:
+ logging.debug("# Deleting old file %s", ixfr)
+ os.unlink(ixfr)
+ except OSError:
+ pass
+
+
+def show_main(args):
+ """
+ Display current rpki-rtr server database in textual form.
+ """
+
+ if args.rpki_rtr_dir:
+ try:
+ os.chdir(args.rpki_rtr_dir)
+ except OSError, e:
+ sys.exit(e)
+
+ g = glob.glob("*.ax.v*")
+ g.sort()
+ for f in g:
+ rpki.rtr.generator.AXFRSet.load(f).show()
+
+ g = glob.glob("*.ix.*.v*")
+ g.sort()
+ for f in g:
+ rpki.rtr.generator.IXFRSet.load(f).show()
+
+def argparse_setup(subparsers):
+ """
+ Set up argparse stuff for commands in this module.
+ """
+
+ subparser = subparsers.add_parser("cronjob", description = cronjob_main.__doc__,
+ help = "Generate RPKI-RTR database from rcynic output")
+ subparser.set_defaults(func = cronjob_main, default_log_to = "syslog")
+ subparser.add_argument("--scan-roas", help = "specify an external scan_roas program")
+ subparser.add_argument("--scan-routercerts", help = "specify an external scan_routercerts program")
+ subparser.add_argument("--force_zero_nonce", action = "store_true", help = "force nonce value of zero")
+ subparser.add_argument("rcynic_dir", help = "directory containing validated rcynic output tree")
+ subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database")
+
+ subparser = subparsers.add_parser("show", description = show_main.__doc__,
+ help = "Display content of RPKI-RTR database")
+ subparser.set_defaults(func = show_main, default_log_to = "stderr")
+ subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database")