aboutsummaryrefslogtreecommitdiff
path: root/rpki/rtr/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/rtr/client.py')
-rw-r--r--rpki/rtr/client.py816
1 files changed, 408 insertions, 408 deletions
diff --git a/rpki/rtr/client.py b/rpki/rtr/client.py
index a35ab81d..9c7a00d6 100644
--- a/rpki/rtr/client.py
+++ b/rpki/rtr/client.py
@@ -37,13 +37,13 @@ from rpki.rtr.channels import Timestamp
class PDU(rpki.rtr.pdus.PDU):
- def consume(self, client):
- """
- Handle results in test client. Default behavior is just to print
- out the PDU; data PDU subclasses may override this.
- """
+ def consume(self, client):
+ """
+ Handle results in test client. Default behavior is just to print
+ out the PDU; data PDU subclasses may override this.
+ """
- logging.debug(self)
+ logging.debug(self)
clone_pdu = rpki.rtr.pdus.clone_pdu_root(PDU)
@@ -52,407 +52,407 @@ clone_pdu = rpki.rtr.pdus.clone_pdu_root(PDU)
@clone_pdu
class SerialNotifyPDU(rpki.rtr.pdus.SerialNotifyPDU):
- def consume(self, client):
- """
- Respond to a SerialNotifyPDU with either a SerialQueryPDU or a
- ResetQueryPDU, depending on what we already know.
- """
+ def consume(self, client):
+ """
+ Respond to a SerialNotifyPDU with either a SerialQueryPDU or a
+ ResetQueryPDU, depending on what we already know.
+ """
- logging.debug(self)
- if client.serial is None or client.nonce != self.nonce:
- client.push_pdu(ResetQueryPDU(version = client.version))
- elif self.serial != client.serial:
- client.push_pdu(SerialQueryPDU(version = client.version,
- serial = client.serial,
- nonce = client.nonce))
- else:
- logging.debug("[Notify did not change serial number, ignoring]")
+ logging.debug(self)
+ if client.serial is None or client.nonce != self.nonce:
+ client.push_pdu(ResetQueryPDU(version = client.version))
+ elif self.serial != client.serial:
+ client.push_pdu(SerialQueryPDU(version = client.version,
+ serial = client.serial,
+ nonce = client.nonce))
+ else:
+ logging.debug("[Notify did not change serial number, ignoring]")
@clone_pdu
class CacheResponsePDU(rpki.rtr.pdus.CacheResponsePDU):
- def consume(self, client):
- """
- Handle CacheResponsePDU.
- """
+ def consume(self, client):
+ """
+ Handle CacheResponsePDU.
+ """
- logging.debug(self)
- if self.nonce != client.nonce:
- logging.debug("[Nonce changed, resetting]")
- client.cache_reset()
+ logging.debug(self)
+ if self.nonce != client.nonce:
+ logging.debug("[Nonce changed, resetting]")
+ client.cache_reset()
@clone_pdu
class EndOfDataPDUv0(rpki.rtr.pdus.EndOfDataPDUv0):
- def consume(self, client):
- """
- Handle EndOfDataPDU response.
- """
+ def consume(self, client):
+ """
+ Handle EndOfDataPDU response.
+ """
- logging.debug(self)
- client.end_of_data(self.version, self.serial, self.nonce, self.refresh, self.retry, self.expire)
+ logging.debug(self)
+ client.end_of_data(self.version, self.serial, self.nonce, self.refresh, self.retry, self.expire)
@clone_pdu
class EndOfDataPDUv1(rpki.rtr.pdus.EndOfDataPDUv1):
- def consume(self, client):
- """
- Handle EndOfDataPDU response.
- """
+ def consume(self, client):
+ """
+ Handle EndOfDataPDU response.
+ """
- logging.debug(self)
- client.end_of_data(self.version, self.serial, self.nonce, self.refresh, self.retry, self.expire)
+ logging.debug(self)
+ client.end_of_data(self.version, self.serial, self.nonce, self.refresh, self.retry, self.expire)
@clone_pdu
class CacheResetPDU(rpki.rtr.pdus.CacheResetPDU):
- def consume(self, client):
- """
- Handle CacheResetPDU response, by issuing a ResetQueryPDU.
- """
+ def consume(self, client):
+ """
+ Handle CacheResetPDU response, by issuing a ResetQueryPDU.
+ """
- logging.debug(self)
- client.cache_reset()
- client.push_pdu(ResetQueryPDU(version = client.version))
+ logging.debug(self)
+ client.cache_reset()
+ client.push_pdu(ResetQueryPDU(version = client.version))
class PrefixPDU(rpki.rtr.pdus.PrefixPDU):
- """
- Object representing one prefix. This corresponds closely to one PDU
- in the rpki-router protocol, so closely that we use lexical ordering
- of the wire format of the PDU as the ordering for this class.
-
- This is a virtual class, but the .from_text() constructor
- instantiates the correct concrete subclass (IPv4PrefixPDU or
- IPv6PrefixPDU) depending on the syntax of its input text.
- """
-
- def consume(self, client):
"""
- Handle one incoming prefix PDU
+ Object representing one prefix. This corresponds closely to one PDU
+ in the rpki-router protocol, so closely that we use lexical ordering
+ of the wire format of the PDU as the ordering for this class.
+
+ This is a virtual class, but the .from_text() constructor
+ instantiates the correct concrete subclass (IPv4PrefixPDU or
+ IPv6PrefixPDU) depending on the syntax of its input text.
"""
- logging.debug(self)
- client.consume_prefix(self)
+ def consume(self, client):
+ """
+ Handle one incoming prefix PDU
+ """
+
+ logging.debug(self)
+ client.consume_prefix(self)
@clone_pdu
class IPv4PrefixPDU(PrefixPDU, rpki.rtr.pdus.IPv4PrefixPDU):
- pass
+ pass
@clone_pdu
class IPv6PrefixPDU(PrefixPDU, rpki.rtr.pdus.IPv6PrefixPDU):
- pass
+ pass
@clone_pdu
class ErrorReportPDU(PDU, rpki.rtr.pdus.ErrorReportPDU):
- pass
+ pass
@clone_pdu
class RouterKeyPDU(rpki.rtr.pdus.RouterKeyPDU):
- """
- Router Key PDU.
- """
-
- def consume(self, client):
"""
- Handle one incoming Router Key PDU
+ Router Key PDU.
"""
- logging.debug(self)
- client.consume_routerkey(self)
+ def consume(self, client):
+ """
+ Handle one incoming Router Key PDU
+ """
+ logging.debug(self)
+ client.consume_routerkey(self)
-class ClientChannel(rpki.rtr.channels.PDUChannel):
- """
- Client protocol engine, handles upcalls from PDUChannel.
- """
-
- serial = None
- nonce = None
- sql = None
- host = None
- port = None
- cache_id = None
- refresh = rpki.rtr.pdus.default_refresh
- retry = rpki.rtr.pdus.default_retry
- expire = rpki.rtr.pdus.default_expire
- updated = Timestamp(0)
-
- def __init__(self, sock, proc, killsig, args, host = None, port = None):
- self.killsig = killsig
- self.proc = proc
- self.args = args
- self.host = args.host if host is None else host
- self.port = args.port if port is None else port
- super(ClientChannel, self).__init__(sock = sock, root_pdu_class = PDU)
- if args.force_version is not None:
- self.version = args.force_version
- self.start_new_pdu()
- if args.sql_database:
- self.setup_sql()
-
- @classmethod
- def ssh(cls, args):
- """
- Set up ssh connection and start listening for first PDU.
- """
- if args.port is None:
- argv = ("ssh", "-s", args.host, "rpki-rtr")
- else:
- argv = ("ssh", "-p", args.port, "-s", args.host, "rpki-rtr")
- logging.debug("[Running ssh: %s]", " ".join(argv))
- s = socket.socketpair()
- return cls(sock = s[1],
- proc = subprocess.Popen(argv, executable = "/usr/bin/ssh",
- stdin = s[0], stdout = s[0], close_fds = True),
- killsig = signal.SIGKILL, args = args)
-
- @classmethod
- def tcp(cls, args):
- """
- Set up TCP connection and start listening for first PDU.
+class ClientChannel(rpki.rtr.channels.PDUChannel):
"""
-
- logging.debug("[Starting raw TCP connection to %s:%s]", args.host, args.port)
- try:
- addrinfo = socket.getaddrinfo(args.host, args.port, socket.AF_UNSPEC, socket.SOCK_STREAM)
- except socket.error, e:
- logging.debug("[socket.getaddrinfo() failed: %s]", e)
- else:
- for ai in addrinfo:
- af, socktype, proto, cn, sa = ai # pylint: disable=W0612
- logging.debug("[Trying addr %s port %s]", sa[0], sa[1])
+ Client protocol engine, handles upcalls from PDUChannel.
+ """
+
+ serial = None
+ nonce = None
+ sql = None
+ host = None
+ port = None
+ cache_id = None
+ refresh = rpki.rtr.pdus.default_refresh
+ retry = rpki.rtr.pdus.default_retry
+ expire = rpki.rtr.pdus.default_expire
+ updated = Timestamp(0)
+
+ def __init__(self, sock, proc, killsig, args, host = None, port = None):
+ self.killsig = killsig
+ self.proc = proc
+ self.args = args
+ self.host = args.host if host is None else host
+ self.port = args.port if port is None else port
+ super(ClientChannel, self).__init__(sock = sock, root_pdu_class = PDU)
+ if args.force_version is not None:
+ self.version = args.force_version
+ self.start_new_pdu()
+ if args.sql_database:
+ self.setup_sql()
+
+ @classmethod
+ def ssh(cls, args):
+ """
+ Set up ssh connection and start listening for first PDU.
+ """
+
+ if args.port is None:
+ argv = ("ssh", "-s", args.host, "rpki-rtr")
+ else:
+ argv = ("ssh", "-p", args.port, "-s", args.host, "rpki-rtr")
+ logging.debug("[Running ssh: %s]", " ".join(argv))
+ s = socket.socketpair()
+ return cls(sock = s[1],
+ proc = subprocess.Popen(argv, executable = "/usr/bin/ssh",
+ stdin = s[0], stdout = s[0], close_fds = True),
+ killsig = signal.SIGKILL, args = args)
+
+ @classmethod
+ def tcp(cls, args):
+ """
+ Set up TCP connection and start listening for first PDU.
+ """
+
+ logging.debug("[Starting raw TCP connection to %s:%s]", args.host, args.port)
try:
- s = socket.socket(af, socktype, proto)
+ addrinfo = socket.getaddrinfo(args.host, args.port, socket.AF_UNSPEC, socket.SOCK_STREAM)
except socket.error, e:
- logging.debug("[socket.socket() failed: %s]", e)
- continue
+ logging.debug("[socket.getaddrinfo() failed: %s]", e)
+ else:
+ for ai in addrinfo:
+ af, socktype, proto, cn, sa = ai # pylint: disable=W0612
+ logging.debug("[Trying addr %s port %s]", sa[0], sa[1])
+ try:
+ s = socket.socket(af, socktype, proto)
+ except socket.error, e:
+ logging.debug("[socket.socket() failed: %s]", e)
+ continue
+ try:
+ s.connect(sa)
+ except socket.error, e:
+ logging.exception("[socket.connect() failed: %s]", e)
+ s.close()
+ continue
+ return cls(sock = s, proc = None, killsig = None, args = args)
+ sys.exit(1)
+
+ @classmethod
+ def loopback(cls, args):
+ """
+ Set up loopback connection and start listening for first PDU.
+ """
+
+ s = socket.socketpair()
+ logging.debug("[Using direct subprocess kludge for testing]")
+ argv = (sys.executable, sys.argv[0], "server")
+ return cls(sock = s[1],
+ proc = subprocess.Popen(argv, stdin = s[0], stdout = s[0], close_fds = True),
+ killsig = signal.SIGINT, args = args,
+ host = args.host or "none", port = args.port or "none")
+
+ @classmethod
+ def tls(cls, args):
+ """
+ Set up TLS connection and start listening for first PDU.
+
+ NB: This uses OpenSSL's "s_client" command, which does not
+ check server certificates properly, so this is not suitable for
+ production use. Fixing this would be a trivial change, it just
+ requires using a client program which does check certificates
+ properly (eg, gnutls-cli, or stunnel's client mode if that works
+ for such purposes this week).
+ """
+
+ argv = ("openssl", "s_client", "-tls1", "-quiet", "-connect", "%s:%s" % (args.host, args.port))
+ logging.debug("[Running: %s]", " ".join(argv))
+ s = socket.socketpair()
+ return cls(sock = s[1],
+ proc = subprocess.Popen(argv, stdin = s[0], stdout = s[0], close_fds = True),
+ killsig = signal.SIGKILL, args = args)
+
+ def setup_sql(self):
+ """
+ Set up an SQLite database to contain the table we receive. If
+ necessary, we will create the database.
+ """
+
+ import sqlite3
+ missing = not os.path.exists(self.args.sql_database)
+ self.sql = sqlite3.connect(self.args.sql_database, detect_types = sqlite3.PARSE_DECLTYPES)
+ self.sql.text_factory = str
+ cur = self.sql.cursor()
+ cur.execute("PRAGMA foreign_keys = on")
+ if missing:
+ cur.execute('''
+ CREATE TABLE cache (
+ cache_id INTEGER PRIMARY KEY NOT NULL,
+ host TEXT NOT NULL,
+ port TEXT NOT NULL,
+ version INTEGER,
+ nonce INTEGER,
+ serial INTEGER,
+ updated INTEGER,
+ refresh INTEGER,
+ retry INTEGER,
+ expire INTEGER,
+ UNIQUE (host, port))''')
+ cur.execute('''
+ CREATE TABLE prefix (
+ cache_id INTEGER NOT NULL
+ REFERENCES cache(cache_id)
+ ON DELETE CASCADE
+ ON UPDATE CASCADE,
+ asn INTEGER NOT NULL,
+ prefix TEXT NOT NULL,
+ prefixlen INTEGER NOT NULL,
+ max_prefixlen INTEGER NOT NULL,
+ UNIQUE (cache_id, asn, prefix, prefixlen, max_prefixlen))''')
+ cur.execute('''
+ CREATE TABLE routerkey (
+ cache_id INTEGER NOT NULL
+ REFERENCES cache(cache_id)
+ ON DELETE CASCADE
+ ON UPDATE CASCADE,
+ asn INTEGER NOT NULL,
+ ski TEXT NOT NULL,
+ key TEXT NOT NULL,
+ UNIQUE (cache_id, asn, ski),
+ UNIQUE (cache_id, asn, key))''')
+ elif self.args.reset_session:
+ cur.execute("DELETE FROM cache WHERE host = ? and port = ?", (self.host, self.port))
+ cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire, updated "
+ "FROM cache WHERE host = ? AND port = ?",
+ (self.host, self.port))
try:
- s.connect(sa)
- except socket.error, e:
- logging.exception("[socket.connect() failed: %s]", e)
- s.close()
- continue
- return cls(sock = s, proc = None, killsig = None, args = args)
- sys.exit(1)
-
- @classmethod
- def loopback(cls, args):
- """
- Set up loopback connection and start listening for first PDU.
- """
-
- s = socket.socketpair()
- logging.debug("[Using direct subprocess kludge for testing]")
- argv = (sys.executable, sys.argv[0], "server")
- return cls(sock = s[1],
- proc = subprocess.Popen(argv, stdin = s[0], stdout = s[0], close_fds = True),
- killsig = signal.SIGINT, args = args,
- host = args.host or "none", port = args.port or "none")
-
- @classmethod
- def tls(cls, args):
- """
- Set up TLS connection and start listening for first PDU.
-
- NB: This uses OpenSSL's "s_client" command, which does not
- check server certificates properly, so this is not suitable for
- production use. Fixing this would be a trivial change, it just
- requires using a client program which does check certificates
- properly (eg, gnutls-cli, or stunnel's client mode if that works
- for such purposes this week).
- """
-
- argv = ("openssl", "s_client", "-tls1", "-quiet", "-connect", "%s:%s" % (args.host, args.port))
- logging.debug("[Running: %s]", " ".join(argv))
- s = socket.socketpair()
- return cls(sock = s[1],
- proc = subprocess.Popen(argv, stdin = s[0], stdout = s[0], close_fds = True),
- killsig = signal.SIGKILL, args = args)
-
- def setup_sql(self):
- """
- Set up an SQLite database to contain the table we receive. If
- necessary, we will create the database.
- """
-
- import sqlite3
- missing = not os.path.exists(self.args.sql_database)
- self.sql = sqlite3.connect(self.args.sql_database, detect_types = sqlite3.PARSE_DECLTYPES)
- self.sql.text_factory = str
- cur = self.sql.cursor()
- cur.execute("PRAGMA foreign_keys = on")
- if missing:
- cur.execute('''
- CREATE TABLE cache (
- cache_id INTEGER PRIMARY KEY NOT NULL,
- host TEXT NOT NULL,
- port TEXT NOT NULL,
- version INTEGER,
- nonce INTEGER,
- serial INTEGER,
- updated INTEGER,
- refresh INTEGER,
- retry INTEGER,
- expire INTEGER,
- UNIQUE (host, port))''')
- cur.execute('''
- CREATE TABLE prefix (
- cache_id INTEGER NOT NULL
- REFERENCES cache(cache_id)
- ON DELETE CASCADE
- ON UPDATE CASCADE,
- asn INTEGER NOT NULL,
- prefix TEXT NOT NULL,
- prefixlen INTEGER NOT NULL,
- max_prefixlen INTEGER NOT NULL,
- UNIQUE (cache_id, asn, prefix, prefixlen, max_prefixlen))''')
- cur.execute('''
- CREATE TABLE routerkey (
- cache_id INTEGER NOT NULL
- REFERENCES cache(cache_id)
- ON DELETE CASCADE
- ON UPDATE CASCADE,
- asn INTEGER NOT NULL,
- ski TEXT NOT NULL,
- key TEXT NOT NULL,
- UNIQUE (cache_id, asn, ski),
- UNIQUE (cache_id, asn, key))''')
- elif self.args.reset_session:
- cur.execute("DELETE FROM cache WHERE host = ? and port = ?", (self.host, self.port))
- cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire, updated "
- "FROM cache WHERE host = ? AND port = ?",
- (self.host, self.port))
- try:
- self.cache_id, version, self.nonce, self.serial, refresh, retry, expire, updated = cur.fetchone()
- if version is not None and self.version is not None and version != self.version:
- cur.execute("DELETE FROM cache WHERE host = ? and port = ?", (self.host, self.port))
- raise TypeError # Simulate lookup failure case
- if version is not None:
- self.version = version
- if refresh is not None:
+ self.cache_id, version, self.nonce, self.serial, refresh, retry, expire, updated = cur.fetchone()
+ if version is not None and self.version is not None and version != self.version:
+ cur.execute("DELETE FROM cache WHERE host = ? and port = ?", (self.host, self.port))
+ raise TypeError # Simulate lookup failure case
+ if version is not None:
+ self.version = version
+ if refresh is not None:
+ self.refresh = refresh
+ if retry is not None:
+ self.retry = retry
+ if expire is not None:
+ self.expire = expire
+ if updated is not None:
+ self.updated = Timestamp(updated)
+ except TypeError:
+ cur.execute("INSERT INTO cache (host, port) VALUES (?, ?)", (self.host, self.port))
+ self.cache_id = cur.lastrowid
+ self.sql.commit()
+ logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s updated %s]",
+ self.cache_id, self.version, self.nonce,
+ self.serial, self.refresh, self.retry, self.expire, self.updated)
+
+ def cache_reset(self):
+ """
+ Handle CacheResetPDU actions.
+ """
+
+ self.serial = None
+ if self.sql:
+ cur = self.sql.cursor()
+ cur.execute("DELETE FROM prefix WHERE cache_id = ?", (self.cache_id,))
+ cur.execute("DELETE FROM routerkey WHERE cache_id = ?", (self.cache_id,))
+ cur.execute("UPDATE cache SET version = ?, serial = NULL WHERE cache_id = ?", (self.version, self.cache_id))
+ self.sql.commit()
+
+ def end_of_data(self, version, serial, nonce, refresh, retry, expire):
+ """
+ Handle EndOfDataPDU actions.
+ """
+
+ assert version == self.version
+ self.serial = serial
+ self.nonce = nonce
self.refresh = refresh
- if retry is not None:
- self.retry = retry
- if expire is not None:
- self.expire = expire
- if updated is not None:
- self.updated = Timestamp(updated)
- except TypeError:
- cur.execute("INSERT INTO cache (host, port) VALUES (?, ?)", (self.host, self.port))
- self.cache_id = cur.lastrowid
- self.sql.commit()
- logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s updated %s]",
- self.cache_id, self.version, self.nonce,
- self.serial, self.refresh, self.retry, self.expire, self.updated)
-
- def cache_reset(self):
- """
- Handle CacheResetPDU actions.
- """
-
- self.serial = None
- if self.sql:
- cur = self.sql.cursor()
- cur.execute("DELETE FROM prefix WHERE cache_id = ?", (self.cache_id,))
- cur.execute("DELETE FROM routerkey WHERE cache_id = ?", (self.cache_id,))
- cur.execute("UPDATE cache SET version = ?, serial = NULL WHERE cache_id = ?", (self.version, self.cache_id))
- self.sql.commit()
-
- def end_of_data(self, version, serial, nonce, refresh, retry, expire):
- """
- Handle EndOfDataPDU actions.
- """
-
- assert version == self.version
- self.serial = serial
- self.nonce = nonce
- self.refresh = refresh
- self.retry = retry
- self.expire = expire
- self.updated = Timestamp.now()
- if self.sql:
- self.sql.execute("UPDATE cache SET"
- " version = ?, serial = ?, nonce = ?,"
- " refresh = ?, retry = ?, expire = ?,"
- " updated = ? "
- "WHERE cache_id = ?",
- (version, serial, nonce, refresh, retry, expire, int(self.updated), self.cache_id))
- self.sql.commit()
-
- def consume_prefix(self, prefix):
- """
- Handle one prefix PDU.
- """
-
- if self.sql:
- values = (self.cache_id, prefix.asn, str(prefix.prefix), prefix.prefixlen, prefix.max_prefixlen)
- if prefix.announce:
- self.sql.execute("INSERT INTO prefix (cache_id, asn, prefix, prefixlen, max_prefixlen) "
- "VALUES (?, ?, ?, ?, ?)",
- values)
- else:
- self.sql.execute("DELETE FROM prefix "
- "WHERE cache_id = ? AND asn = ? AND prefix = ? AND prefixlen = ? AND max_prefixlen = ?",
- values)
-
- def consume_routerkey(self, routerkey):
- """
- Handle one Router Key PDU.
- """
-
- if self.sql:
- values = (self.cache_id, routerkey.asn,
- base64.urlsafe_b64encode(routerkey.ski).rstrip("="),
- base64.b64encode(routerkey.key))
- if routerkey.announce:
- self.sql.execute("INSERT INTO routerkey (cache_id, asn, ski, key) "
- "VALUES (?, ?, ?, ?)",
- values)
- else:
- self.sql.execute("DELETE FROM routerkey "
- "WHERE cache_id = ? AND asn = ? AND (ski = ? OR key = ?)",
- values)
-
- def deliver_pdu(self, pdu):
- """
- Handle received PDU.
- """
-
- pdu.consume(self)
-
- def push_pdu(self, pdu):
- """
- Log outbound PDU then write it to stream.
- """
-
- logging.debug(pdu)
- super(ClientChannel, self).push_pdu(pdu)
-
- def cleanup(self):
- """
- Force clean up this client's child process. If everything goes
- well, child will have exited already before this method is called,
- but we may need to whack it with a stick if something breaks.
- """
-
- if self.proc is not None and self.proc.returncode is None:
- try:
- os.kill(self.proc.pid, self.killsig)
- except OSError:
- pass
-
- def handle_close(self):
- """
- Intercept close event so we can log it, then shut down.
- """
-
- logging.debug("Server closed channel")
- super(ClientChannel, self).handle_close()
+ self.retry = retry
+ self.expire = expire
+ self.updated = Timestamp.now()
+ if self.sql:
+ self.sql.execute("UPDATE cache SET"
+ " version = ?, serial = ?, nonce = ?,"
+ " refresh = ?, retry = ?, expire = ?,"
+ " updated = ? "
+ "WHERE cache_id = ?",
+ (version, serial, nonce, refresh, retry, expire, int(self.updated), self.cache_id))
+ self.sql.commit()
+
+ def consume_prefix(self, prefix):
+ """
+ Handle one prefix PDU.
+ """
+
+ if self.sql:
+ values = (self.cache_id, prefix.asn, str(prefix.prefix), prefix.prefixlen, prefix.max_prefixlen)
+ if prefix.announce:
+ self.sql.execute("INSERT INTO prefix (cache_id, asn, prefix, prefixlen, max_prefixlen) "
+ "VALUES (?, ?, ?, ?, ?)",
+ values)
+ else:
+ self.sql.execute("DELETE FROM prefix "
+ "WHERE cache_id = ? AND asn = ? AND prefix = ? AND prefixlen = ? AND max_prefixlen = ?",
+ values)
+
+ def consume_routerkey(self, routerkey):
+ """
+ Handle one Router Key PDU.
+ """
+
+ if self.sql:
+ values = (self.cache_id, routerkey.asn,
+ base64.urlsafe_b64encode(routerkey.ski).rstrip("="),
+ base64.b64encode(routerkey.key))
+ if routerkey.announce:
+ self.sql.execute("INSERT INTO routerkey (cache_id, asn, ski, key) "
+ "VALUES (?, ?, ?, ?)",
+ values)
+ else:
+ self.sql.execute("DELETE FROM routerkey "
+ "WHERE cache_id = ? AND asn = ? AND (ski = ? OR key = ?)",
+ values)
+
+ def deliver_pdu(self, pdu):
+ """
+ Handle received PDU.
+ """
+
+ pdu.consume(self)
+
+ def push_pdu(self, pdu):
+ """
+ Log outbound PDU then write it to stream.
+ """
+
+ logging.debug(pdu)
+ super(ClientChannel, self).push_pdu(pdu)
+
+ def cleanup(self):
+ """
+ Force clean up this client's child process. If everything goes
+ well, child will have exited already before this method is called,
+ but we may need to whack it with a stick if something breaks.
+ """
+
+ if self.proc is not None and self.proc.returncode is None:
+ try:
+ os.kill(self.proc.pid, self.killsig)
+ except OSError:
+ pass
+
+ def handle_close(self):
+ """
+ Intercept close event so we can log it, then shut down.
+ """
+
+ logging.debug("Server closed channel")
+ super(ClientChannel, self).handle_close()
# Hack to let us subclass this from scripts without needing to rewrite client_main().
@@ -460,73 +460,73 @@ class ClientChannel(rpki.rtr.channels.PDUChannel):
ClientChannelClass = ClientChannel
def client_main(args):
- """
- Test client, intended primarily for debugging.
- """
+ """
+ Test client, intended primarily for debugging.
+ """
- logging.debug("[Startup]")
+ logging.debug("[Startup]")
- assert issubclass(ClientChannelClass, ClientChannel)
- constructor = getattr(ClientChannelClass, args.protocol)
+ assert issubclass(ClientChannelClass, ClientChannel)
+ constructor = getattr(ClientChannelClass, args.protocol)
- client = None
- try:
- client = constructor(args)
+ client = None
+ try:
+ client = constructor(args)
- polled = client.updated
- wakeup = None
+ polled = client.updated
+ wakeup = None
- while True:
+ while True:
- now = Timestamp.now()
+ now = Timestamp.now()
- if client.serial is not None and now > client.updated + client.expire:
- logging.info("[Expiring client data: serial %s, last updated %s, expire %s]",
- client.serial, client.updated, client.expire)
- client.cache_reset()
+ if client.serial is not None and now > client.updated + client.expire:
+ logging.info("[Expiring client data: serial %s, last updated %s, expire %s]",
+ client.serial, client.updated, client.expire)
+ client.cache_reset()
- if client.serial is None or client.nonce is None:
- polled = now
- client.push_pdu(ResetQueryPDU(version = client.version))
+ if client.serial is None or client.nonce is None:
+ polled = now
+ client.push_pdu(ResetQueryPDU(version = client.version))
- elif now >= client.updated + client.refresh:
- polled = now
- client.push_pdu(SerialQueryPDU(version = client.version,
- serial = client.serial,
- nonce = client.nonce))
+ elif now >= client.updated + client.refresh:
+ polled = now
+ client.push_pdu(SerialQueryPDU(version = client.version,
+ serial = client.serial,
+ nonce = client.nonce))
- remaining = 1
+ remaining = 1
- while remaining > 0:
- now = Timestamp.now()
- timer = client.retry if (now >= client.updated + client.refresh) else client.refresh
- wokeup = wakeup
- wakeup = max(now, Timestamp(max(polled, client.updated) + timer))
- remaining = wakeup - now
- if wakeup != wokeup:
- logging.info("[Last client poll %s, next %s]", polled, wakeup)
- asyncore.loop(timeout = remaining, count = 1)
+ while remaining > 0:
+ now = Timestamp.now()
+ timer = client.retry if (now >= client.updated + client.refresh) else client.refresh
+ wokeup = wakeup
+ wakeup = max(now, Timestamp(max(polled, client.updated) + timer))
+ remaining = wakeup - now
+ if wakeup != wokeup:
+ logging.info("[Last client poll %s, next %s]", polled, wakeup)
+ asyncore.loop(timeout = remaining, count = 1)
- except KeyboardInterrupt:
- sys.exit(0)
+ except KeyboardInterrupt:
+ sys.exit(0)
- finally:
- if client is not None:
- client.cleanup()
+ finally:
+ if client is not None:
+ client.cleanup()
def argparse_setup(subparsers):
- """
- Set up argparse stuff for commands in this module.
- """
-
- subparser = subparsers.add_parser("client", description = client_main.__doc__,
- help = "Test client for RPKI-RTR protocol")
- subparser.set_defaults(func = client_main, default_log_to = "stderr")
- subparser.add_argument("--sql-database", help = "filename for sqlite3 database of client state")
- subparser.add_argument("--force-version", type = int, choices = PDU.version_map, help = "force specific protocol version")
- subparser.add_argument("--reset-session", action = "store_true", help = "reset any existing session found in sqlite3 database")
- subparser.add_argument("protocol", choices = ("loopback", "tcp", "ssh", "tls"), help = "connection protocol")
- subparser.add_argument("host", nargs = "?", help = "server host")
- subparser.add_argument("port", nargs = "?", help = "server port")
- return subparser
+ """
+ Set up argparse stuff for commands in this module.
+ """
+
+ subparser = subparsers.add_parser("client", description = client_main.__doc__,
+ help = "Test client for RPKI-RTR protocol")
+ subparser.set_defaults(func = client_main, default_log_to = "stderr")
+ subparser.add_argument("--sql-database", help = "filename for sqlite3 database of client state")
+ subparser.add_argument("--force-version", type = int, choices = PDU.version_map, help = "force specific protocol version")
+ subparser.add_argument("--reset-session", action = "store_true", help = "reset any existing session found in sqlite3 database")
+ subparser.add_argument("protocol", choices = ("loopback", "tcp", "ssh", "tls"), help = "connection protocol")
+ subparser.add_argument("host", nargs = "?", help = "server host")
+ subparser.add_argument("port", nargs = "?", help = "server port")
+ return subparser