aboutsummaryrefslogtreecommitdiff
path: root/rpki/rtr/server.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2015-10-26 06:29:00 +0000
committerRob Austein <sra@hactrn.net>2015-10-26 06:29:00 +0000
commitb46deb1417dc3596e9ac9fe2fe8cc0b7f42457e7 (patch)
treeca0dc0276d1adc168bc3337ce0564c4ec4957c1b /rpki/rtr/server.py
parent397beaf6d9900dc3b3cb612c89ebf1d57b1d16f6 (diff)
"Any programmer who fails to comply with the standard naming, formatting,
or commenting conventions should be shot. If it so happens that it is inconvenient to shoot him, then he is to be politely requested to recode his program in adherence to the above standard." -- Michael Spier, Digital Equipment Corporation svn path=/branches/tk705/; revision=6152
Diffstat (limited to 'rpki/rtr/server.py')
-rw-r--r--rpki/rtr/server.py872
1 files changed, 436 insertions, 436 deletions
diff --git a/rpki/rtr/server.py b/rpki/rtr/server.py
index 1c7a5e78..f57c3037 100644
--- a/rpki/rtr/server.py
+++ b/rpki/rtr/server.py
@@ -44,37 +44,37 @@ kickme_base = os.path.join(kickme_dir, "kickme")
class PDU(rpki.rtr.pdus.PDU):
- """
- Generic server PDU.
- """
-
- def send_file(self, server, filename):
"""
- Send a content of a file as a cache response. Caller should catch IOError.
+ Generic server PDU.
"""
- fn2 = os.path.splitext(filename)[1]
- assert fn2.startswith(".v") and fn2[2:].isdigit() and int(fn2[2:]) == server.version
-
- f = open(filename, "rb")
- server.push_pdu(CacheResponsePDU(version = server.version,
- nonce = server.current_nonce))
- server.push_file(f)
- server.push_pdu(EndOfDataPDU(version = server.version,
- serial = server.current_serial,
- nonce = server.current_nonce,
- refresh = server.refresh,
- retry = server.retry,
- expire = server.expire))
-
- def send_nodata(self, server):
- """
- Send a nodata error.
- """
+ def send_file(self, server, filename):
+ """
+ Send a content of a file as a cache response. Caller should catch IOError.
+ """
+
+ fn2 = os.path.splitext(filename)[1]
+ assert fn2.startswith(".v") and fn2[2:].isdigit() and int(fn2[2:]) == server.version
- server.push_pdu(ErrorReportPDU(version = server.version,
- errno = ErrorReportPDU.codes["No Data Available"],
- errpdu = self))
+ f = open(filename, "rb")
+ server.push_pdu(CacheResponsePDU(version = server.version,
+ nonce = server.current_nonce))
+ server.push_file(f)
+ server.push_pdu(EndOfDataPDU(version = server.version,
+ serial = server.current_serial,
+ nonce = server.current_nonce,
+ refresh = server.refresh,
+ retry = server.retry,
+ expire = server.expire))
+
+ def send_nodata(self, server):
+ """
+ Send a nodata error.
+ """
+
+ server.push_pdu(ErrorReportPDU(version = server.version,
+ errno = ErrorReportPDU.codes["No Data Available"],
+ errpdu = self))
clone_pdu = clone_pdu_root(PDU)
@@ -82,512 +82,512 @@ clone_pdu = clone_pdu_root(PDU)
@clone_pdu
class SerialQueryPDU(PDU, rpki.rtr.pdus.SerialQueryPDU):
- """
- Serial Query PDU.
- """
-
- def serve(self, server):
- """
- Received a serial query, send incremental transfer in response.
- If client is already up to date, just send an empty incremental
- transfer.
"""
-
- server.logger.debug(self)
- if server.get_serial() is None:
- self.send_nodata(server)
- elif server.current_nonce != self.nonce:
- server.logger.info("[Client requested wrong nonce, resetting client]")
- server.push_pdu(CacheResetPDU(version = server.version))
- elif server.current_serial == self.serial:
- server.logger.debug("[Client is already current, sending empty IXFR]")
- server.push_pdu(CacheResponsePDU(version = server.version,
- nonce = server.current_nonce))
- server.push_pdu(EndOfDataPDU(version = server.version,
- serial = server.current_serial,
- nonce = server.current_nonce,
- refresh = server.refresh,
- retry = server.retry,
- expire = server.expire))
- elif disable_incrementals:
- server.push_pdu(CacheResetPDU(version = server.version))
- else:
- try:
- self.send_file(server, "%d.ix.%d.v%d" % (server.current_serial, self.serial, server.version))
- except IOError:
- server.push_pdu(CacheResetPDU(version = server.version))
+ Serial Query PDU.
+ """
+
+ def serve(self, server):
+ """
+ Received a serial query, send incremental transfer in response.
+ If client is already up to date, just send an empty incremental
+ transfer.
+ """
+
+ server.logger.debug(self)
+ if server.get_serial() is None:
+ self.send_nodata(server)
+ elif server.current_nonce != self.nonce:
+ server.logger.info("[Client requested wrong nonce, resetting client]")
+ server.push_pdu(CacheResetPDU(version = server.version))
+ elif server.current_serial == self.serial:
+ server.logger.debug("[Client is already current, sending empty IXFR]")
+ server.push_pdu(CacheResponsePDU(version = server.version,
+ nonce = server.current_nonce))
+ server.push_pdu(EndOfDataPDU(version = server.version,
+ serial = server.current_serial,
+ nonce = server.current_nonce,
+ refresh = server.refresh,
+ retry = server.retry,
+ expire = server.expire))
+ elif disable_incrementals:
+ server.push_pdu(CacheResetPDU(version = server.version))
+ else:
+ try:
+ self.send_file(server, "%d.ix.%d.v%d" % (server.current_serial, self.serial, server.version))
+ except IOError:
+ server.push_pdu(CacheResetPDU(version = server.version))
@clone_pdu
class ResetQueryPDU(PDU, rpki.rtr.pdus.ResetQueryPDU):
- """
- Reset Query PDU.
- """
-
- def serve(self, server):
"""
- Received a reset query, send full current state in response.
+ Reset Query PDU.
"""
- server.logger.debug(self)
- if server.get_serial() is None:
- self.send_nodata(server)
- else:
- try:
- fn = "%d.ax.v%d" % (server.current_serial, server.version)
- self.send_file(server, fn)
- except IOError:
- server.push_pdu(ErrorReportPDU(version = server.version,
- errno = ErrorReportPDU.codes["Internal Error"],
- errpdu = self,
- errmsg = "Couldn't open %s" % fn))
+ def serve(self, server):
+ """
+ Received a reset query, send full current state in response.
+ """
+
+ server.logger.debug(self)
+ if server.get_serial() is None:
+ self.send_nodata(server)
+ else:
+ try:
+ fn = "%d.ax.v%d" % (server.current_serial, server.version)
+ self.send_file(server, fn)
+ except IOError:
+ server.push_pdu(ErrorReportPDU(version = server.version,
+ errno = ErrorReportPDU.codes["Internal Error"],
+ errpdu = self,
+ errmsg = "Couldn't open %s" % fn))
@clone_pdu
class ErrorReportPDU(rpki.rtr.pdus.ErrorReportPDU):
- """
- Error Report PDU.
- """
-
- def serve(self, server):
"""
- Received an ErrorReportPDU from client. Not much we can do beyond
- logging it, then killing the connection if error was fatal.
+ Error Report PDU.
"""
- server.logger.error(self)
- if self.errno in self.fatal:
- server.logger.error("[Shutting down due to reported fatal protocol error]")
- sys.exit(1)
+ def serve(self, server):
+ """
+ Received an ErrorReportPDU from client. Not much we can do beyond
+ logging it, then killing the connection if error was fatal.
+ """
+
+ server.logger.error(self)
+ if self.errno in self.fatal:
+ server.logger.error("[Shutting down due to reported fatal protocol error]")
+ sys.exit(1)
def read_current(version):
- """
- Read current serial number and nonce. Return None for both if
- serial and nonce not recorded. For backwards compatibility, treat
- file containing just a serial number as having a nonce of zero.
- """
-
- if version is None:
- return None, None
- try:
- with open("current.v%d" % version, "r") as f:
- values = tuple(int(s) for s in f.read().split())
- return values[0], values[1]
- except IndexError:
- return values[0], 0
- except IOError:
- return None, None
+ """
+ Read current serial number and nonce. Return None for both if
+ serial and nonce not recorded. For backwards compatibility, treat
+ file containing just a serial number as having a nonce of zero.
+ """
+
+ if version is None:
+ return None, None
+ try:
+ with open("current.v%d" % version, "r") as f:
+ values = tuple(int(s) for s in f.read().split())
+ return values[0], values[1]
+ except IndexError:
+ return values[0], 0
+ except IOError:
+ return None, None
def write_current(serial, nonce, version):
- """
- Write serial number and nonce.
- """
+ """
+ Write serial number and nonce.
+ """
- curfn = "current.v%d" % version
- tmpfn = curfn + "%d.tmp" % os.getpid()
- with open(tmpfn, "w") as f:
- f.write("%d %d\n" % (serial, nonce))
- os.rename(tmpfn, curfn)
+ curfn = "current.v%d" % version
+ tmpfn = curfn + "%d.tmp" % os.getpid()
+ with open(tmpfn, "w") as f:
+ f.write("%d %d\n" % (serial, nonce))
+ os.rename(tmpfn, curfn)
class FileProducer(object):
- """
- File-based producer object for asynchat.
- """
+ """
+ File-based producer object for asynchat.
+ """
- def __init__(self, handle, buffersize):
- self.handle = handle
- self.buffersize = buffersize
+ def __init__(self, handle, buffersize):
+ self.handle = handle
+ self.buffersize = buffersize
- def more(self):
- return self.handle.read(self.buffersize)
+ def more(self):
+ return self.handle.read(self.buffersize)
class ServerWriteChannel(rpki.rtr.channels.PDUChannel):
- """
- Kludge to deal with ssh's habit of sometimes (compile time option)
- invoking us with two unidirectional pipes instead of one
- bidirectional socketpair. All the server logic is in the
- ServerChannel class, this class just deals with sending the
- server's output to a different file descriptor.
- """
-
- def __init__(self):
"""
- Set up stdout.
+ Kludge to deal with ssh's habit of sometimes (compile time option)
+ invoking us with two unidirectional pipes instead of one
+ bidirectional socketpair. All the server logic is in the
+ ServerChannel class, this class just deals with sending the
+ server's output to a different file descriptor.
"""
- super(ServerWriteChannel, self).__init__(root_pdu_class = PDU)
- self.init_file_dispatcher(sys.stdout.fileno())
+ def __init__(self):
+ """
+ Set up stdout.
+ """
- def readable(self):
- """
- This channel is never readable.
- """
+ super(ServerWriteChannel, self).__init__(root_pdu_class = PDU)
+ self.init_file_dispatcher(sys.stdout.fileno())
- return False
+ def readable(self):
+ """
+ This channel is never readable.
+ """
- def push_file(self, f):
- """
- Write content of a file to stream.
- """
+ return False
- try:
- self.push_with_producer(FileProducer(f, self.ac_out_buffer_size))
- except OSError, e:
- if e.errno != errno.EAGAIN:
- raise
+ def push_file(self, f):
+ """
+ Write content of a file to stream.
+ """
+ try:
+ self.push_with_producer(FileProducer(f, self.ac_out_buffer_size))
+ except OSError, e:
+ if e.errno != errno.EAGAIN:
+ raise
-class ServerChannel(rpki.rtr.channels.PDUChannel):
- """
- Server protocol engine, handles upcalls from PDUChannel to
- implement protocol logic.
- """
- def __init__(self, logger, refresh, retry, expire):
+class ServerChannel(rpki.rtr.channels.PDUChannel):
"""
- Set up stdin and stdout as connection and start listening for
- first PDU.
+ Server protocol engine, handles upcalls from PDUChannel to
+ implement protocol logic.
"""
- super(ServerChannel, self).__init__(root_pdu_class = PDU)
- self.init_file_dispatcher(sys.stdin.fileno())
- self.writer = ServerWriteChannel()
- self.logger = logger
- self.refresh = refresh
- self.retry = retry
- self.expire = expire
- self.get_serial()
- self.start_new_pdu()
-
- def writable(self):
- """
- This channel is never writable.
- """
+ def __init__(self, logger, refresh, retry, expire):
+ """
+ Set up stdin and stdout as connection and start listening for
+ first PDU.
+ """
- return False
+ super(ServerChannel, self).__init__(root_pdu_class = PDU)
+ self.init_file_dispatcher(sys.stdin.fileno())
+ self.writer = ServerWriteChannel()
+ self.logger = logger
+ self.refresh = refresh
+ self.retry = retry
+ self.expire = expire
+ self.get_serial()
+ self.start_new_pdu()
- def push(self, data):
- """
- Redirect to writer channel.
- """
+ def writable(self):
+ """
+ This channel is never writable.
+ """
- return self.writer.push(data)
+ return False
- def push_with_producer(self, producer):
- """
- Redirect to writer channel.
- """
+ def push(self, data):
+ """
+ Redirect to writer channel.
+ """
- return self.writer.push_with_producer(producer)
+ return self.writer.push(data)
- def push_pdu(self, pdu):
- """
- Redirect to writer channel.
- """
+ def push_with_producer(self, producer):
+ """
+ Redirect to writer channel.
+ """
- return self.writer.push_pdu(pdu)
+ return self.writer.push_with_producer(producer)
- def push_file(self, f):
- """
- Redirect to writer channel.
- """
+ def push_pdu(self, pdu):
+ """
+ Redirect to writer channel.
+ """
- return self.writer.push_file(f)
+ return self.writer.push_pdu(pdu)
- def deliver_pdu(self, pdu):
- """
- Handle received PDU.
- """
+ def push_file(self, f):
+ """
+ Redirect to writer channel.
+ """
- pdu.serve(self)
+ return self.writer.push_file(f)
- def get_serial(self):
- """
- Read, cache, and return current serial number, or None if we can't
- find the serial number file. The latter condition should never
- happen, but maybe we got started in server mode while the cronjob
- mode instance is still building its database.
- """
+ def deliver_pdu(self, pdu):
+ """
+ Handle received PDU.
+ """
- self.current_serial, self.current_nonce = read_current(self.version)
- return self.current_serial
+ pdu.serve(self)
- def check_serial(self):
- """
- Check for a new serial number.
- """
+ def get_serial(self):
+ """
+ Read, cache, and return current serial number, or None if we can't
+ find the serial number file. The latter condition should never
+ happen, but maybe we got started in server mode while the cronjob
+ mode instance is still building its database.
+ """
- old_serial = self.current_serial
- return old_serial != self.get_serial()
+ self.current_serial, self.current_nonce = read_current(self.version)
+ return self.current_serial
- def notify(self, data = None, force = False):
- """
- Cronjob instance kicked us: check whether our serial number has
- changed, and send a notify message if so.
+ def check_serial(self):
+ """
+ Check for a new serial number.
+ """
- We have to check rather than just blindly notifying when kicked
- because the cronjob instance has no good way of knowing which
- protocol version we're running, thus has no good way of knowing
- whether we care about a particular change set or not.
- """
+ old_serial = self.current_serial
+ return old_serial != self.get_serial()
- if force or self.check_serial():
- self.push_pdu(SerialNotifyPDU(version = self.version,
- serial = self.current_serial,
- nonce = self.current_nonce))
- else:
- self.logger.debug("Cronjob kicked me but I see no serial change, ignoring")
+ def notify(self, data = None, force = False):
+ """
+ Cronjob instance kicked us: check whether our serial number has
+ changed, and send a notify message if so.
+
+ We have to check rather than just blindly notifying when kicked
+ because the cronjob instance has no good way of knowing which
+ protocol version we're running, thus has no good way of knowing
+ whether we care about a particular change set or not.
+ """
+
+ if force or self.check_serial():
+ self.push_pdu(SerialNotifyPDU(version = self.version,
+ serial = self.current_serial,
+ nonce = self.current_nonce))
+ else:
+ self.logger.debug("Cronjob kicked me but I see no serial change, ignoring")
class KickmeChannel(asyncore.dispatcher, object):
- """
- asyncore dispatcher for the PF_UNIX socket that cronjob mode uses to
- kick servers when it's time to send notify PDUs to clients.
- """
-
- def __init__(self, server):
- asyncore.dispatcher.__init__(self) # Old-style class
- self.server = server
- self.sockname = "%s.%d" % (kickme_base, os.getpid())
- self.create_socket(socket.AF_UNIX, socket.SOCK_DGRAM)
- try:
- self.bind(self.sockname)
- os.chmod(self.sockname, 0660)
- except socket.error, e:
- self.server.logger.exception("Couldn't bind() kickme socket: %r", e)
- self.close()
- except OSError, e:
- self.server.logger.exception("Couldn't chmod() kickme socket: %r", e)
-
- def writable(self):
"""
- This socket is read-only, never writable.
+ asyncore dispatcher for the PF_UNIX socket that cronjob mode uses to
+ kick servers when it's time to send notify PDUs to clients.
"""
- return False
+ def __init__(self, server):
+ asyncore.dispatcher.__init__(self) # Old-style class
+ self.server = server
+ self.sockname = "%s.%d" % (kickme_base, os.getpid())
+ self.create_socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ try:
+ self.bind(self.sockname)
+ os.chmod(self.sockname, 0660)
+ except socket.error, e:
+ self.server.logger.exception("Couldn't bind() kickme socket: %r", e)
+ self.close()
+ except OSError, e:
+ self.server.logger.exception("Couldn't chmod() kickme socket: %r", e)
+
+ def writable(self):
+ """
+ This socket is read-only, never writable.
+ """
+
+ return False
+
+ def handle_connect(self):
+ """
+ Ignore connect events (not very useful on datagram socket).
+ """
+
+ pass
+
+ def handle_read(self):
+ """
+ Handle receipt of a datagram.
+ """
+
+ data = self.recv(512)
+ self.server.notify(data)
+
+ def cleanup(self):
+ """
+ Clean up this dispatcher's socket.
+ """
+
+ self.close()
+ try:
+ os.unlink(self.sockname)
+ except: # pylint: disable=W0702
+ pass
- def handle_connect(self):
- """
- Ignore connect events (not very useful on datagram socket).
- """
+ def log(self, msg):
+ """
+ Intercept asyncore's logging.
+ """
- pass
+ self.server.logger.info(msg)
- def handle_read(self):
- """
- Handle receipt of a datagram.
- """
+ def log_info(self, msg, tag = "info"):
+ """
+ Intercept asyncore's logging.
+ """
- data = self.recv(512)
- self.server.notify(data)
+ self.server.logger.info("asyncore: %s: %s", tag, msg)
- def cleanup(self):
- """
- Clean up this dispatcher's socket.
- """
+ def handle_error(self):
+ """
+ Handle errors caught by asyncore main loop.
+ """
- self.close()
- try:
- os.unlink(self.sockname)
- except: # pylint: disable=W0702
- pass
+ self.server.logger.exception("[Unhandled exception]")
+ self.server.logger.critical("[Exiting after unhandled exception]")
+ sys.exit(1)
- def log(self, msg):
+
+def _hostport_tag():
"""
- Intercept asyncore's logging.
+ Construct hostname/address + port when we're running under a
+ protocol we understand well enough to do that. This is all
+ kludgery. Just grit your teeth, or perhaps just close your eyes.
"""
- self.server.logger.info(msg)
+ proto = None
- def log_info(self, msg, tag = "info"):
- """
- Intercept asyncore's logging.
- """
+ if proto is None:
+ try:
+ host, port = socket.fromfd(0, socket.AF_INET, socket.SOCK_STREAM).getpeername()
+ proto = "tcp"
+ except: # pylint: disable=W0702
+ pass
- self.server.logger.info("asyncore: %s: %s", tag, msg)
+ if proto is None:
+ try:
+ host, port = socket.fromfd(0, socket.AF_INET6, socket.SOCK_STREAM).getpeername()[0:2]
+ proto = "tcp"
+ except: # pylint: disable=W0702
+ pass
- def handle_error(self):
+ if proto is None:
+ try:
+ host, port = os.environ["SSH_CONNECTION"].split()[0:2]
+ proto = "ssh"
+ except: # pylint: disable=W0702
+ pass
+
+ if proto is None:
+ try:
+ host, port = os.environ["REMOTE_HOST"], os.getenv("REMOTE_PORT")
+ proto = "ssl"
+ except: # pylint: disable=W0702
+ pass
+
+ if proto is None:
+ return ""
+ elif not port:
+ return "/%s/%s" % (proto, host)
+ elif ":" in host:
+ return "/%s/%s.%s" % (proto, host, port)
+ else:
+ return "/%s/%s:%s" % (proto, host, port)
+
+
+def server_main(args):
"""
- Handle errors caught by asyncore main loop.
+ Implement the server side of the rpkk-router protocol. Other than
+ one PF_UNIX socket inode, this doesn't write anything to disk, so it
+ can be run with minimal privileges. Most of the work has already
+ been done by the database generator, so all this server has to do is
+ pass the results along to a client.
"""
- self.server.logger.exception("[Unhandled exception]")
- self.server.logger.critical("[Exiting after unhandled exception]")
- sys.exit(1)
+ logger = logging.LoggerAdapter(logging.root, dict(connection = _hostport_tag()))
+ logger.debug("[Starting]")
-def _hostport_tag():
- """
- Construct hostname/address + port when we're running under a
- protocol we understand well enough to do that. This is all
- kludgery. Just grit your teeth, or perhaps just close your eyes.
- """
-
- proto = None
+ if args.rpki_rtr_dir:
+ try:
+ os.chdir(args.rpki_rtr_dir)
+ except OSError, e:
+ sys.exit(e)
- if proto is None:
+ kickme = None
try:
- host, port = socket.fromfd(0, socket.AF_INET, socket.SOCK_STREAM).getpeername()
- proto = "tcp"
- except: # pylint: disable=W0702
- pass
+ server = rpki.rtr.server.ServerChannel(logger = logger, refresh = args.refresh, retry = args.retry, expire = args.expire)
+ kickme = rpki.rtr.server.KickmeChannel(server = server)
+ asyncore.loop(timeout = None)
+ signal.signal(signal.SIGINT, signal.SIG_IGN) # Theorized race condition
+ except KeyboardInterrupt:
+ sys.exit(0)
+ finally:
+ signal.signal(signal.SIGINT, signal.SIG_IGN) # Observed race condition
+ if kickme is not None:
+ kickme.cleanup()
- if proto is None:
- try:
- host, port = socket.fromfd(0, socket.AF_INET6, socket.SOCK_STREAM).getpeername()[0:2]
- proto = "tcp"
- except: # pylint: disable=W0702
- pass
- if proto is None:
- try:
- host, port = os.environ["SSH_CONNECTION"].split()[0:2]
- proto = "ssh"
- except: # pylint: disable=W0702
- pass
+def listener_main(args):
+ """
+ Totally insecure TCP listener for rpki-rtr protocol. We only
+ implement this because it's all that the routers currently support.
+ In theory, we will all be running TCP-AO in the future, at which
+ point this listener will go away or become a TCP-AO listener.
+ """
- if proto is None:
- try:
- host, port = os.environ["REMOTE_HOST"], os.getenv("REMOTE_PORT")
- proto = "ssl"
- except: # pylint: disable=W0702
- pass
+ # Perhaps we should daemonize? Deal with that later.
- if proto is None:
- return ""
- elif not port:
- return "/%s/%s" % (proto, host)
- elif ":" in host:
- return "/%s/%s.%s" % (proto, host, port)
- else:
- return "/%s/%s:%s" % (proto, host, port)
+ # server_main() handles args.rpki_rtr_dir.
+ listener = None
+ try:
+ listener = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
+ except: # pylint: disable=W0702
+ if listener is not None:
+ listener.close()
+ listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ try:
+ listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ except AttributeError:
+ pass
+ listener.bind(("", args.port))
+ listener.listen(5)
+ logging.debug("[Listening on port %s]", args.port)
+ while True:
+ try:
+ s, ai = listener.accept()
+ except KeyboardInterrupt:
+ sys.exit(0)
+ logging.debug("[Received connection from %r]", ai)
+ pid = os.fork()
+ if pid == 0:
+ os.dup2(s.fileno(), 0) # pylint: disable=E1103
+ os.dup2(s.fileno(), 1) # pylint: disable=E1103
+ s.close()
+ #os.closerange(3, os.sysconf("SC_OPEN_MAX"))
+ server_main(args)
+ sys.exit()
+ else:
+ logging.debug("[Spawned server %d]", pid)
+ while True:
+ try:
+ pid, status = os.waitpid(0, os.WNOHANG) # pylint: disable=W0612
+ if pid:
+ logging.debug("[Server %s exited]", pid)
+ continue
+ except: # pylint: disable=W0702
+ pass
+ break
-def server_main(args):
- """
- Implement the server side of the rpkk-router protocol. Other than
- one PF_UNIX socket inode, this doesn't write anything to disk, so it
- can be run with minimal privileges. Most of the work has already
- been done by the database generator, so all this server has to do is
- pass the results along to a client.
- """
- logger = logging.LoggerAdapter(logging.root, dict(connection = _hostport_tag()))
+def argparse_setup(subparsers):
+ """
+ Set up argparse stuff for commands in this module.
+ """
- logger.debug("[Starting]")
+ # These could have been lambdas, but doing it this way results in
+ # more useful error messages on argparse failures.
- if args.rpki_rtr_dir:
- try:
- os.chdir(args.rpki_rtr_dir)
- except OSError, e:
- sys.exit(e)
-
- kickme = None
- try:
- server = rpki.rtr.server.ServerChannel(logger = logger, refresh = args.refresh, retry = args.retry, expire = args.expire)
- kickme = rpki.rtr.server.KickmeChannel(server = server)
- asyncore.loop(timeout = None)
- signal.signal(signal.SIGINT, signal.SIG_IGN) # Theorized race condition
- except KeyboardInterrupt:
- sys.exit(0)
- finally:
- signal.signal(signal.SIGINT, signal.SIG_IGN) # Observed race condition
- if kickme is not None:
- kickme.cleanup()
+ def refresh(v):
+ return rpki.rtr.pdus.valid_refresh(int(v))
+ def retry(v):
+ return rpki.rtr.pdus.valid_retry(int(v))
-def listener_main(args):
- """
- Totally insecure TCP listener for rpki-rtr protocol. We only
- implement this because it's all that the routers currently support.
- In theory, we will all be running TCP-AO in the future, at which
- point this listener will go away or become a TCP-AO listener.
- """
-
- # Perhaps we should daemonize? Deal with that later.
-
- # server_main() handles args.rpki_rtr_dir.
-
- listener = None
- try:
- listener = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- listener.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
- except: # pylint: disable=W0702
- if listener is not None:
- listener.close()
- listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- try:
- listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- except AttributeError:
- pass
- listener.bind(("", args.port))
- listener.listen(5)
- logging.debug("[Listening on port %s]", args.port)
- while True:
- try:
- s, ai = listener.accept()
- except KeyboardInterrupt:
- sys.exit(0)
- logging.debug("[Received connection from %r]", ai)
- pid = os.fork()
- if pid == 0:
- os.dup2(s.fileno(), 0) # pylint: disable=E1103
- os.dup2(s.fileno(), 1) # pylint: disable=E1103
- s.close()
- #os.closerange(3, os.sysconf("SC_OPEN_MAX"))
- server_main(args)
- sys.exit()
- else:
- logging.debug("[Spawned server %d]", pid)
- while True:
- try:
- pid, status = os.waitpid(0, os.WNOHANG) # pylint: disable=W0612
- if pid:
- logging.debug("[Server %s exited]", pid)
- continue
- except: # pylint: disable=W0702
- pass
- break
+ def expire(v):
+ return rpki.rtr.pdus.valid_expire(int(v))
+ # Some duplication of arguments here, not enough to be worth huge
+ # effort to clean up, worry about it later in any case.
-def argparse_setup(subparsers):
- """
- Set up argparse stuff for commands in this module.
- """
-
- # These could have been lambdas, but doing it this way results in
- # more useful error messages on argparse failures.
-
- def refresh(v):
- return rpki.rtr.pdus.valid_refresh(int(v))
-
- def retry(v):
- return rpki.rtr.pdus.valid_retry(int(v))
-
- def expire(v):
- return rpki.rtr.pdus.valid_expire(int(v))
-
- # Some duplication of arguments here, not enough to be worth huge
- # effort to clean up, worry about it later in any case.
-
- subparser = subparsers.add_parser("server", description = server_main.__doc__,
- help = "RPKI-RTR protocol server")
- subparser.set_defaults(func = server_main, default_log_to = "syslog")
- subparser.add_argument("--refresh", type = refresh, help = "override default refresh timer")
- subparser.add_argument("--retry", type = retry, help = "override default retry timer")
- subparser.add_argument("--expire", type = expire, help = "override default expire timer")
- subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database")
-
- subparser = subparsers.add_parser("listener", description = listener_main.__doc__,
- help = "TCP listener for RPKI-RTR protocol server")
- subparser.set_defaults(func = listener_main, default_log_to = "syslog")
- subparser.add_argument("--refresh", type = refresh, help = "override default refresh timer")
- subparser.add_argument("--retry", type = retry, help = "override default retry timer")
- subparser.add_argument("--expire", type = expire, help = "override default expire timer")
- subparser.add_argument("port", type = int, help = "TCP port on which to listen")
- subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database")
+ subparser = subparsers.add_parser("server", description = server_main.__doc__,
+ help = "RPKI-RTR protocol server")
+ subparser.set_defaults(func = server_main, default_log_to = "syslog")
+ subparser.add_argument("--refresh", type = refresh, help = "override default refresh timer")
+ subparser.add_argument("--retry", type = retry, help = "override default retry timer")
+ subparser.add_argument("--expire", type = expire, help = "override default expire timer")
+ subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database")
+
+ subparser = subparsers.add_parser("listener", description = listener_main.__doc__,
+ help = "TCP listener for RPKI-RTR protocol server")
+ subparser.set_defaults(func = listener_main, default_log_to = "syslog")
+ subparser.add_argument("--refresh", type = refresh, help = "override default refresh timer")
+ subparser.add_argument("--retry", type = retry, help = "override default retry timer")
+ subparser.add_argument("--expire", type = expire, help = "override default expire timer")
+ subparser.add_argument("port", type = int, help = "TCP port on which to listen")
+ subparser.add_argument("rpki_rtr_dir", nargs = "?", help = "directory containing RPKI-RTR database")