diff options
-rw-r--r-- | rtr-origin/rtr-origin.py | 133 |
1 files changed, 65 insertions, 68 deletions
diff --git a/rtr-origin/rtr-origin.py b/rtr-origin/rtr-origin.py index 84b4ceb8..9ea66700 100644 --- a/rtr-origin/rtr-origin.py +++ b/rtr-origin/rtr-origin.py @@ -11,8 +11,8 @@ translating that data into the form used in the rpki-router protocol. cronjob mode prepares both full dumps (axfr) and incremental dumps against a specific prior version (ixfr). [Terminology here borrowed from DNS, as is much of the protocol design.] Finally, cronjob mode -sends wakeup messages to any active servers, so that they can notify -their clients that a new version is available. +kicks any active servers, so that they can notify their clients that a +new version is available. server mode implements the server side of the rpkk-router protocol. Other than one PF_UNIX socket inode, it doesn't write anything to @@ -201,7 +201,9 @@ class serial_query(pdu_with_serial): transfer. """ log(self) - if int(server.get_serial()) == self.serial: + if server.get_serial() is None: + server.push_pdu(error_report(errno = 666, errpdu = self, errmsg = "Sorry, I have no current data to give you")) + elif int(server.current_serial) == self.serial: log("[Client is already current, sending empty IXFR]") server.push_pdu(cache_response()) server.push_pdu(end_of_data(serial = server.current_serial)) @@ -226,15 +228,15 @@ class reset_query(pdu_empty): log(self) if server.get_serial() is None: server.push_pdu(error_report(errno = 666, errpdu = self, errmsg = "Sorry, I have no current data to give you")) - return - try: - fn = "%s.ax" % server.current_serial - f = open(fn, "rb") - server.push_pdu(cache_response()) - server.push_file(f) - server.push_pdu(end_of_data(serial = server.current_serial)) - except IOError: - server.push_pdu(error_report(errno = 666, errpdu = self, errmsg = "Couldn't open %s" % fn)) + else: + try: + fn = "%s.ax" % server.current_serial + f = open(fn, "rb") + server.push_pdu(cache_response()) + server.push_file(f) + server.push_pdu(end_of_data(serial = server.current_serial)) + except IOError: + server.push_pdu(error_report(errno = 666, errpdu = self, errmsg = "Couldn't open %s" % fn)) class cache_response(pdu_empty): """Cache Response PDU.""" @@ -511,10 +513,8 @@ class prefix_set(list): def cronjob_main(): """Toy version of main program for cronjob. This isn't ready for - real use yet, but does most of the basic operations. Sending notify - wakeup calls to server processes is waiting for me to write server - code for this to talk to. Still needs cleanup, config file (instead - of wired in magic filenames), etc. + real use yet, but does most of the basic operations. Still needs + cleanup, config file (instead of wired in magic filenames), etc. """ axfrs = [prefix_set.load_axfr(f) for f in glob.glob("*.ax")] @@ -544,9 +544,12 @@ def cronjob_main(): print p s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - for name in glob.iglob("wakeup.*"): - print "# Notifying %s" % name - s.sendto("Hello, Polly!", name) + for name in glob.iglob("kickme.*"): + print "# Kicking %s" % name + try: + s.sendto("Hello, Polly!", name) + except: + print "# Failed to kick %s" % name s.close() class file_producer(object): @@ -559,7 +562,7 @@ class file_producer(object): def more(self): return self.handle.read(self.buffersize) -class pdu_asynchat(asynchat.async_chat): +class pdu_channel(asynchat.async_chat): """asynchat subclass that understands our PDUs. This just handles the network I/O. Specific engines (client, server) should be subclasses of this with methods that do something useful with the @@ -599,12 +602,12 @@ class pdu_asynchat(asynchat.async_chat): raise NotImplementedError def push_pdu(self, pdu): - """Write PDU to asynchat stream.""" + """Write PDU to stream.""" data = pdu.to_pdu() self.push(data) def push_file(self, f): - """Write content of a file to an asynchat stream.""" + """Write content of a file to stream.""" self.push_with_producer(file_producer(f, self.ac_out_buffer_size)) def log(self, msg): @@ -616,22 +619,19 @@ class pdu_asynchat(asynchat.async_chat): log("asynchat: %s: %s" % (tag, msg)) def handle_error(self): - """Handle errors caught by asyncore main loop. Asyncore has a - default handler for this but I find its customized backtraces very - hard to read. - """ + """Handle errors caught by asyncore main loop.""" log(traceback.format_exc()) - log("Closing channel %s" % repr(self)) - self.close() + log("Exiting after unhandled exception") + asyncore.close_all() -class server_asynchat(pdu_asynchat): - """Server protocol engine, handles upcalls from pdu_asynchat to +class server_channel(pdu_channel): + """Server protocol engine, handles upcalls from pdu_channel to implement protocol logic. """ def __init__(self): """Set up stdin as connection and start listening for first PDU.""" - pdu_asynchat.__init__(self) + pdu_channel.__init__(self) # # I don't know a sane way to get asynchat.async_chat.__init__() to # call asyncore.file_dispatcher.__init__(), so shut your eyes for @@ -655,17 +655,10 @@ class server_asynchat(pdu_asynchat): """Handle received PDU.""" pdu.serve(self) - wakeup = None - - def set_wakeup(self, wakeup): - """Record companion wakeup socket, for shutdown.""" - self.wakeup = wakeup - def handle_close(self): - """Intercept close event so we can shut down wakeup socket too.""" - if self.wakeup is not None: - self.wakeup.close() + """Intercept close event so we can shut down other sockets.""" asynchat.async_chat.handle_close(self) + asyncore.close_all() def get_serial(self): """Read, cache, and return current serial number, or None if we @@ -688,12 +681,14 @@ class server_asynchat(pdu_asynchat): return old_serial != self.get_serial() def notify(self, data = None): - """Receive a wakeup from cronjob instance.""" + """Cronjob instance kicked us, send a notify message.""" if self.check_serial(): self.push_pdu(serial_notify(serial = self.current_serial)) + else: + log("Cronjob kicked me without a valid current serial number") -class client_asynchat(pdu_asynchat): - """Client protocol engine, handles upcalls from pdu_asynchat.""" +class client_channel(pdu_channel): + """Client protocol engine, handles upcalls from pdu_channel.""" current_serial = None @@ -707,7 +702,7 @@ class client_asynchat(pdu_asynchat): self.ssh = subprocess.Popen(["/usr/local/bin/python", "rtr-origin.py", "server"], stdin = s[0], stdout = s[0], close_fds = True) else: self.ssh = subprocess.Popen(sshargs, executable = "/usr/bin/ssh", stdin = s[0], stdout = s[0], close_fds = True) - pdu_asynchat.__init__(self, conn = s[1]) + pdu_channel.__init__(self, conn = s[1]) self.start_new_pdu() def deliver_pdu(self, pdu): @@ -726,18 +721,22 @@ class client_asynchat(pdu_asynchat): except OSError: pass -class server_wakeup(asyncore.dispatcher): - """asycnore dispatcher for server. This just handles the PF_UNIX - sockets we use to receive wakeup calls from the cronjob when it's - time for us to send a notify PDU to our client. + def handle_close(self): + """Intercept close event so we can log it.""" + log("Server closed channel") + asynchat.async_chat.handle_close(self) + +class kickme_channel(asyncore.dispatcher): + """asyncore dispatcher for the PF_UNIX socket that cronjob mode uses + to kick servers when it's time to send notify PDUs to clients. """ - def __init__(self, chat): + def __init__(self, server): asyncore.dispatcher.__init__(self) - self.my_asynchat_handle = chat - self.my_socket_filename = "wakeup.%d" % os.getpid() + self.server = server + self.sockname = "kickme.%d" % os.getpid() self.create_socket(socket.AF_UNIX, socket.SOCK_DGRAM) - self.bind(self.my_socket_filename) + self.bind(self.sockname) def writable(self): """This socket is read-only, never writable.""" @@ -750,13 +749,13 @@ class server_wakeup(asyncore.dispatcher): def handle_read(self): """Handle receipt of a datagram.""" data = self.recv(512) - self.my_asynchat_handle.notify(data) + self.server.notify(data) def cleanup(self): """Clean up this dispatcher's socket.""" self.close() try: - os.unlink(self.my_socket_filename) + os.unlink(self.sockname) except: pass @@ -769,31 +768,29 @@ class server_wakeup(asyncore.dispatcher): log("asyncore: %s: %s" % (tag, msg)) def handle_error(self): - """Handle errors caught by asyncore main loop. Asyncore has a - default handler for this but I find its customized backtraces very - hard to read. - """ + """Handle errors caught by asyncore main loop.""" log(traceback.format_exc()) - log("Closing channel %s" % repr(self)) - self.close() + log("Exiting after unhandled exception") + asyncore.close_all() def server_main(): - """Main program for server mode. Not really written yet.""" - wakeup = None + """Main program for server mode. Server is event driven, so + everything interesting happens in the channel classes. + """ + kickme = None try: - server = server_asynchat() - wakeup = server_wakeup(chat = server) - server.set_wakeup(wakeup) + server = server_channel() + kickme = kickme_channel(server = server) asyncore.loop() finally: - if wakeup is not None: - wakeup.cleanup() + if kickme is not None: + kickme.cleanup() def client_main(): """Main program for client mode. Not really written yet.""" client = None try: - client = client_asynchat() + client = client_channel() client.push_pdu(reset_query()) period = rpki.sundial.timedelta(seconds = 90) wakeup = rpki.sundial.now() + period |