diff options
author | Rob Austein <sra@hactrn.net> | 2015-10-26 06:29:00 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2015-10-26 06:29:00 +0000 |
commit | b46deb1417dc3596e9ac9fe2fe8cc0b7f42457e7 (patch) | |
tree | ca0dc0276d1adc168bc3337ce0564c4ec4957c1b /rpki/rtr/client.py | |
parent | 397beaf6d9900dc3b3cb612c89ebf1d57b1d16f6 (diff) |
"Any programmer who fails to comply with the standard naming, formatting,
or commenting conventions should be shot. If it so happens that it is
inconvenient to shoot him, then he is to be politely requested to recode
his program in adherence to the above standard."
-- Michael Spier, Digital Equipment Corporation
svn path=/branches/tk705/; revision=6152
Diffstat (limited to 'rpki/rtr/client.py')
-rw-r--r-- | rpki/rtr/client.py | 816 |
1 files changed, 408 insertions, 408 deletions
diff --git a/rpki/rtr/client.py b/rpki/rtr/client.py index a35ab81d..9c7a00d6 100644 --- a/rpki/rtr/client.py +++ b/rpki/rtr/client.py @@ -37,13 +37,13 @@ from rpki.rtr.channels import Timestamp class PDU(rpki.rtr.pdus.PDU): - def consume(self, client): - """ - Handle results in test client. Default behavior is just to print - out the PDU; data PDU subclasses may override this. - """ + def consume(self, client): + """ + Handle results in test client. Default behavior is just to print + out the PDU; data PDU subclasses may override this. + """ - logging.debug(self) + logging.debug(self) clone_pdu = rpki.rtr.pdus.clone_pdu_root(PDU) @@ -52,407 +52,407 @@ clone_pdu = rpki.rtr.pdus.clone_pdu_root(PDU) @clone_pdu class SerialNotifyPDU(rpki.rtr.pdus.SerialNotifyPDU): - def consume(self, client): - """ - Respond to a SerialNotifyPDU with either a SerialQueryPDU or a - ResetQueryPDU, depending on what we already know. - """ + def consume(self, client): + """ + Respond to a SerialNotifyPDU with either a SerialQueryPDU or a + ResetQueryPDU, depending on what we already know. + """ - logging.debug(self) - if client.serial is None or client.nonce != self.nonce: - client.push_pdu(ResetQueryPDU(version = client.version)) - elif self.serial != client.serial: - client.push_pdu(SerialQueryPDU(version = client.version, - serial = client.serial, - nonce = client.nonce)) - else: - logging.debug("[Notify did not change serial number, ignoring]") + logging.debug(self) + if client.serial is None or client.nonce != self.nonce: + client.push_pdu(ResetQueryPDU(version = client.version)) + elif self.serial != client.serial: + client.push_pdu(SerialQueryPDU(version = client.version, + serial = client.serial, + nonce = client.nonce)) + else: + logging.debug("[Notify did not change serial number, ignoring]") @clone_pdu class CacheResponsePDU(rpki.rtr.pdus.CacheResponsePDU): - def consume(self, client): - """ - Handle CacheResponsePDU. - """ + def consume(self, client): + """ + Handle CacheResponsePDU. + """ - logging.debug(self) - if self.nonce != client.nonce: - logging.debug("[Nonce changed, resetting]") - client.cache_reset() + logging.debug(self) + if self.nonce != client.nonce: + logging.debug("[Nonce changed, resetting]") + client.cache_reset() @clone_pdu class EndOfDataPDUv0(rpki.rtr.pdus.EndOfDataPDUv0): - def consume(self, client): - """ - Handle EndOfDataPDU response. - """ + def consume(self, client): + """ + Handle EndOfDataPDU response. + """ - logging.debug(self) - client.end_of_data(self.version, self.serial, self.nonce, self.refresh, self.retry, self.expire) + logging.debug(self) + client.end_of_data(self.version, self.serial, self.nonce, self.refresh, self.retry, self.expire) @clone_pdu class EndOfDataPDUv1(rpki.rtr.pdus.EndOfDataPDUv1): - def consume(self, client): - """ - Handle EndOfDataPDU response. - """ + def consume(self, client): + """ + Handle EndOfDataPDU response. + """ - logging.debug(self) - client.end_of_data(self.version, self.serial, self.nonce, self.refresh, self.retry, self.expire) + logging.debug(self) + client.end_of_data(self.version, self.serial, self.nonce, self.refresh, self.retry, self.expire) @clone_pdu class CacheResetPDU(rpki.rtr.pdus.CacheResetPDU): - def consume(self, client): - """ - Handle CacheResetPDU response, by issuing a ResetQueryPDU. - """ + def consume(self, client): + """ + Handle CacheResetPDU response, by issuing a ResetQueryPDU. + """ - logging.debug(self) - client.cache_reset() - client.push_pdu(ResetQueryPDU(version = client.version)) + logging.debug(self) + client.cache_reset() + client.push_pdu(ResetQueryPDU(version = client.version)) class PrefixPDU(rpki.rtr.pdus.PrefixPDU): - """ - Object representing one prefix. This corresponds closely to one PDU - in the rpki-router protocol, so closely that we use lexical ordering - of the wire format of the PDU as the ordering for this class. - - This is a virtual class, but the .from_text() constructor - instantiates the correct concrete subclass (IPv4PrefixPDU or - IPv6PrefixPDU) depending on the syntax of its input text. - """ - - def consume(self, client): """ - Handle one incoming prefix PDU + Object representing one prefix. This corresponds closely to one PDU + in the rpki-router protocol, so closely that we use lexical ordering + of the wire format of the PDU as the ordering for this class. + + This is a virtual class, but the .from_text() constructor + instantiates the correct concrete subclass (IPv4PrefixPDU or + IPv6PrefixPDU) depending on the syntax of its input text. """ - logging.debug(self) - client.consume_prefix(self) + def consume(self, client): + """ + Handle one incoming prefix PDU + """ + + logging.debug(self) + client.consume_prefix(self) @clone_pdu class IPv4PrefixPDU(PrefixPDU, rpki.rtr.pdus.IPv4PrefixPDU): - pass + pass @clone_pdu class IPv6PrefixPDU(PrefixPDU, rpki.rtr.pdus.IPv6PrefixPDU): - pass + pass @clone_pdu class ErrorReportPDU(PDU, rpki.rtr.pdus.ErrorReportPDU): - pass + pass @clone_pdu class RouterKeyPDU(rpki.rtr.pdus.RouterKeyPDU): - """ - Router Key PDU. - """ - - def consume(self, client): """ - Handle one incoming Router Key PDU + Router Key PDU. """ - logging.debug(self) - client.consume_routerkey(self) + def consume(self, client): + """ + Handle one incoming Router Key PDU + """ + logging.debug(self) + client.consume_routerkey(self) -class ClientChannel(rpki.rtr.channels.PDUChannel): - """ - Client protocol engine, handles upcalls from PDUChannel. - """ - - serial = None - nonce = None - sql = None - host = None - port = None - cache_id = None - refresh = rpki.rtr.pdus.default_refresh - retry = rpki.rtr.pdus.default_retry - expire = rpki.rtr.pdus.default_expire - updated = Timestamp(0) - - def __init__(self, sock, proc, killsig, args, host = None, port = None): - self.killsig = killsig - self.proc = proc - self.args = args - self.host = args.host if host is None else host - self.port = args.port if port is None else port - super(ClientChannel, self).__init__(sock = sock, root_pdu_class = PDU) - if args.force_version is not None: - self.version = args.force_version - self.start_new_pdu() - if args.sql_database: - self.setup_sql() - - @classmethod - def ssh(cls, args): - """ - Set up ssh connection and start listening for first PDU. - """ - if args.port is None: - argv = ("ssh", "-s", args.host, "rpki-rtr") - else: - argv = ("ssh", "-p", args.port, "-s", args.host, "rpki-rtr") - logging.debug("[Running ssh: %s]", " ".join(argv)) - s = socket.socketpair() - return cls(sock = s[1], - proc = subprocess.Popen(argv, executable = "/usr/bin/ssh", - stdin = s[0], stdout = s[0], close_fds = True), - killsig = signal.SIGKILL, args = args) - - @classmethod - def tcp(cls, args): - """ - Set up TCP connection and start listening for first PDU. +class ClientChannel(rpki.rtr.channels.PDUChannel): """ - - logging.debug("[Starting raw TCP connection to %s:%s]", args.host, args.port) - try: - addrinfo = socket.getaddrinfo(args.host, args.port, socket.AF_UNSPEC, socket.SOCK_STREAM) - except socket.error, e: - logging.debug("[socket.getaddrinfo() failed: %s]", e) - else: - for ai in addrinfo: - af, socktype, proto, cn, sa = ai # pylint: disable=W0612 - logging.debug("[Trying addr %s port %s]", sa[0], sa[1]) + Client protocol engine, handles upcalls from PDUChannel. + """ + + serial = None + nonce = None + sql = None + host = None + port = None + cache_id = None + refresh = rpki.rtr.pdus.default_refresh + retry = rpki.rtr.pdus.default_retry + expire = rpki.rtr.pdus.default_expire + updated = Timestamp(0) + + def __init__(self, sock, proc, killsig, args, host = None, port = None): + self.killsig = killsig + self.proc = proc + self.args = args + self.host = args.host if host is None else host + self.port = args.port if port is None else port + super(ClientChannel, self).__init__(sock = sock, root_pdu_class = PDU) + if args.force_version is not None: + self.version = args.force_version + self.start_new_pdu() + if args.sql_database: + self.setup_sql() + + @classmethod + def ssh(cls, args): + """ + Set up ssh connection and start listening for first PDU. + """ + + if args.port is None: + argv = ("ssh", "-s", args.host, "rpki-rtr") + else: + argv = ("ssh", "-p", args.port, "-s", args.host, "rpki-rtr") + logging.debug("[Running ssh: %s]", " ".join(argv)) + s = socket.socketpair() + return cls(sock = s[1], + proc = subprocess.Popen(argv, executable = "/usr/bin/ssh", + stdin = s[0], stdout = s[0], close_fds = True), + killsig = signal.SIGKILL, args = args) + + @classmethod + def tcp(cls, args): + """ + Set up TCP connection and start listening for first PDU. + """ + + logging.debug("[Starting raw TCP connection to %s:%s]", args.host, args.port) try: - s = socket.socket(af, socktype, proto) + addrinfo = socket.getaddrinfo(args.host, args.port, socket.AF_UNSPEC, socket.SOCK_STREAM) except socket.error, e: - logging.debug("[socket.socket() failed: %s]", e) - continue + logging.debug("[socket.getaddrinfo() failed: %s]", e) + else: + for ai in addrinfo: + af, socktype, proto, cn, sa = ai # pylint: disable=W0612 + logging.debug("[Trying addr %s port %s]", sa[0], sa[1]) + try: + s = socket.socket(af, socktype, proto) + except socket.error, e: + logging.debug("[socket.socket() failed: %s]", e) + continue + try: + s.connect(sa) + except socket.error, e: + logging.exception("[socket.connect() failed: %s]", e) + s.close() + continue + return cls(sock = s, proc = None, killsig = None, args = args) + sys.exit(1) + + @classmethod + def loopback(cls, args): + """ + Set up loopback connection and start listening for first PDU. + """ + + s = socket.socketpair() + logging.debug("[Using direct subprocess kludge for testing]") + argv = (sys.executable, sys.argv[0], "server") + return cls(sock = s[1], + proc = subprocess.Popen(argv, stdin = s[0], stdout = s[0], close_fds = True), + killsig = signal.SIGINT, args = args, + host = args.host or "none", port = args.port or "none") + + @classmethod + def tls(cls, args): + """ + Set up TLS connection and start listening for first PDU. + + NB: This uses OpenSSL's "s_client" command, which does not + check server certificates properly, so this is not suitable for + production use. Fixing this would be a trivial change, it just + requires using a client program which does check certificates + properly (eg, gnutls-cli, or stunnel's client mode if that works + for such purposes this week). + """ + + argv = ("openssl", "s_client", "-tls1", "-quiet", "-connect", "%s:%s" % (args.host, args.port)) + logging.debug("[Running: %s]", " ".join(argv)) + s = socket.socketpair() + return cls(sock = s[1], + proc = subprocess.Popen(argv, stdin = s[0], stdout = s[0], close_fds = True), + killsig = signal.SIGKILL, args = args) + + def setup_sql(self): + """ + Set up an SQLite database to contain the table we receive. If + necessary, we will create the database. + """ + + import sqlite3 + missing = not os.path.exists(self.args.sql_database) + self.sql = sqlite3.connect(self.args.sql_database, detect_types = sqlite3.PARSE_DECLTYPES) + self.sql.text_factory = str + cur = self.sql.cursor() + cur.execute("PRAGMA foreign_keys = on") + if missing: + cur.execute(''' + CREATE TABLE cache ( + cache_id INTEGER PRIMARY KEY NOT NULL, + host TEXT NOT NULL, + port TEXT NOT NULL, + version INTEGER, + nonce INTEGER, + serial INTEGER, + updated INTEGER, + refresh INTEGER, + retry INTEGER, + expire INTEGER, + UNIQUE (host, port))''') + cur.execute(''' + CREATE TABLE prefix ( + cache_id INTEGER NOT NULL + REFERENCES cache(cache_id) + ON DELETE CASCADE + ON UPDATE CASCADE, + asn INTEGER NOT NULL, + prefix TEXT NOT NULL, + prefixlen INTEGER NOT NULL, + max_prefixlen INTEGER NOT NULL, + UNIQUE (cache_id, asn, prefix, prefixlen, max_prefixlen))''') + cur.execute(''' + CREATE TABLE routerkey ( + cache_id INTEGER NOT NULL + REFERENCES cache(cache_id) + ON DELETE CASCADE + ON UPDATE CASCADE, + asn INTEGER NOT NULL, + ski TEXT NOT NULL, + key TEXT NOT NULL, + UNIQUE (cache_id, asn, ski), + UNIQUE (cache_id, asn, key))''') + elif self.args.reset_session: + cur.execute("DELETE FROM cache WHERE host = ? and port = ?", (self.host, self.port)) + cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire, updated " + "FROM cache WHERE host = ? AND port = ?", + (self.host, self.port)) try: - s.connect(sa) - except socket.error, e: - logging.exception("[socket.connect() failed: %s]", e) - s.close() - continue - return cls(sock = s, proc = None, killsig = None, args = args) - sys.exit(1) - - @classmethod - def loopback(cls, args): - """ - Set up loopback connection and start listening for first PDU. - """ - - s = socket.socketpair() - logging.debug("[Using direct subprocess kludge for testing]") - argv = (sys.executable, sys.argv[0], "server") - return cls(sock = s[1], - proc = subprocess.Popen(argv, stdin = s[0], stdout = s[0], close_fds = True), - killsig = signal.SIGINT, args = args, - host = args.host or "none", port = args.port or "none") - - @classmethod - def tls(cls, args): - """ - Set up TLS connection and start listening for first PDU. - - NB: This uses OpenSSL's "s_client" command, which does not - check server certificates properly, so this is not suitable for - production use. Fixing this would be a trivial change, it just - requires using a client program which does check certificates - properly (eg, gnutls-cli, or stunnel's client mode if that works - for such purposes this week). - """ - - argv = ("openssl", "s_client", "-tls1", "-quiet", "-connect", "%s:%s" % (args.host, args.port)) - logging.debug("[Running: %s]", " ".join(argv)) - s = socket.socketpair() - return cls(sock = s[1], - proc = subprocess.Popen(argv, stdin = s[0], stdout = s[0], close_fds = True), - killsig = signal.SIGKILL, args = args) - - def setup_sql(self): - """ - Set up an SQLite database to contain the table we receive. If - necessary, we will create the database. - """ - - import sqlite3 - missing = not os.path.exists(self.args.sql_database) - self.sql = sqlite3.connect(self.args.sql_database, detect_types = sqlite3.PARSE_DECLTYPES) - self.sql.text_factory = str - cur = self.sql.cursor() - cur.execute("PRAGMA foreign_keys = on") - if missing: - cur.execute(''' - CREATE TABLE cache ( - cache_id INTEGER PRIMARY KEY NOT NULL, - host TEXT NOT NULL, - port TEXT NOT NULL, - version INTEGER, - nonce INTEGER, - serial INTEGER, - updated INTEGER, - refresh INTEGER, - retry INTEGER, - expire INTEGER, - UNIQUE (host, port))''') - cur.execute(''' - CREATE TABLE prefix ( - cache_id INTEGER NOT NULL - REFERENCES cache(cache_id) - ON DELETE CASCADE - ON UPDATE CASCADE, - asn INTEGER NOT NULL, - prefix TEXT NOT NULL, - prefixlen INTEGER NOT NULL, - max_prefixlen INTEGER NOT NULL, - UNIQUE (cache_id, asn, prefix, prefixlen, max_prefixlen))''') - cur.execute(''' - CREATE TABLE routerkey ( - cache_id INTEGER NOT NULL - REFERENCES cache(cache_id) - ON DELETE CASCADE - ON UPDATE CASCADE, - asn INTEGER NOT NULL, - ski TEXT NOT NULL, - key TEXT NOT NULL, - UNIQUE (cache_id, asn, ski), - UNIQUE (cache_id, asn, key))''') - elif self.args.reset_session: - cur.execute("DELETE FROM cache WHERE host = ? and port = ?", (self.host, self.port)) - cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire, updated " - "FROM cache WHERE host = ? AND port = ?", - (self.host, self.port)) - try: - self.cache_id, version, self.nonce, self.serial, refresh, retry, expire, updated = cur.fetchone() - if version is not None and self.version is not None and version != self.version: - cur.execute("DELETE FROM cache WHERE host = ? and port = ?", (self.host, self.port)) - raise TypeError # Simulate lookup failure case - if version is not None: - self.version = version - if refresh is not None: + self.cache_id, version, self.nonce, self.serial, refresh, retry, expire, updated = cur.fetchone() + if version is not None and self.version is not None and version != self.version: + cur.execute("DELETE FROM cache WHERE host = ? and port = ?", (self.host, self.port)) + raise TypeError # Simulate lookup failure case + if version is not None: + self.version = version + if refresh is not None: + self.refresh = refresh + if retry is not None: + self.retry = retry + if expire is not None: + self.expire = expire + if updated is not None: + self.updated = Timestamp(updated) + except TypeError: + cur.execute("INSERT INTO cache (host, port) VALUES (?, ?)", (self.host, self.port)) + self.cache_id = cur.lastrowid + self.sql.commit() + logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s updated %s]", + self.cache_id, self.version, self.nonce, + self.serial, self.refresh, self.retry, self.expire, self.updated) + + def cache_reset(self): + """ + Handle CacheResetPDU actions. + """ + + self.serial = None + if self.sql: + cur = self.sql.cursor() + cur.execute("DELETE FROM prefix WHERE cache_id = ?", (self.cache_id,)) + cur.execute("DELETE FROM routerkey WHERE cache_id = ?", (self.cache_id,)) + cur.execute("UPDATE cache SET version = ?, serial = NULL WHERE cache_id = ?", (self.version, self.cache_id)) + self.sql.commit() + + def end_of_data(self, version, serial, nonce, refresh, retry, expire): + """ + Handle EndOfDataPDU actions. + """ + + assert version == self.version + self.serial = serial + self.nonce = nonce self.refresh = refresh - if retry is not None: - self.retry = retry - if expire is not None: - self.expire = expire - if updated is not None: - self.updated = Timestamp(updated) - except TypeError: - cur.execute("INSERT INTO cache (host, port) VALUES (?, ?)", (self.host, self.port)) - self.cache_id = cur.lastrowid - self.sql.commit() - logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s updated %s]", - self.cache_id, self.version, self.nonce, - self.serial, self.refresh, self.retry, self.expire, self.updated) - - def cache_reset(self): - """ - Handle CacheResetPDU actions. - """ - - self.serial = None - if self.sql: - cur = self.sql.cursor() - cur.execute("DELETE FROM prefix WHERE cache_id = ?", (self.cache_id,)) - cur.execute("DELETE FROM routerkey WHERE cache_id = ?", (self.cache_id,)) - cur.execute("UPDATE cache SET version = ?, serial = NULL WHERE cache_id = ?", (self.version, self.cache_id)) - self.sql.commit() - - def end_of_data(self, version, serial, nonce, refresh, retry, expire): - """ - Handle EndOfDataPDU actions. - """ - - assert version == self.version - self.serial = serial - self.nonce = nonce - self.refresh = refresh - self.retry = retry - self.expire = expire - self.updated = Timestamp.now() - if self.sql: - self.sql.execute("UPDATE cache SET" - " version = ?, serial = ?, nonce = ?," - " refresh = ?, retry = ?, expire = ?," - " updated = ? " - "WHERE cache_id = ?", - (version, serial, nonce, refresh, retry, expire, int(self.updated), self.cache_id)) - self.sql.commit() - - def consume_prefix(self, prefix): - """ - Handle one prefix PDU. - """ - - if self.sql: - values = (self.cache_id, prefix.asn, str(prefix.prefix), prefix.prefixlen, prefix.max_prefixlen) - if prefix.announce: - self.sql.execute("INSERT INTO prefix (cache_id, asn, prefix, prefixlen, max_prefixlen) " - "VALUES (?, ?, ?, ?, ?)", - values) - else: - self.sql.execute("DELETE FROM prefix " - "WHERE cache_id = ? AND asn = ? AND prefix = ? AND prefixlen = ? AND max_prefixlen = ?", - values) - - def consume_routerkey(self, routerkey): - """ - Handle one Router Key PDU. - """ - - if self.sql: - values = (self.cache_id, routerkey.asn, - base64.urlsafe_b64encode(routerkey.ski).rstrip("="), - base64.b64encode(routerkey.key)) - if routerkey.announce: - self.sql.execute("INSERT INTO routerkey (cache_id, asn, ski, key) " - "VALUES (?, ?, ?, ?)", - values) - else: - self.sql.execute("DELETE FROM routerkey " - "WHERE cache_id = ? AND asn = ? AND (ski = ? OR key = ?)", - values) - - def deliver_pdu(self, pdu): - """ - Handle received PDU. - """ - - pdu.consume(self) - - def push_pdu(self, pdu): - """ - Log outbound PDU then write it to stream. - """ - - logging.debug(pdu) - super(ClientChannel, self).push_pdu(pdu) - - def cleanup(self): - """ - Force clean up this client's child process. If everything goes - well, child will have exited already before this method is called, - but we may need to whack it with a stick if something breaks. - """ - - if self.proc is not None and self.proc.returncode is None: - try: - os.kill(self.proc.pid, self.killsig) - except OSError: - pass - - def handle_close(self): - """ - Intercept close event so we can log it, then shut down. - """ - - logging.debug("Server closed channel") - super(ClientChannel, self).handle_close() + self.retry = retry + self.expire = expire + self.updated = Timestamp.now() + if self.sql: + self.sql.execute("UPDATE cache SET" + " version = ?, serial = ?, nonce = ?," + " refresh = ?, retry = ?, expire = ?," + " updated = ? " + "WHERE cache_id = ?", + (version, serial, nonce, refresh, retry, expire, int(self.updated), self.cache_id)) + self.sql.commit() + + def consume_prefix(self, prefix): + """ + Handle one prefix PDU. + """ + + if self.sql: + values = (self.cache_id, prefix.asn, str(prefix.prefix), prefix.prefixlen, prefix.max_prefixlen) + if prefix.announce: + self.sql.execute("INSERT INTO prefix (cache_id, asn, prefix, prefixlen, max_prefixlen) " + "VALUES (?, ?, ?, ?, ?)", + values) + else: + self.sql.execute("DELETE FROM prefix " + "WHERE cache_id = ? AND asn = ? AND prefix = ? AND prefixlen = ? AND max_prefixlen = ?", + values) + + def consume_routerkey(self, routerkey): + """ + Handle one Router Key PDU. + """ + + if self.sql: + values = (self.cache_id, routerkey.asn, + base64.urlsafe_b64encode(routerkey.ski).rstrip("="), + base64.b64encode(routerkey.key)) + if routerkey.announce: + self.sql.execute("INSERT INTO routerkey (cache_id, asn, ski, key) " + "VALUES (?, ?, ?, ?)", + values) + else: + self.sql.execute("DELETE FROM routerkey " + "WHERE cache_id = ? AND asn = ? AND (ski = ? OR key = ?)", + values) + + def deliver_pdu(self, pdu): + """ + Handle received PDU. + """ + + pdu.consume(self) + + def push_pdu(self, pdu): + """ + Log outbound PDU then write it to stream. + """ + + logging.debug(pdu) + super(ClientChannel, self).push_pdu(pdu) + + def cleanup(self): + """ + Force clean up this client's child process. If everything goes + well, child will have exited already before this method is called, + but we may need to whack it with a stick if something breaks. + """ + + if self.proc is not None and self.proc.returncode is None: + try: + os.kill(self.proc.pid, self.killsig) + except OSError: + pass + + def handle_close(self): + """ + Intercept close event so we can log it, then shut down. + """ + + logging.debug("Server closed channel") + super(ClientChannel, self).handle_close() # Hack to let us subclass this from scripts without needing to rewrite client_main(). @@ -460,73 +460,73 @@ class ClientChannel(rpki.rtr.channels.PDUChannel): ClientChannelClass = ClientChannel def client_main(args): - """ - Test client, intended primarily for debugging. - """ + """ + Test client, intended primarily for debugging. + """ - logging.debug("[Startup]") + logging.debug("[Startup]") - assert issubclass(ClientChannelClass, ClientChannel) - constructor = getattr(ClientChannelClass, args.protocol) + assert issubclass(ClientChannelClass, ClientChannel) + constructor = getattr(ClientChannelClass, args.protocol) - client = None - try: - client = constructor(args) + client = None + try: + client = constructor(args) - polled = client.updated - wakeup = None + polled = client.updated + wakeup = None - while True: + while True: - now = Timestamp.now() + now = Timestamp.now() - if client.serial is not None and now > client.updated + client.expire: - logging.info("[Expiring client data: serial %s, last updated %s, expire %s]", - client.serial, client.updated, client.expire) - client.cache_reset() + if client.serial is not None and now > client.updated + client.expire: + logging.info("[Expiring client data: serial %s, last updated %s, expire %s]", + client.serial, client.updated, client.expire) + client.cache_reset() - if client.serial is None or client.nonce is None: - polled = now - client.push_pdu(ResetQueryPDU(version = client.version)) + if client.serial is None or client.nonce is None: + polled = now + client.push_pdu(ResetQueryPDU(version = client.version)) - elif now >= client.updated + client.refresh: - polled = now - client.push_pdu(SerialQueryPDU(version = client.version, - serial = client.serial, - nonce = client.nonce)) + elif now >= client.updated + client.refresh: + polled = now + client.push_pdu(SerialQueryPDU(version = client.version, + serial = client.serial, + nonce = client.nonce)) - remaining = 1 + remaining = 1 - while remaining > 0: - now = Timestamp.now() - timer = client.retry if (now >= client.updated + client.refresh) else client.refresh - wokeup = wakeup - wakeup = max(now, Timestamp(max(polled, client.updated) + timer)) - remaining = wakeup - now - if wakeup != wokeup: - logging.info("[Last client poll %s, next %s]", polled, wakeup) - asyncore.loop(timeout = remaining, count = 1) + while remaining > 0: + now = Timestamp.now() + timer = client.retry if (now >= client.updated + client.refresh) else client.refresh + wokeup = wakeup + wakeup = max(now, Timestamp(max(polled, client.updated) + timer)) + remaining = wakeup - now + if wakeup != wokeup: + logging.info("[Last client poll %s, next %s]", polled, wakeup) + asyncore.loop(timeout = remaining, count = 1) - except KeyboardInterrupt: - sys.exit(0) + except KeyboardInterrupt: + sys.exit(0) - finally: - if client is not None: - client.cleanup() + finally: + if client is not None: + client.cleanup() def argparse_setup(subparsers): - """ - Set up argparse stuff for commands in this module. - """ - - subparser = subparsers.add_parser("client", description = client_main.__doc__, - help = "Test client for RPKI-RTR protocol") - subparser.set_defaults(func = client_main, default_log_to = "stderr") - subparser.add_argument("--sql-database", help = "filename for sqlite3 database of client state") - subparser.add_argument("--force-version", type = int, choices = PDU.version_map, help = "force specific protocol version") - subparser.add_argument("--reset-session", action = "store_true", help = "reset any existing session found in sqlite3 database") - subparser.add_argument("protocol", choices = ("loopback", "tcp", "ssh", "tls"), help = "connection protocol") - subparser.add_argument("host", nargs = "?", help = "server host") - subparser.add_argument("port", nargs = "?", help = "server port") - return subparser + """ + Set up argparse stuff for commands in this module. + """ + + subparser = subparsers.add_parser("client", description = client_main.__doc__, + help = "Test client for RPKI-RTR protocol") + subparser.set_defaults(func = client_main, default_log_to = "stderr") + subparser.add_argument("--sql-database", help = "filename for sqlite3 database of client state") + subparser.add_argument("--force-version", type = int, choices = PDU.version_map, help = "force specific protocol version") + subparser.add_argument("--reset-session", action = "store_true", help = "reset any existing session found in sqlite3 database") + subparser.add_argument("protocol", choices = ("loopback", "tcp", "ssh", "tls"), help = "connection protocol") + subparser.add_argument("host", nargs = "?", help = "server host") + subparser.add_argument("port", nargs = "?", help = "server port") + return subparser |