diff options
author | Rob Austein <sra@hactrn.net> | 2014-05-21 03:36:13 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2014-05-21 03:36:13 +0000 |
commit | 9227cb13508c98c79b9c14961daaf6c19a74e1e6 (patch) | |
tree | 2608368f3d0ddb4d8b060fbb2b2b4a90b3a13b4e | |
parent | 08b636cbe79f5d48966cf52f375062e3531aed3f (diff) |
Finish implementing 6810-bis timing parameters.
svn path=/trunk/; revision=5832
-rw-r--r-- | rpki/rtr/channels.py | 3 | ||||
-rw-r--r-- | rpki/rtr/client.py | 111 | ||||
-rw-r--r-- | rpki/rtr/pdus.py | 58 | ||||
-rw-r--r-- | rpki/rtr/server.py | 44 |
4 files changed, 130 insertions, 86 deletions
diff --git a/rpki/rtr/channels.py b/rpki/rtr/channels.py index 53295900..2b1ea406 100644 --- a/rpki/rtr/channels.py +++ b/rpki/rtr/channels.py @@ -37,8 +37,7 @@ class Timestamp(int): """ def __new__(cls, t): - # http://stackoverflow.com/questions/7471255/pythons-super-and-new-confused-me - #return int.__new__(cls, t) + # __new__() is a static method, not a class method, hence the odd calling sequence. return super(Timestamp, cls).__new__(cls, t) @classmethod diff --git a/rpki/rtr/client.py b/rpki/rtr/client.py index e29a244c..d6000e04 100644 --- a/rpki/rtr/client.py +++ b/rpki/rtr/client.py @@ -60,12 +60,12 @@ class SerialNotifyPDU(rpki.rtr.pdus.SerialNotifyPDU): """ logging.debug(self) - if client.current_serial is None or client.current_nonce != self.nonce: + if client.serial is None or client.nonce != self.nonce: client.push_pdu(ResetQueryPDU(version = client.version)) - elif self.serial != client.current_serial: + elif self.serial != client.serial: client.push_pdu(SerialQueryPDU(version = client.version, - serial = client.current_serial, - nonce = client.current_nonce)) + serial = client.serial, + nonce = client.nonce)) else: logging.debug("[Notify did not change serial number, ignoring]") @@ -79,7 +79,7 @@ class CacheResponsePDU(rpki.rtr.pdus.CacheResponsePDU): """ logging.debug(self) - if self.nonce != client.current_nonce: + if self.nonce != client.nonce: logging.debug("[Nonce changed, resetting]") client.cache_reset() @@ -171,21 +171,16 @@ class ClientChannel(rpki.rtr.channels.PDUChannel): Client protocol engine, handles upcalls from PDUChannel. """ - current_serial = None - current_nonce = None - sql = None - host = None - port = None - cache_id = None - - # For initial test purposes, let's use the minimum allowed values - # from the RFC 6810 bis I-D as the initial defaults for refresh and - # retry, and the maximum allowed for expire; these will be overriden - # as soon as we receive an EndOfDataPDU. - # - refresh = 120 - retry = 120 - expire = 172800 + serial = None + nonce = None + sql = None + host = None + port = None + cache_id = None + refresh = rpki.rtr.pdus.default_refresh + retry = rpki.rtr.pdus.default_retry + expire = rpki.rtr.pdus.default_expire + updated = Timestamp(0) def __init__(self, sock, proc, killsig, host, port): self.killsig = killsig @@ -312,7 +307,6 @@ class ClientChannel(rpki.rtr.channels.PDUChannel): prefixlen INTEGER NOT NULL, max_prefixlen INTEGER NOT NULL, UNIQUE (cache_id, asn, prefix, prefixlen, max_prefixlen))''') - cur.execute(''' CREATE TABLE routerkey ( cache_id INTEGER NOT NULL @@ -325,11 +319,11 @@ class ClientChannel(rpki.rtr.channels.PDUChannel): UNIQUE (cache_id, asn, ski), UNIQUE (cache_id, asn, key))''') - cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire " + cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire, updated " "FROM cache WHERE host = ? AND port = ?", (self.host, self.port)) try: - self.cache_id, version, self.current_nonce, self.current_serial, refresh, retry, expire = cur.fetchone() + self.cache_id, version, self.nonce, self.serial, refresh, retry, expire, updated = cur.fetchone() if version is not None: self.version = version if refresh is not None: @@ -338,25 +332,23 @@ class ClientChannel(rpki.rtr.channels.PDUChannel): self.retry = retry if expire is not None: self.expire = expire + if updated is not None: + self.updated = Timestamp(updated) except TypeError: cur.execute("INSERT INTO cache (host, port) VALUES (?, ?)", (self.host, self.port)) self.cache_id = cur.lastrowid self.sql.commit() - logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s]", - self.cache_id, self.version, self.current_nonce, - self.current_serial, self.refresh, self.retry, self.expire) + logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s updated %s]", + self.cache_id, self.version, self.nonce, + self.serial, self.refresh, self.retry, self.expire, self.updated) def cache_reset(self): """ Handle CacheResetPDU actions. """ - self.current_serial = None + self.serial = None if self.sql: - # - # For some reason there was no commit here. Dunno why. - # See if adding one breaks anything.... - # cur = self.sql.cursor() cur.execute("DELETE FROM prefix WHERE cache_id = ?", (self.cache_id,)) cur.execute("DELETE FROM routerkey WHERE cache_id = ?", (self.cache_id,)) @@ -369,18 +361,19 @@ class ClientChannel(rpki.rtr.channels.PDUChannel): """ assert version == self.version - self.current_serial = serial - self.current_nonce = nonce - self.refresh = refresh - self.retry = retry - self.expire = expire + self.serial = serial + self.nonce = nonce + self.refresh = refresh + self.retry = retry + self.expire = expire + self.updated = Timestamp.now() if self.sql: self.sql.execute("UPDATE cache SET" " version = ?, serial = ?, nonce = ?," " refresh = ?, retry = ?, expire = ?," - " updated = datetime('now') " + " updated = ? " "WHERE cache_id = ?", - (version, serial, nonce, refresh, retry, expire, self.cache_id)) + (version, serial, nonce, refresh, retry, expire, int(self.updated), self.cache_id)) self.sql.commit() def consume_prefix(self, prefix): @@ -468,26 +461,44 @@ def client_main(args): client = constructor(args.host, args.port) if args.sql_database: client.setup_sql(args.sql_database) + + polled = client.updated + wakeup = None + while True: - if client.current_serial is None or client.current_nonce is None: + + now = Timestamp.now() + + if client.serial is not None and now > client.updated + client.expire: + logging.info("[Expiring client data: serial %s, last updated %s, expire %s]", + client.serial, client.updated, client.expire) + client.cache_reset() + + if client.serial is None or client.nonce is None: + polled = now client.push_pdu(ResetQueryPDU(version = client.version)) - else: + + elif now >= client.updated + client.refresh: + polled = now client.push_pdu(SerialQueryPDU(version = client.version, - serial = client.current_serial, - nonce = client.current_nonce)) - polled = Timestamp.now() - wakeup = None - while True: - if wakeup != polled + client.refresh: - wakeup = Timestamp(polled + client.refresh) + serial = client.serial, + nonce = client.nonce)) + + remaining = 1 + + while remaining > 0: + now = Timestamp.now() + timer = client.retry if (now >= client.updated + client.refresh) else client.refresh + wokeup = wakeup + wakeup = max(now, Timestamp(max(polled, client.updated) + timer)) + remaining = wakeup - now + if wakeup != wokeup: logging.info("[Last client poll %s, next %s]", polled, wakeup) - remaining = wakeup - time.time() - if remaining < 0: - break asyncore.loop(timeout = remaining, count = 1) except KeyboardInterrupt: sys.exit(0) + finally: if client is not None: client.cleanup() diff --git a/rpki/rtr/pdus.py b/rpki/rtr/pdus.py index f7d1e47a..0d2e5928 100644 --- a/rpki/rtr/pdus.py +++ b/rpki/rtr/pdus.py @@ -316,6 +316,33 @@ def EndOfDataPDU(version, *args, **kwargs): raise NotImplementedError +# Min, max, and default values, from the current RFC 6810 bis I-D. +# Putting these here lets us keep them all in one place, and use them +# in our client API for both protocol versions even though they can +# only be set in the protocol in version 1. + +default_refresh = 3600 + +def valid_refresh(refresh): + if not isinstance(refresh, int) or refresh < 120 or refresh > 86400: + raise ValueError + return refresh + +default_retry = 600 + +def valid_retry(retry): + if not isinstance(retry, int) or retry < 120 or retry > 7200: + raise ValueError + return retry + +default_expire = 7200 + +def valid_expire(expire): + if not isinstance(expire, int) or expire < 600 or expire > 172800: + raise ValueError + return expire + + @wire_pdu_only(0) class EndOfDataPDUv0(PDUWithSerial): """ @@ -324,14 +351,11 @@ class EndOfDataPDUv0(PDUWithSerial): pdu_type = 7 - # Default values, from the current RFC 6810 bis I-D. - # Putting these here lets us use them in our client API for both - # protocol versions, even though they can only be set in the - # protocol in version 1. - - refresh = 3600 - retry = 600 - expire = 7200 + def __init__(self, version, serial = None, nonce = None, refresh = None, retry = None, expire = None): + super(EndOfDataPDUv0, self).__init__(version, serial, nonce) + self.refresh = valid_refresh(default_refresh if refresh is None else refresh) + self.retry = valid_retry( default_retry if retry is None else retry) + self.expire = valid_expire( default_expire if expire is None else expire) @wire_pdu_only(1) @@ -342,24 +366,6 @@ class EndOfDataPDUv1(EndOfDataPDUv0): header_struct = struct.Struct("!BBHLLLLL") - def __init__(self, version, serial = None, nonce = None, refresh = None, retry = None, expire = None): - super(EndOfDataPDUv1, self).__init__(version) - if serial is not None: - assert isinstance(serial, int) - self.serial = serial - if nonce is not None: - assert isinstance(nonce, int) - self.nonce = nonce - if refresh is not None: - assert isinstance(refresh, int) - self.refresh = refresh - if retry is not None: - assert isinstance(retry, int) - self.retry = retry - if expire is not None: - assert isinstance(expire, int) - self.expire = expire - def __str__(self): return "[%s, serial #%d nonce %d refresh %d retry %d expire %d]" % ( self.__class__.__name__, self.serial, self.nonce, self.refresh, self.retry, self.expire) diff --git a/rpki/rtr/server.py b/rpki/rtr/server.py index 0e6b86cc..a1aacbee 100644 --- a/rpki/rtr/server.py +++ b/rpki/rtr/server.py @@ -31,9 +31,7 @@ import rpki.oids import rpki.rtr.pdus import rpki.rtr.channels -from rpki.rtr.pdus import (clone_pdu_root, - CacheResponsePDU, EndOfDataPDU, CacheResetPDU, CacheResponsePDU, - EndOfDataPDU, CacheResetPDU, CacheResetPDU, SerialNotifyPDU) +from rpki.rtr.pdus import (clone_pdu_root, CacheResponsePDU, EndOfDataPDU, CacheResetPDU, SerialNotifyPDU) # Disable incremental updates. Debugging only, should be False in production. @@ -63,7 +61,10 @@ class PDU(rpki.rtr.pdus.PDU): server.push_file(f) server.push_pdu(EndOfDataPDU(version = server.version, serial = server.current_serial, - nonce = server.current_nonce)) + nonce = server.current_nonce, + refresh = server.refresh, + retry = server.retry, + expire = server.expire)) def send_nodata(self, server): """ @@ -103,7 +104,10 @@ class SerialQueryPDU(PDU, rpki.rtr.pdus.SerialQueryPDU): nonce = server.current_nonce)) server.push_pdu(EndOfDataPDU(version = server.version, serial = server.current_serial, - nonce = server.current_nonce)) + nonce = server.current_nonce, + refresh = server.refresh, + retry = server.retry, + expire = server.expire)) elif disable_incrementals: server.push_pdu(CacheResetPDU(version = server.version)) else: @@ -242,7 +246,7 @@ class ServerChannel(rpki.rtr.channels.PDUChannel): implement protocol logic. """ - def __init__(self, logger): + def __init__(self, logger, refresh, retry, expire): """ Set up stdin and stdout as connection and start listening for first PDU. @@ -252,6 +256,9 @@ class ServerChannel(rpki.rtr.channels.PDUChannel): self.init_file_dispatcher(sys.stdin.fileno()) self.writer = ServerWriteChannel() self.logger = logger + self.refresh = refresh + self.retry = retry + self.expire = expire self.get_serial() self.start_new_pdu() @@ -480,7 +487,7 @@ def server_main(args): kickme = None try: - server = rpki.rtr.server.ServerChannel(logger = logger) + server = rpki.rtr.server.ServerChannel(logger = logger, refresh = args.refresh, retry = args.retry, expire = args.expire) kickme = rpki.rtr.server.KickmeChannel(server = server) asyncore.loop(timeout = None) except KeyboardInterrupt: @@ -550,13 +557,34 @@ def argparse_setup(subparsers): Set up argparse stuff for commands in this module. """ + # These could have been lambdas, but doing it this way results in + # more useful error messages on argparse failures. + + def refresh(v): + return rpki.rtr.pdus.valid_refresh(int(v)) + + def retry(v): + return rpki.rtr.pdus.valid_retry(int(v)) + + def expire(v): + return rpki.rtr.pdus.valid_expire(int(v)) + + # Some duplication of arguments here, not enough to be worth huge + # effort to clean up, worry about it later in any case. + subparser = subparsers.add_parser("server", description = server_main.__doc__, help = "RPKI-RTR protocol server") subparser.set_defaults(func = server_main, default_log_to = "syslog") + subparser.add_argument("--refresh", type = refresh, help = "override default refresh timer") + subparser.add_argument("--retry", type = retry, help = "override default retry timer") + subparser.add_argument("--expire", type = expire, help = "override default expire timer") subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database") subparser = subparsers.add_parser("listener", description = listener_main.__doc__, help = "TCP listener for RPKI-RTR protocol server") subparser.set_defaults(func = listener_main, default_log_to = "syslog") - subparser.add_argument("port", type = int, help = "TCP port on which to listen") + subparser.add_argument("--refresh", type = refresh, help = "override default refresh timer") + subparser.add_argument("--retry", type = retry, help = "override default retry timer") + subparser.add_argument("--expire", type = expire, help = "override default expire timer") + subparser.add_argument("port", type = int, help = "TCP port on which to listen") subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database") |