diff options
author | Rob Austein <sra@hactrn.net> | 2014-05-21 03:36:13 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2014-05-21 03:36:13 +0000 |
commit | 9227cb13508c98c79b9c14961daaf6c19a74e1e6 (patch) | |
tree | 2608368f3d0ddb4d8b060fbb2b2b4a90b3a13b4e /rpki/rtr/client.py | |
parent | 08b636cbe79f5d48966cf52f375062e3531aed3f (diff) |
Finish implementing 6810-bis timing parameters.
svn path=/trunk/; revision=5832
Diffstat (limited to 'rpki/rtr/client.py')
-rw-r--r-- | rpki/rtr/client.py | 111 |
1 files changed, 61 insertions, 50 deletions
diff --git a/rpki/rtr/client.py b/rpki/rtr/client.py index e29a244c..d6000e04 100644 --- a/rpki/rtr/client.py +++ b/rpki/rtr/client.py @@ -60,12 +60,12 @@ class SerialNotifyPDU(rpki.rtr.pdus.SerialNotifyPDU): """ logging.debug(self) - if client.current_serial is None or client.current_nonce != self.nonce: + if client.serial is None or client.nonce != self.nonce: client.push_pdu(ResetQueryPDU(version = client.version)) - elif self.serial != client.current_serial: + elif self.serial != client.serial: client.push_pdu(SerialQueryPDU(version = client.version, - serial = client.current_serial, - nonce = client.current_nonce)) + serial = client.serial, + nonce = client.nonce)) else: logging.debug("[Notify did not change serial number, ignoring]") @@ -79,7 +79,7 @@ class CacheResponsePDU(rpki.rtr.pdus.CacheResponsePDU): """ logging.debug(self) - if self.nonce != client.current_nonce: + if self.nonce != client.nonce: logging.debug("[Nonce changed, resetting]") client.cache_reset() @@ -171,21 +171,16 @@ class ClientChannel(rpki.rtr.channels.PDUChannel): Client protocol engine, handles upcalls from PDUChannel. """ - current_serial = None - current_nonce = None - sql = None - host = None - port = None - cache_id = None - - # For initial test purposes, let's use the minimum allowed values - # from the RFC 6810 bis I-D as the initial defaults for refresh and - # retry, and the maximum allowed for expire; these will be overriden - # as soon as we receive an EndOfDataPDU. - # - refresh = 120 - retry = 120 - expire = 172800 + serial = None + nonce = None + sql = None + host = None + port = None + cache_id = None + refresh = rpki.rtr.pdus.default_refresh + retry = rpki.rtr.pdus.default_retry + expire = rpki.rtr.pdus.default_expire + updated = Timestamp(0) def __init__(self, sock, proc, killsig, host, port): self.killsig = killsig @@ -312,7 +307,6 @@ class ClientChannel(rpki.rtr.channels.PDUChannel): prefixlen INTEGER NOT NULL, max_prefixlen INTEGER NOT NULL, UNIQUE (cache_id, asn, prefix, prefixlen, max_prefixlen))''') - cur.execute(''' CREATE TABLE routerkey ( cache_id INTEGER NOT NULL @@ -325,11 +319,11 @@ class ClientChannel(rpki.rtr.channels.PDUChannel): UNIQUE (cache_id, asn, ski), UNIQUE (cache_id, asn, key))''') - cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire " + cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire, updated " "FROM cache WHERE host = ? AND port = ?", (self.host, self.port)) try: - self.cache_id, version, self.current_nonce, self.current_serial, refresh, retry, expire = cur.fetchone() + self.cache_id, version, self.nonce, self.serial, refresh, retry, expire, updated = cur.fetchone() if version is not None: self.version = version if refresh is not None: @@ -338,25 +332,23 @@ class ClientChannel(rpki.rtr.channels.PDUChannel): self.retry = retry if expire is not None: self.expire = expire + if updated is not None: + self.updated = Timestamp(updated) except TypeError: cur.execute("INSERT INTO cache (host, port) VALUES (?, ?)", (self.host, self.port)) self.cache_id = cur.lastrowid self.sql.commit() - logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s]", - self.cache_id, self.version, self.current_nonce, - self.current_serial, self.refresh, self.retry, self.expire) + logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s updated %s]", + self.cache_id, self.version, self.nonce, + self.serial, self.refresh, self.retry, self.expire, self.updated) def cache_reset(self): """ Handle CacheResetPDU actions. """ - self.current_serial = None + self.serial = None if self.sql: - # - # For some reason there was no commit here. Dunno why. - # See if adding one breaks anything.... - # cur = self.sql.cursor() cur.execute("DELETE FROM prefix WHERE cache_id = ?", (self.cache_id,)) cur.execute("DELETE FROM routerkey WHERE cache_id = ?", (self.cache_id,)) @@ -369,18 +361,19 @@ class ClientChannel(rpki.rtr.channels.PDUChannel): """ assert version == self.version - self.current_serial = serial - self.current_nonce = nonce - self.refresh = refresh - self.retry = retry - self.expire = expire + self.serial = serial + self.nonce = nonce + self.refresh = refresh + self.retry = retry + self.expire = expire + self.updated = Timestamp.now() if self.sql: self.sql.execute("UPDATE cache SET" " version = ?, serial = ?, nonce = ?," " refresh = ?, retry = ?, expire = ?," - " updated = datetime('now') " + " updated = ? " "WHERE cache_id = ?", - (version, serial, nonce, refresh, retry, expire, self.cache_id)) + (version, serial, nonce, refresh, retry, expire, int(self.updated), self.cache_id)) self.sql.commit() def consume_prefix(self, prefix): @@ -468,26 +461,44 @@ def client_main(args): client = constructor(args.host, args.port) if args.sql_database: client.setup_sql(args.sql_database) + + polled = client.updated + wakeup = None + while True: - if client.current_serial is None or client.current_nonce is None: + + now = Timestamp.now() + + if client.serial is not None and now > client.updated + client.expire: + logging.info("[Expiring client data: serial %s, last updated %s, expire %s]", + client.serial, client.updated, client.expire) + client.cache_reset() + + if client.serial is None or client.nonce is None: + polled = now client.push_pdu(ResetQueryPDU(version = client.version)) - else: + + elif now >= client.updated + client.refresh: + polled = now client.push_pdu(SerialQueryPDU(version = client.version, - serial = client.current_serial, - nonce = client.current_nonce)) - polled = Timestamp.now() - wakeup = None - while True: - if wakeup != polled + client.refresh: - wakeup = Timestamp(polled + client.refresh) + serial = client.serial, + nonce = client.nonce)) + + remaining = 1 + + while remaining > 0: + now = Timestamp.now() + timer = client.retry if (now >= client.updated + client.refresh) else client.refresh + wokeup = wakeup + wakeup = max(now, Timestamp(max(polled, client.updated) + timer)) + remaining = wakeup - now + if wakeup != wokeup: logging.info("[Last client poll %s, next %s]", polled, wakeup) - remaining = wakeup - time.time() - if remaining < 0: - break asyncore.loop(timeout = remaining, count = 1) except KeyboardInterrupt: sys.exit(0) + finally: if client is not None: client.cleanup() |