aboutsummaryrefslogtreecommitdiff
path: root/rpki/rtr/client.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2014-05-21 03:36:13 +0000
committerRob Austein <sra@hactrn.net>2014-05-21 03:36:13 +0000
commit9227cb13508c98c79b9c14961daaf6c19a74e1e6 (patch)
tree2608368f3d0ddb4d8b060fbb2b2b4a90b3a13b4e /rpki/rtr/client.py
parent08b636cbe79f5d48966cf52f375062e3531aed3f (diff)
Finish implementing 6810-bis timing parameters.
svn path=/trunk/; revision=5832
Diffstat (limited to 'rpki/rtr/client.py')
-rw-r--r--rpki/rtr/client.py111
1 files changed, 61 insertions, 50 deletions
diff --git a/rpki/rtr/client.py b/rpki/rtr/client.py
index e29a244c..d6000e04 100644
--- a/rpki/rtr/client.py
+++ b/rpki/rtr/client.py
@@ -60,12 +60,12 @@ class SerialNotifyPDU(rpki.rtr.pdus.SerialNotifyPDU):
"""
logging.debug(self)
- if client.current_serial is None or client.current_nonce != self.nonce:
+ if client.serial is None or client.nonce != self.nonce:
client.push_pdu(ResetQueryPDU(version = client.version))
- elif self.serial != client.current_serial:
+ elif self.serial != client.serial:
client.push_pdu(SerialQueryPDU(version = client.version,
- serial = client.current_serial,
- nonce = client.current_nonce))
+ serial = client.serial,
+ nonce = client.nonce))
else:
logging.debug("[Notify did not change serial number, ignoring]")
@@ -79,7 +79,7 @@ class CacheResponsePDU(rpki.rtr.pdus.CacheResponsePDU):
"""
logging.debug(self)
- if self.nonce != client.current_nonce:
+ if self.nonce != client.nonce:
logging.debug("[Nonce changed, resetting]")
client.cache_reset()
@@ -171,21 +171,16 @@ class ClientChannel(rpki.rtr.channels.PDUChannel):
Client protocol engine, handles upcalls from PDUChannel.
"""
- current_serial = None
- current_nonce = None
- sql = None
- host = None
- port = None
- cache_id = None
-
- # For initial test purposes, let's use the minimum allowed values
- # from the RFC 6810 bis I-D as the initial defaults for refresh and
- # retry, and the maximum allowed for expire; these will be overriden
- # as soon as we receive an EndOfDataPDU.
- #
- refresh = 120
- retry = 120
- expire = 172800
+ serial = None
+ nonce = None
+ sql = None
+ host = None
+ port = None
+ cache_id = None
+ refresh = rpki.rtr.pdus.default_refresh
+ retry = rpki.rtr.pdus.default_retry
+ expire = rpki.rtr.pdus.default_expire
+ updated = Timestamp(0)
def __init__(self, sock, proc, killsig, host, port):
self.killsig = killsig
@@ -312,7 +307,6 @@ class ClientChannel(rpki.rtr.channels.PDUChannel):
prefixlen INTEGER NOT NULL,
max_prefixlen INTEGER NOT NULL,
UNIQUE (cache_id, asn, prefix, prefixlen, max_prefixlen))''')
-
cur.execute('''
CREATE TABLE routerkey (
cache_id INTEGER NOT NULL
@@ -325,11 +319,11 @@ class ClientChannel(rpki.rtr.channels.PDUChannel):
UNIQUE (cache_id, asn, ski),
UNIQUE (cache_id, asn, key))''')
- cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire "
+ cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire, updated "
"FROM cache WHERE host = ? AND port = ?",
(self.host, self.port))
try:
- self.cache_id, version, self.current_nonce, self.current_serial, refresh, retry, expire = cur.fetchone()
+ self.cache_id, version, self.nonce, self.serial, refresh, retry, expire, updated = cur.fetchone()
if version is not None:
self.version = version
if refresh is not None:
@@ -338,25 +332,23 @@ class ClientChannel(rpki.rtr.channels.PDUChannel):
self.retry = retry
if expire is not None:
self.expire = expire
+ if updated is not None:
+ self.updated = Timestamp(updated)
except TypeError:
cur.execute("INSERT INTO cache (host, port) VALUES (?, ?)", (self.host, self.port))
self.cache_id = cur.lastrowid
self.sql.commit()
- logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s]",
- self.cache_id, self.version, self.current_nonce,
- self.current_serial, self.refresh, self.retry, self.expire)
+ logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s updated %s]",
+ self.cache_id, self.version, self.nonce,
+ self.serial, self.refresh, self.retry, self.expire, self.updated)
def cache_reset(self):
"""
Handle CacheResetPDU actions.
"""
- self.current_serial = None
+ self.serial = None
if self.sql:
- #
- # For some reason there was no commit here. Dunno why.
- # See if adding one breaks anything....
- #
cur = self.sql.cursor()
cur.execute("DELETE FROM prefix WHERE cache_id = ?", (self.cache_id,))
cur.execute("DELETE FROM routerkey WHERE cache_id = ?", (self.cache_id,))
@@ -369,18 +361,19 @@ class ClientChannel(rpki.rtr.channels.PDUChannel):
"""
assert version == self.version
- self.current_serial = serial
- self.current_nonce = nonce
- self.refresh = refresh
- self.retry = retry
- self.expire = expire
+ self.serial = serial
+ self.nonce = nonce
+ self.refresh = refresh
+ self.retry = retry
+ self.expire = expire
+ self.updated = Timestamp.now()
if self.sql:
self.sql.execute("UPDATE cache SET"
" version = ?, serial = ?, nonce = ?,"
" refresh = ?, retry = ?, expire = ?,"
- " updated = datetime('now') "
+ " updated = ? "
"WHERE cache_id = ?",
- (version, serial, nonce, refresh, retry, expire, self.cache_id))
+ (version, serial, nonce, refresh, retry, expire, int(self.updated), self.cache_id))
self.sql.commit()
def consume_prefix(self, prefix):
@@ -468,26 +461,44 @@ def client_main(args):
client = constructor(args.host, args.port)
if args.sql_database:
client.setup_sql(args.sql_database)
+
+ polled = client.updated
+ wakeup = None
+
while True:
- if client.current_serial is None or client.current_nonce is None:
+
+ now = Timestamp.now()
+
+ if client.serial is not None and now > client.updated + client.expire:
+ logging.info("[Expiring client data: serial %s, last updated %s, expire %s]",
+ client.serial, client.updated, client.expire)
+ client.cache_reset()
+
+ if client.serial is None or client.nonce is None:
+ polled = now
client.push_pdu(ResetQueryPDU(version = client.version))
- else:
+
+ elif now >= client.updated + client.refresh:
+ polled = now
client.push_pdu(SerialQueryPDU(version = client.version,
- serial = client.current_serial,
- nonce = client.current_nonce))
- polled = Timestamp.now()
- wakeup = None
- while True:
- if wakeup != polled + client.refresh:
- wakeup = Timestamp(polled + client.refresh)
+ serial = client.serial,
+ nonce = client.nonce))
+
+ remaining = 1
+
+ while remaining > 0:
+ now = Timestamp.now()
+ timer = client.retry if (now >= client.updated + client.refresh) else client.refresh
+ wokeup = wakeup
+ wakeup = max(now, Timestamp(max(polled, client.updated) + timer))
+ remaining = wakeup - now
+ if wakeup != wokeup:
logging.info("[Last client poll %s, next %s]", polled, wakeup)
- remaining = wakeup - time.time()
- if remaining < 0:
- break
asyncore.loop(timeout = remaining, count = 1)
except KeyboardInterrupt:
sys.exit(0)
+
finally:
if client is not None:
client.cleanup()