aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2014-05-21 03:36:13 +0000
committerRob Austein <sra@hactrn.net>2014-05-21 03:36:13 +0000
commit9227cb13508c98c79b9c14961daaf6c19a74e1e6 (patch)
tree2608368f3d0ddb4d8b060fbb2b2b4a90b3a13b4e
parent08b636cbe79f5d48966cf52f375062e3531aed3f (diff)
Finish implementing 6810-bis timing parameters.
svn path=/trunk/; revision=5832
-rw-r--r--rpki/rtr/channels.py3
-rw-r--r--rpki/rtr/client.py111
-rw-r--r--rpki/rtr/pdus.py58
-rw-r--r--rpki/rtr/server.py44
4 files changed, 130 insertions, 86 deletions
diff --git a/rpki/rtr/channels.py b/rpki/rtr/channels.py
index 53295900..2b1ea406 100644
--- a/rpki/rtr/channels.py
+++ b/rpki/rtr/channels.py
@@ -37,8 +37,7 @@ class Timestamp(int):
"""
def __new__(cls, t):
- # http://stackoverflow.com/questions/7471255/pythons-super-and-new-confused-me
- #return int.__new__(cls, t)
+ # __new__() is a static method, not a class method, hence the odd calling sequence.
return super(Timestamp, cls).__new__(cls, t)
@classmethod
diff --git a/rpki/rtr/client.py b/rpki/rtr/client.py
index e29a244c..d6000e04 100644
--- a/rpki/rtr/client.py
+++ b/rpki/rtr/client.py
@@ -60,12 +60,12 @@ class SerialNotifyPDU(rpki.rtr.pdus.SerialNotifyPDU):
"""
logging.debug(self)
- if client.current_serial is None or client.current_nonce != self.nonce:
+ if client.serial is None or client.nonce != self.nonce:
client.push_pdu(ResetQueryPDU(version = client.version))
- elif self.serial != client.current_serial:
+ elif self.serial != client.serial:
client.push_pdu(SerialQueryPDU(version = client.version,
- serial = client.current_serial,
- nonce = client.current_nonce))
+ serial = client.serial,
+ nonce = client.nonce))
else:
logging.debug("[Notify did not change serial number, ignoring]")
@@ -79,7 +79,7 @@ class CacheResponsePDU(rpki.rtr.pdus.CacheResponsePDU):
"""
logging.debug(self)
- if self.nonce != client.current_nonce:
+ if self.nonce != client.nonce:
logging.debug("[Nonce changed, resetting]")
client.cache_reset()
@@ -171,21 +171,16 @@ class ClientChannel(rpki.rtr.channels.PDUChannel):
Client protocol engine, handles upcalls from PDUChannel.
"""
- current_serial = None
- current_nonce = None
- sql = None
- host = None
- port = None
- cache_id = None
-
- # For initial test purposes, let's use the minimum allowed values
- # from the RFC 6810 bis I-D as the initial defaults for refresh and
- # retry, and the maximum allowed for expire; these will be overriden
- # as soon as we receive an EndOfDataPDU.
- #
- refresh = 120
- retry = 120
- expire = 172800
+ serial = None
+ nonce = None
+ sql = None
+ host = None
+ port = None
+ cache_id = None
+ refresh = rpki.rtr.pdus.default_refresh
+ retry = rpki.rtr.pdus.default_retry
+ expire = rpki.rtr.pdus.default_expire
+ updated = Timestamp(0)
def __init__(self, sock, proc, killsig, host, port):
self.killsig = killsig
@@ -312,7 +307,6 @@ class ClientChannel(rpki.rtr.channels.PDUChannel):
prefixlen INTEGER NOT NULL,
max_prefixlen INTEGER NOT NULL,
UNIQUE (cache_id, asn, prefix, prefixlen, max_prefixlen))''')
-
cur.execute('''
CREATE TABLE routerkey (
cache_id INTEGER NOT NULL
@@ -325,11 +319,11 @@ class ClientChannel(rpki.rtr.channels.PDUChannel):
UNIQUE (cache_id, asn, ski),
UNIQUE (cache_id, asn, key))''')
- cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire "
+ cur.execute("SELECT cache_id, version, nonce, serial, refresh, retry, expire, updated "
"FROM cache WHERE host = ? AND port = ?",
(self.host, self.port))
try:
- self.cache_id, version, self.current_nonce, self.current_serial, refresh, retry, expire = cur.fetchone()
+ self.cache_id, version, self.nonce, self.serial, refresh, retry, expire, updated = cur.fetchone()
if version is not None:
self.version = version
if refresh is not None:
@@ -338,25 +332,23 @@ class ClientChannel(rpki.rtr.channels.PDUChannel):
self.retry = retry
if expire is not None:
self.expire = expire
+ if updated is not None:
+ self.updated = Timestamp(updated)
except TypeError:
cur.execute("INSERT INTO cache (host, port) VALUES (?, ?)", (self.host, self.port))
self.cache_id = cur.lastrowid
self.sql.commit()
- logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s]",
- self.cache_id, self.version, self.current_nonce,
- self.current_serial, self.refresh, self.retry, self.expire)
+ logging.info("[Session %d version %s nonce %s serial %s refresh %s retry %s expire %s updated %s]",
+ self.cache_id, self.version, self.nonce,
+ self.serial, self.refresh, self.retry, self.expire, self.updated)
def cache_reset(self):
"""
Handle CacheResetPDU actions.
"""
- self.current_serial = None
+ self.serial = None
if self.sql:
- #
- # For some reason there was no commit here. Dunno why.
- # See if adding one breaks anything....
- #
cur = self.sql.cursor()
cur.execute("DELETE FROM prefix WHERE cache_id = ?", (self.cache_id,))
cur.execute("DELETE FROM routerkey WHERE cache_id = ?", (self.cache_id,))
@@ -369,18 +361,19 @@ class ClientChannel(rpki.rtr.channels.PDUChannel):
"""
assert version == self.version
- self.current_serial = serial
- self.current_nonce = nonce
- self.refresh = refresh
- self.retry = retry
- self.expire = expire
+ self.serial = serial
+ self.nonce = nonce
+ self.refresh = refresh
+ self.retry = retry
+ self.expire = expire
+ self.updated = Timestamp.now()
if self.sql:
self.sql.execute("UPDATE cache SET"
" version = ?, serial = ?, nonce = ?,"
" refresh = ?, retry = ?, expire = ?,"
- " updated = datetime('now') "
+ " updated = ? "
"WHERE cache_id = ?",
- (version, serial, nonce, refresh, retry, expire, self.cache_id))
+ (version, serial, nonce, refresh, retry, expire, int(self.updated), self.cache_id))
self.sql.commit()
def consume_prefix(self, prefix):
@@ -468,26 +461,44 @@ def client_main(args):
client = constructor(args.host, args.port)
if args.sql_database:
client.setup_sql(args.sql_database)
+
+ polled = client.updated
+ wakeup = None
+
while True:
- if client.current_serial is None or client.current_nonce is None:
+
+ now = Timestamp.now()
+
+ if client.serial is not None and now > client.updated + client.expire:
+ logging.info("[Expiring client data: serial %s, last updated %s, expire %s]",
+ client.serial, client.updated, client.expire)
+ client.cache_reset()
+
+ if client.serial is None or client.nonce is None:
+ polled = now
client.push_pdu(ResetQueryPDU(version = client.version))
- else:
+
+ elif now >= client.updated + client.refresh:
+ polled = now
client.push_pdu(SerialQueryPDU(version = client.version,
- serial = client.current_serial,
- nonce = client.current_nonce))
- polled = Timestamp.now()
- wakeup = None
- while True:
- if wakeup != polled + client.refresh:
- wakeup = Timestamp(polled + client.refresh)
+ serial = client.serial,
+ nonce = client.nonce))
+
+ remaining = 1
+
+ while remaining > 0:
+ now = Timestamp.now()
+ timer = client.retry if (now >= client.updated + client.refresh) else client.refresh
+ wokeup = wakeup
+ wakeup = max(now, Timestamp(max(polled, client.updated) + timer))
+ remaining = wakeup - now
+ if wakeup != wokeup:
logging.info("[Last client poll %s, next %s]", polled, wakeup)
- remaining = wakeup - time.time()
- if remaining < 0:
- break
asyncore.loop(timeout = remaining, count = 1)
except KeyboardInterrupt:
sys.exit(0)
+
finally:
if client is not None:
client.cleanup()
diff --git a/rpki/rtr/pdus.py b/rpki/rtr/pdus.py
index f7d1e47a..0d2e5928 100644
--- a/rpki/rtr/pdus.py
+++ b/rpki/rtr/pdus.py
@@ -316,6 +316,33 @@ def EndOfDataPDU(version, *args, **kwargs):
raise NotImplementedError
+# Min, max, and default values, from the current RFC 6810 bis I-D.
+# Putting these here lets us keep them all in one place, and use them
+# in our client API for both protocol versions even though they can
+# only be set in the protocol in version 1.
+
+default_refresh = 3600
+
+def valid_refresh(refresh):
+ if not isinstance(refresh, int) or refresh < 120 or refresh > 86400:
+ raise ValueError
+ return refresh
+
+default_retry = 600
+
+def valid_retry(retry):
+ if not isinstance(retry, int) or retry < 120 or retry > 7200:
+ raise ValueError
+ return retry
+
+default_expire = 7200
+
+def valid_expire(expire):
+ if not isinstance(expire, int) or expire < 600 or expire > 172800:
+ raise ValueError
+ return expire
+
+
@wire_pdu_only(0)
class EndOfDataPDUv0(PDUWithSerial):
"""
@@ -324,14 +351,11 @@ class EndOfDataPDUv0(PDUWithSerial):
pdu_type = 7
- # Default values, from the current RFC 6810 bis I-D.
- # Putting these here lets us use them in our client API for both
- # protocol versions, even though they can only be set in the
- # protocol in version 1.
-
- refresh = 3600
- retry = 600
- expire = 7200
+ def __init__(self, version, serial = None, nonce = None, refresh = None, retry = None, expire = None):
+ super(EndOfDataPDUv0, self).__init__(version, serial, nonce)
+ self.refresh = valid_refresh(default_refresh if refresh is None else refresh)
+ self.retry = valid_retry( default_retry if retry is None else retry)
+ self.expire = valid_expire( default_expire if expire is None else expire)
@wire_pdu_only(1)
@@ -342,24 +366,6 @@ class EndOfDataPDUv1(EndOfDataPDUv0):
header_struct = struct.Struct("!BBHLLLLL")
- def __init__(self, version, serial = None, nonce = None, refresh = None, retry = None, expire = None):
- super(EndOfDataPDUv1, self).__init__(version)
- if serial is not None:
- assert isinstance(serial, int)
- self.serial = serial
- if nonce is not None:
- assert isinstance(nonce, int)
- self.nonce = nonce
- if refresh is not None:
- assert isinstance(refresh, int)
- self.refresh = refresh
- if retry is not None:
- assert isinstance(retry, int)
- self.retry = retry
- if expire is not None:
- assert isinstance(expire, int)
- self.expire = expire
-
def __str__(self):
return "[%s, serial #%d nonce %d refresh %d retry %d expire %d]" % (
self.__class__.__name__, self.serial, self.nonce, self.refresh, self.retry, self.expire)
diff --git a/rpki/rtr/server.py b/rpki/rtr/server.py
index 0e6b86cc..a1aacbee 100644
--- a/rpki/rtr/server.py
+++ b/rpki/rtr/server.py
@@ -31,9 +31,7 @@ import rpki.oids
import rpki.rtr.pdus
import rpki.rtr.channels
-from rpki.rtr.pdus import (clone_pdu_root,
- CacheResponsePDU, EndOfDataPDU, CacheResetPDU, CacheResponsePDU,
- EndOfDataPDU, CacheResetPDU, CacheResetPDU, SerialNotifyPDU)
+from rpki.rtr.pdus import (clone_pdu_root, CacheResponsePDU, EndOfDataPDU, CacheResetPDU, SerialNotifyPDU)
# Disable incremental updates. Debugging only, should be False in production.
@@ -63,7 +61,10 @@ class PDU(rpki.rtr.pdus.PDU):
server.push_file(f)
server.push_pdu(EndOfDataPDU(version = server.version,
serial = server.current_serial,
- nonce = server.current_nonce))
+ nonce = server.current_nonce,
+ refresh = server.refresh,
+ retry = server.retry,
+ expire = server.expire))
def send_nodata(self, server):
"""
@@ -103,7 +104,10 @@ class SerialQueryPDU(PDU, rpki.rtr.pdus.SerialQueryPDU):
nonce = server.current_nonce))
server.push_pdu(EndOfDataPDU(version = server.version,
serial = server.current_serial,
- nonce = server.current_nonce))
+ nonce = server.current_nonce,
+ refresh = server.refresh,
+ retry = server.retry,
+ expire = server.expire))
elif disable_incrementals:
server.push_pdu(CacheResetPDU(version = server.version))
else:
@@ -242,7 +246,7 @@ class ServerChannel(rpki.rtr.channels.PDUChannel):
implement protocol logic.
"""
- def __init__(self, logger):
+ def __init__(self, logger, refresh, retry, expire):
"""
Set up stdin and stdout as connection and start listening for
first PDU.
@@ -252,6 +256,9 @@ class ServerChannel(rpki.rtr.channels.PDUChannel):
self.init_file_dispatcher(sys.stdin.fileno())
self.writer = ServerWriteChannel()
self.logger = logger
+ self.refresh = refresh
+ self.retry = retry
+ self.expire = expire
self.get_serial()
self.start_new_pdu()
@@ -480,7 +487,7 @@ def server_main(args):
kickme = None
try:
- server = rpki.rtr.server.ServerChannel(logger = logger)
+ server = rpki.rtr.server.ServerChannel(logger = logger, refresh = args.refresh, retry = args.retry, expire = args.expire)
kickme = rpki.rtr.server.KickmeChannel(server = server)
asyncore.loop(timeout = None)
except KeyboardInterrupt:
@@ -550,13 +557,34 @@ def argparse_setup(subparsers):
Set up argparse stuff for commands in this module.
"""
+ # These could have been lambdas, but doing it this way results in
+ # more useful error messages on argparse failures.
+
+ def refresh(v):
+ return rpki.rtr.pdus.valid_refresh(int(v))
+
+ def retry(v):
+ return rpki.rtr.pdus.valid_retry(int(v))
+
+ def expire(v):
+ return rpki.rtr.pdus.valid_expire(int(v))
+
+ # Some duplication of arguments here, not enough to be worth huge
+ # effort to clean up, worry about it later in any case.
+
subparser = subparsers.add_parser("server", description = server_main.__doc__,
help = "RPKI-RTR protocol server")
subparser.set_defaults(func = server_main, default_log_to = "syslog")
+ subparser.add_argument("--refresh", type = refresh, help = "override default refresh timer")
+ subparser.add_argument("--retry", type = retry, help = "override default retry timer")
+ subparser.add_argument("--expire", type = expire, help = "override default expire timer")
subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database")
subparser = subparsers.add_parser("listener", description = listener_main.__doc__,
help = "TCP listener for RPKI-RTR protocol server")
subparser.set_defaults(func = listener_main, default_log_to = "syslog")
- subparser.add_argument("port", type = int, help = "TCP port on which to listen")
+ subparser.add_argument("--refresh", type = refresh, help = "override default refresh timer")
+ subparser.add_argument("--retry", type = retry, help = "override default retry timer")
+ subparser.add_argument("--expire", type = expire, help = "override default expire timer")
+ subparser.add_argument("port", type = int, help = "TCP port on which to listen")
subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database")