aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rtr-origin/rtr-origin.py133
1 files changed, 65 insertions, 68 deletions
diff --git a/rtr-origin/rtr-origin.py b/rtr-origin/rtr-origin.py
index 84b4ceb8..9ea66700 100644
--- a/rtr-origin/rtr-origin.py
+++ b/rtr-origin/rtr-origin.py
@@ -11,8 +11,8 @@ translating that data into the form used in the rpki-router protocol.
cronjob mode prepares both full dumps (axfr) and incremental dumps
against a specific prior version (ixfr). [Terminology here borrowed
from DNS, as is much of the protocol design.] Finally, cronjob mode
-sends wakeup messages to any active servers, so that they can notify
-their clients that a new version is available.
+kicks any active servers, so that they can notify their clients that a
+new version is available.
server mode implements the server side of the rpkk-router protocol.
Other than one PF_UNIX socket inode, it doesn't write anything to
@@ -201,7 +201,9 @@ class serial_query(pdu_with_serial):
transfer.
"""
log(self)
- if int(server.get_serial()) == self.serial:
+ if server.get_serial() is None:
+ server.push_pdu(error_report(errno = 666, errpdu = self, errmsg = "Sorry, I have no current data to give you"))
+ elif int(server.current_serial) == self.serial:
log("[Client is already current, sending empty IXFR]")
server.push_pdu(cache_response())
server.push_pdu(end_of_data(serial = server.current_serial))
@@ -226,15 +228,15 @@ class reset_query(pdu_empty):
log(self)
if server.get_serial() is None:
server.push_pdu(error_report(errno = 666, errpdu = self, errmsg = "Sorry, I have no current data to give you"))
- return
- try:
- fn = "%s.ax" % server.current_serial
- f = open(fn, "rb")
- server.push_pdu(cache_response())
- server.push_file(f)
- server.push_pdu(end_of_data(serial = server.current_serial))
- except IOError:
- server.push_pdu(error_report(errno = 666, errpdu = self, errmsg = "Couldn't open %s" % fn))
+ else:
+ try:
+ fn = "%s.ax" % server.current_serial
+ f = open(fn, "rb")
+ server.push_pdu(cache_response())
+ server.push_file(f)
+ server.push_pdu(end_of_data(serial = server.current_serial))
+ except IOError:
+ server.push_pdu(error_report(errno = 666, errpdu = self, errmsg = "Couldn't open %s" % fn))
class cache_response(pdu_empty):
"""Cache Response PDU."""
@@ -511,10 +513,8 @@ class prefix_set(list):
def cronjob_main():
"""Toy version of main program for cronjob. This isn't ready for
- real use yet, but does most of the basic operations. Sending notify
- wakeup calls to server processes is waiting for me to write server
- code for this to talk to. Still needs cleanup, config file (instead
- of wired in magic filenames), etc.
+ real use yet, but does most of the basic operations. Still needs
+ cleanup, config file (instead of wired in magic filenames), etc.
"""
axfrs = [prefix_set.load_axfr(f) for f in glob.glob("*.ax")]
@@ -544,9 +544,12 @@ def cronjob_main():
print p
s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
- for name in glob.iglob("wakeup.*"):
- print "# Notifying %s" % name
- s.sendto("Hello, Polly!", name)
+ for name in glob.iglob("kickme.*"):
+ print "# Kicking %s" % name
+ try:
+ s.sendto("Hello, Polly!", name)
+ except:
+ print "# Failed to kick %s" % name
s.close()
class file_producer(object):
@@ -559,7 +562,7 @@ class file_producer(object):
def more(self):
return self.handle.read(self.buffersize)
-class pdu_asynchat(asynchat.async_chat):
+class pdu_channel(asynchat.async_chat):
"""asynchat subclass that understands our PDUs. This just handles
the network I/O. Specific engines (client, server) should be
subclasses of this with methods that do something useful with the
@@ -599,12 +602,12 @@ class pdu_asynchat(asynchat.async_chat):
raise NotImplementedError
def push_pdu(self, pdu):
- """Write PDU to asynchat stream."""
+ """Write PDU to stream."""
data = pdu.to_pdu()
self.push(data)
def push_file(self, f):
- """Write content of a file to an asynchat stream."""
+ """Write content of a file to stream."""
self.push_with_producer(file_producer(f, self.ac_out_buffer_size))
def log(self, msg):
@@ -616,22 +619,19 @@ class pdu_asynchat(asynchat.async_chat):
log("asynchat: %s: %s" % (tag, msg))
def handle_error(self):
- """Handle errors caught by asyncore main loop. Asyncore has a
- default handler for this but I find its customized backtraces very
- hard to read.
- """
+ """Handle errors caught by asyncore main loop."""
log(traceback.format_exc())
- log("Closing channel %s" % repr(self))
- self.close()
+ log("Exiting after unhandled exception")
+ asyncore.close_all()
-class server_asynchat(pdu_asynchat):
- """Server protocol engine, handles upcalls from pdu_asynchat to
+class server_channel(pdu_channel):
+ """Server protocol engine, handles upcalls from pdu_channel to
implement protocol logic.
"""
def __init__(self):
"""Set up stdin as connection and start listening for first PDU."""
- pdu_asynchat.__init__(self)
+ pdu_channel.__init__(self)
#
# I don't know a sane way to get asynchat.async_chat.__init__() to
# call asyncore.file_dispatcher.__init__(), so shut your eyes for
@@ -655,17 +655,10 @@ class server_asynchat(pdu_asynchat):
"""Handle received PDU."""
pdu.serve(self)
- wakeup = None
-
- def set_wakeup(self, wakeup):
- """Record companion wakeup socket, for shutdown."""
- self.wakeup = wakeup
-
def handle_close(self):
- """Intercept close event so we can shut down wakeup socket too."""
- if self.wakeup is not None:
- self.wakeup.close()
+ """Intercept close event so we can shut down other sockets."""
asynchat.async_chat.handle_close(self)
+ asyncore.close_all()
def get_serial(self):
"""Read, cache, and return current serial number, or None if we
@@ -688,12 +681,14 @@ class server_asynchat(pdu_asynchat):
return old_serial != self.get_serial()
def notify(self, data = None):
- """Receive a wakeup from cronjob instance."""
+ """Cronjob instance kicked us, send a notify message."""
if self.check_serial():
self.push_pdu(serial_notify(serial = self.current_serial))
+ else:
+ log("Cronjob kicked me without a valid current serial number")
-class client_asynchat(pdu_asynchat):
- """Client protocol engine, handles upcalls from pdu_asynchat."""
+class client_channel(pdu_channel):
+ """Client protocol engine, handles upcalls from pdu_channel."""
current_serial = None
@@ -707,7 +702,7 @@ class client_asynchat(pdu_asynchat):
self.ssh = subprocess.Popen(["/usr/local/bin/python", "rtr-origin.py", "server"], stdin = s[0], stdout = s[0], close_fds = True)
else:
self.ssh = subprocess.Popen(sshargs, executable = "/usr/bin/ssh", stdin = s[0], stdout = s[0], close_fds = True)
- pdu_asynchat.__init__(self, conn = s[1])
+ pdu_channel.__init__(self, conn = s[1])
self.start_new_pdu()
def deliver_pdu(self, pdu):
@@ -726,18 +721,22 @@ class client_asynchat(pdu_asynchat):
except OSError:
pass
-class server_wakeup(asyncore.dispatcher):
- """asycnore dispatcher for server. This just handles the PF_UNIX
- sockets we use to receive wakeup calls from the cronjob when it's
- time for us to send a notify PDU to our client.
+ def handle_close(self):
+ """Intercept close event so we can log it."""
+ log("Server closed channel")
+ asynchat.async_chat.handle_close(self)
+
+class kickme_channel(asyncore.dispatcher):
+ """asyncore dispatcher for the PF_UNIX socket that cronjob mode uses
+ to kick servers when it's time to send notify PDUs to clients.
"""
- def __init__(self, chat):
+ def __init__(self, server):
asyncore.dispatcher.__init__(self)
- self.my_asynchat_handle = chat
- self.my_socket_filename = "wakeup.%d" % os.getpid()
+ self.server = server
+ self.sockname = "kickme.%d" % os.getpid()
self.create_socket(socket.AF_UNIX, socket.SOCK_DGRAM)
- self.bind(self.my_socket_filename)
+ self.bind(self.sockname)
def writable(self):
"""This socket is read-only, never writable."""
@@ -750,13 +749,13 @@ class server_wakeup(asyncore.dispatcher):
def handle_read(self):
"""Handle receipt of a datagram."""
data = self.recv(512)
- self.my_asynchat_handle.notify(data)
+ self.server.notify(data)
def cleanup(self):
"""Clean up this dispatcher's socket."""
self.close()
try:
- os.unlink(self.my_socket_filename)
+ os.unlink(self.sockname)
except:
pass
@@ -769,31 +768,29 @@ class server_wakeup(asyncore.dispatcher):
log("asyncore: %s: %s" % (tag, msg))
def handle_error(self):
- """Handle errors caught by asyncore main loop. Asyncore has a
- default handler for this but I find its customized backtraces very
- hard to read.
- """
+ """Handle errors caught by asyncore main loop."""
log(traceback.format_exc())
- log("Closing channel %s" % repr(self))
- self.close()
+ log("Exiting after unhandled exception")
+ asyncore.close_all()
def server_main():
- """Main program for server mode. Not really written yet."""
- wakeup = None
+ """Main program for server mode. Server is event driven, so
+ everything interesting happens in the channel classes.
+ """
+ kickme = None
try:
- server = server_asynchat()
- wakeup = server_wakeup(chat = server)
- server.set_wakeup(wakeup)
+ server = server_channel()
+ kickme = kickme_channel(server = server)
asyncore.loop()
finally:
- if wakeup is not None:
- wakeup.cleanup()
+ if kickme is not None:
+ kickme.cleanup()
def client_main():
"""Main program for client mode. Not really written yet."""
client = None
try:
- client = client_asynchat()
+ client = client_channel()
client.push_pdu(reset_query())
period = rpki.sundial.timedelta(seconds = 90)
wakeup = rpki.sundial.now() + period