aboutsummaryrefslogtreecommitdiff
path: root/rpki/rpki_rtr/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rpki_rtr/client.py')
-rw-r--r--rpki/rpki_rtr/client.py530
1 files changed, 0 insertions, 530 deletions
diff --git a/rpki/rpki_rtr/client.py b/rpki/rpki_rtr/client.py
deleted file mode 100644
index 0ba3688f..00000000
--- a/rpki/rpki_rtr/client.py
+++ /dev/null
@@ -1,530 +0,0 @@
-# $Id$
-#
-# Copyright (C) 2014 Dragon Research Labs ("DRL")
-# Portions copyright (C) 2009-2013 Internet Systems Consortium ("ISC")
-#
-# Permission to use, copy, modify, and distribute this software for any
-# purpose with or without fee is hereby granted, provided that the above
-# copyright notices and this permission notice appear in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND DRL AND ISC DISCLAIM ALL
-# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL OR
-# ISC BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
-# DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA
-# OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
-# TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
-# PERFORMANCE OF THIS SOFTWARE.
-
-"""
-Client implementation for the RPKI-RTR protocol (RFC 6810 et sequalia).
-"""
-
-import os
-import sys
-import time
-import base64
-import socket
-import signal
-import logging
-import asyncore
-import subprocess
-import rpki.rpki_rtr.pdus
-import rpki.rpki_rtr.channels
-
-from rpki.rpki_rtr.pdus import ResetQueryPDU, SerialQueryPDU
-from rpki.rpki_rtr.channels import Timestamp
-
-
-class PDU(rpki.rpki_rtr.pdus.PDU):
- """
- Object representing a generic PDU in the rpki-router protocol.
- Real PDUs are subclasses of this class.
- """
-
- def consume(self, client):
- """
- Handle results in test client. Default behavior is just to print
- out the PDU; data PDU subclasses may override this.
- """
-
- logging.debug(self)
-
-
-clone_pdu = rpki.rpki_rtr.pdus.clone_pdu_root(PDU)
-
-
-@clone_pdu
-class SerialNotifyPDU(rpki.rpki_rtr.pdus.SerialNotifyPDU):
- """
- Serial Notify PDU.
- """
-
- def consume(self, client):
- """
- Respond to a SerialNotifyPDU with either a SerialQueryPDU or a
- ResetQueryPDU, depending on what we already know.
- """
-
- logging.debug(self)
- if client.current_serial is None or client.current_nonce != self.nonce:
- client.push_pdu(ResetQueryPDU(version = client.version))
- elif self.serial != client.current_serial:
- client.push_pdu(SerialQueryPDU(version = client.version,
- serial = client.current_serial,
- nonce = client.current_nonce))
- else:
- logging.debug("[Notify did not change serial number, ignoring]")
-
-
-@clone_pdu
-class CacheResponsePDU(rpki.rpki_rtr.pdus.CacheResponsePDU):
- """
- Cache Response PDU.
- """
-
- def consume(self, client):
- """
- Handle CacheResponsePDU.
- """
-
- logging.debug(self)
- if self.nonce != client.current_nonce:
- logging.debug("[Nonce changed, resetting]")
- client.cache_reset()
-
-@clone_pdu
-class EndOfDataPDUv0(rpki.rpki_rtr.pdus.EndOfDataPDUv0):
- """
- End of Data PDU, protocol version 0.
- """
-
- def consume(self, client):
- """
- Handle EndOfDataPDU response.
- """
-
- logging.debug(self)
- client.end_of_data(self.version, self.serial, self.nonce, self.refresh, self.retry, self.expire)
-
-@clone_pdu
-class EndOfDataPDUv1(rpki.rpki_rtr.pdus.EndOfDataPDUv1):
- """
- End of Data PDU, protocol version 1.
- """
-
- def consume(self, client):
- """
- Handle EndOfDataPDU response.
- """
-
- logging.debug(self)
- client.end_of_data(self.version, self.serial, self.nonce, self.refresh, self.retry, self.expire)
-
-
-@clone_pdu
-class CacheResetPDU(rpki.rpki_rtr.pdus.CacheResetPDU):
- """
- Cache reset PDU.
- """
-
- def consume(self, client):
- """
- Handle CacheResetPDU response, by issuing a ResetQueryPDU.
- """
-
- logging.debug(self)
- client.cache_reset()
- client.push_pdu(ResetQueryPDU(version = client.version))
-
-
-class PrefixPDU(rpki.rpki_rtr.pdus.PrefixPDU):
- """
- Object representing one prefix. This corresponds closely to one PDU
- in the rpki-router protocol, so closely that we use lexical ordering
- of the wire format of the PDU as the ordering for this class.
-
- This is a virtual class, but the .from_text() constructor
- instantiates the correct concrete subclass (IPv4PrefixPDU or
- IPv6PrefixPDU) depending on the syntax of its input text.
- """
-
- def consume(self, client):
- """
- Handle one incoming prefix PDU
- """
-
- logging.debug(self)
- client.consume_prefix(self)
-
-
-@clone_pdu
-class IPv4PrefixPDU(PrefixPDU, rpki.rpki_rtr.pdus.IPv4PrefixPDU):
- """
- IPv4 flavor of a prefix.
- """
-
- pass
-
-@clone_pdu
-class IPv6PrefixPDU(PrefixPDU, rpki.rpki_rtr.pdus.IPv6PrefixPDU):
- """
- IPv6 flavor of a prefix.
- """
-
- pass
-
-@clone_pdu
-class RouterKeyPDU(rpki.rpki_rtr.pdus.RouterKeyPDU):
- """
- Router Key PDU.
- """
-
- def consume(self, client):
- """
- Handle one incoming Router Key PDU
- """
-
- logging.debug(self)
- client.consume_routerkey(self)
-
-
-class ClientChannel(rpki.rpki_rtr.channels.PDUChannel):
- """
- Client protocol engine, handles upcalls from PDUChannel.
- """
-
- current_serial = None
- current_nonce = None
- sql = None
- host = None
- port = None
- cache_id = None
-
- # For initial test purposes, let's use the minimum allowed values
- # from the RFC 6810 bis I-D as the initial defaults for refresh and
- # retry, and the maximum allowed for expire; these will be overriden
- # as soon as we receive an EndOfDataPDU.
- #
- refresh = 120
- retry = 120
- expire = 172800
-
- def __init__(self, sock, proc, killsig, host, port):
- self.killsig = killsig
- self.proc = proc
- self.host = host
- self.port = port
- super(ClientChannel, self).__init__(sock = sock, root_pdu_class = PDU)
- self.start_new_pdu()
-
- @classmethod
- def ssh(cls, host, port):
- """
- Set up ssh connection and start listening for first PDU.
- """
-
- argv = ("ssh", "-p", port, "-s", host, "rpki-rtr")
- logging.debug("[Running ssh: %s]", " ".join(argv))
- s = socket.socketpair()
- return cls(sock = s[1],
- proc = subprocess.Popen(argv, executable = "/usr/bin/ssh",
- stdin = s[0], stdout = s[0], close_fds = True),
- killsig = signal.SIGKILL,
- host = host, port = port)
-
- @classmethod
- def tcp(cls, host, port):
- """
- Set up TCP connection and start listening for first PDU.
- """
-
- logging.debug("[Starting raw TCP connection to %s:%s]", host, port)
- try:
- addrinfo = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
- except socket.error, e:
- logging.debug("[socket.getaddrinfo() failed: %s]", e)
- else:
- for ai in addrinfo:
- af, socktype, proto, cn, sa = ai # pylint: disable=W0612
- logging.debug("[Trying addr %s port %s]", sa[0], sa[1])
- try:
- s = socket.socket(af, socktype, proto)
- except socket.error, e:
- logging.debug("[socket.socket() failed: %s]", e)
- continue
- try:
- s.connect(sa)
- except socket.error, e:
- logging.exception("[socket.connect() failed: %s]", e)
- s.close()
- continue
- return cls(sock = s, proc = None, killsig = None,
- host = host, port = port)
- sys.exit(1)
-
- @classmethod
- def loopback(cls, host, port):
- """
- Set up loopback connection and start listening for first PDU.
- """
-
- s = socket.socketpair()
- logging.debug("[Using direct subprocess kludge for testing]")
- argv = (sys.executable, sys.argv[0], "server")
- return cls(sock = s[1],
- proc = subprocess.Popen(argv, stdin = s[0], stdout = s[0], close_fds = True),
- killsig = signal.SIGINT,
- host = host, port = port)
-
- @classmethod
- def tls(cls, host, port):
- """
- Set up TLS connection and start listening for first PDU.
-
- NB: This uses OpenSSL's "s_client" command, which does not
- check server certificates properly, so this is not suitable for
- production use. Fixing this would be a trivial change, it just
- requires using a client program which does check certificates
- properly (eg, gnutls-cli, or stunnel's client mode if that works
- for such purposes this week).
- """
-
- argv = ("openssl", "s_client", "-tls1", "-quiet", "-connect", "%s:%s" % (host, port))
- logging.debug("[Running: %s]", " ".join(argv))
- s = socket.socketpair()
- return cls(sock = s[1],
- proc = subprocess.Popen(argv, stdin = s[0], stdout = s[0], close_fds = True),
- killsig = signal.SIGKILL,
- host = host, port = port)
-
- def setup_sql(self, sqlname):
- """
- Set up an SQLite database to contain the table we receive. If
- necessary, we will create the database.
- """
-
- import sqlite3
- missing = not os.path.exists(sqlname)
- self.sql = sqlite3.connect(sqlname, detect_types = sqlite3.PARSE_DECLTYPES)
- self.sql.text_factory = str
- cur = self.sql.cursor()
- cur.execute("PRAGMA foreign_keys = on")
- if missing:
- cur.execute('''
- CREATE TABLE cache (
- cache_id INTEGER PRIMARY KEY NOT NULL,
- host TEXT NOT NULL,
- port TEXT NOT NULL,
- version INTEGER,
- nonce INTEGER,
- serial INTEGER,
- updated INTEGER,
- refresh INTEGER,
- retry INTEGER,
- expire INTEGER,
- UNIQUE (host, port))''')
- cur.execute('''
- CREATE TABLE prefix (
- cache_id INTEGER NOT NULL
- REFERENCES cache(cache_id)
- ON DELETE CASCADE
- ON UPDATE CASCADE,
- asn INTEGER NOT NULL,
- prefix TEXT NOT NULL,
- prefixlen INTEGER NOT NULL,
- max_prefixlen INTEGER NOT NULL,
- UNIQUE (cache_id, asn, prefix, prefixlen, max_prefixlen))''')
-
- cur.execute('''
- CREATE TABLE routerkey (
- cache_id INTEGER NOT NULL
- REFERENCES cache(cache_id)
- ON DELETE CASCADE
- ON UPDATE CASCADE,
- asn INTEGER NOT NULL,
- ski TEXT NOT NULL,
- key TEXT NOT NULL,
- UNIQUE (cache_id, asn, ski),
- UNIQUE (cache_id, asn, key))''')
-
- cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire "
- "FROM cache WHERE host = ? AND port = ?",
- (self.host, self.port))
- try:
- self.cache_id, version, self.current_nonce, self.current_serial, refresh, retry, expire = cur.fetchone()
- if version is not None:
- self.version = version
- if refresh is not None:
- self.refresh = refresh
- if retry is not None:
- self.retry = retry
- if expire is not None:
- self.expire = expire
- except TypeError:
- cur.execute("INSERT INTO cache (host, port) VALUES (?, ?)", (self.host, self.port))
- self.cache_id = cur.lastrowid
- self.sql.commit()
- logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s]",
- self.cache_id, self.version, self.current_nonce,
- self.current_serial, self.refresh, self.retry, self.expire)
-
- def cache_reset(self):
- """
- Handle CacheResetPDU actions.
- """
-
- self.current_serial = None
- if self.sql:
- #
- # For some reason there was no commit here. Dunno why.
- # See if adding one breaks anything....
- #
- cur = self.sql.cursor()
- cur.execute("DELETE FROM prefix WHERE cache_id = ?", (self.cache_id,))
- cur.execute("DELETE FROM routerkey WHERE cache_id = ?", (self.cache_id,))
- cur.execute("UPDATE cache SET version = ?, serial = NULL WHERE cache_id = ?", (self.version, self.cache_id))
- self.sql.commit()
-
- def end_of_data(self, version, serial, nonce, refresh, retry, expire):
- """
- Handle EndOfDataPDU actions.
- """
-
- assert version == self.version
- self.current_serial = serial
- self.current_nonce = nonce
- self.refresh = refresh
- self.retry = retry
- self.expire = expire
- if self.sql:
- self.sql.execute("UPDATE cache SET"
- " version = ?, serial = ?, nonce = ?,"
- " refresh = ?, retry = ?, expire = ?,"
- " updated = datetime('now') "
- "WHERE cache_id = ?",
- (version, serial, nonce, refresh, retry, expire, self.cache_id))
- self.sql.commit()
-
- def consume_prefix(self, prefix):
- """
- Handle one prefix PDU.
- """
-
- if self.sql:
- values = (self.cache_id, prefix.asn, str(prefix.prefix), prefix.prefixlen, prefix.max_prefixlen)
- if prefix.announce:
- self.sql.execute("INSERT INTO prefix (cache_id, asn, prefix, prefixlen, max_prefixlen) "
- "VALUES (?, ?, ?, ?, ?)",
- values)
- else:
- self.sql.execute("DELETE FROM prefix "
- "WHERE cache_id = ? AND asn = ? AND prefix = ? AND prefixlen = ? AND max_prefixlen = ?",
- values)
-
- def consume_routerkey(self, routerkey):
- """
- Handle one Router Key PDU.
- """
-
- if self.sql:
- values = (self.cache_id, routerkey.asn,
- base64.urlsafe_b64encode(routerkey.ski).rstrip("="),
- base64.b64encode(routerkey.key))
- if routerkey.announce:
- self.sql.execute("INSERT INTO routerkey (cache_id, asn, ski, key) "
- "VALUES (?, ?, ?, ?)",
- values)
- else:
- self.sql.execute("DELETE FROM routerkey "
- "WHERE cache_id = ? AND asn = ? AND (ski = ? OR key = ?)",
- values)
-
- def deliver_pdu(self, pdu):
- """
- Handle received PDU.
- """
-
- pdu.consume(self)
-
- def push_pdu(self, pdu):
- """
- Log outbound PDU then write it to stream.
- """
-
- logging.debug(pdu)
- super(ClientChannel, self).push_pdu(pdu)
-
- def cleanup(self):
- """
- Force clean up this client's child process. If everything goes
- well, child will have exited already before this method is called,
- but we may need to whack it with a stick if something breaks.
- """
-
- if self.proc is not None and self.proc.returncode is None:
- try:
- os.kill(self.proc.pid, self.killsig)
- except OSError:
- pass
-
- def handle_close(self):
- """
- Intercept close event so we can log it, then shut down.
- """
-
- logging.debug("Server closed channel")
- super(ClientChannel, self).handle_close()
-
-
-def client_main(args):
- """
- Test client, intended primarily for debugging.
- """
-
- logging.debug("[Startup]")
-
- constructor = getattr(rpki.rpki_rtr.client.ClientChannel, args.protocol)
-
- client = None
- try:
- client = constructor(args.host, args.port)
- if args.sql_database:
- client.setup_sql(args.sql_database)
- while True:
- if client.current_serial is None or client.current_nonce is None:
- client.push_pdu(ResetQueryPDU(version = client.version))
- else:
- client.push_pdu(SerialQueryPDU(version = client.version,
- serial = client.current_serial,
- nonce = client.current_nonce))
- polled = Timestamp.now()
- wakeup = None
- while True:
- if wakeup != polled + client.refresh:
- wakeup = Timestamp(polled + client.refresh)
- logging.info("[Last client poll %s, next %s]", polled, wakeup)
- remaining = wakeup - time.time()
- if remaining < 0:
- break
- asyncore.loop(timeout = remaining, count = 1)
-
- except KeyboardInterrupt:
- sys.exit(0)
- finally:
- if client is not None:
- client.cleanup()
-
-
-def argparse_setup(subparsers):
- """
- Set up argparse stuff for commands in this module.
- """
-
- subparser = subparsers.add_parser("client", description = client_main.__doc__,
- help = "Test client for RPKI-RTR protocol")
- subparser.set_defaults(func = client_main, default_log_to = "stderr")
- subparser.add_argument("--sql-database", help = "filename for sqlite3 database of client state")
- subparser.add_argument("protocol", choices = ("loopback", "tcp", "ssh", "tls"), help = "connection protocol")
- subparser.add_argument("host", nargs = "?", help = "server host")
- subparser.add_argument("port", nargs = "?", help = "server port")