diff options
author | Rob Austein <sra@hactrn.net> | 2009-04-02 17:43:45 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2009-04-02 17:43:45 +0000 |
commit | 4d4928d20749c1717c3b1b1125795758d48f00fa (patch) | |
tree | 523c11706143aadffa458b0885761e8a2fd08c8a /rtr-origin | |
parent | f9857737d3d01b99e581a4c7eb9bda1de3f6a7a9 (diff) |
Checkpoint
svn path=/rtr-origin/updater.py; revision=2307
Diffstat (limited to 'rtr-origin')
-rwxr-xr-x | rtr-origin/updater.py | 226 |
1 files changed, 200 insertions, 26 deletions
diff --git a/rtr-origin/updater.py b/rtr-origin/updater.py index 428da9cc..a058f22c 100755 --- a/rtr-origin/updater.py +++ b/rtr-origin/updater.py @@ -23,7 +23,7 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """ -import sys, os, struct, time, glob, socket, asyncore, asynchat, subprocess +import sys, os, struct, time, glob, socket, asyncore, asynchat, subprocess, fcntl, signal import rpki.x509, rpki.ipaddrs, rpki.sundial os.environ["TZ"] = "UTC" @@ -66,19 +66,24 @@ class pdu(object): @classmethod def initial_asynchat_decoder(cls, chat): """Set up initial read for asynchat PDU reader.""" + log("initial_asynchat_decoder()") chat.set_terminator(cls.common_header_struct.size) chat.set_next_decoder(cls.chat_decode_common_header) @classmethod def chat_decode_common_header(cls, chat, b): """Decode PDU header from an asynchat reader.""" + log("chat_decode_common_header()") assert cls._pdu is None version, pdu_type = cls.common_header_struct.unpack(b) assert version == cls.version, "PDU version is %d, expected %d" % (version, cls.version) self = cls.pdu_map[pdu_type]() - chat.set_terminator(self.header_struct.size) - chat.set_next_decoder(self.chat_decode_header) - return None + if len(b) >= self.header_struct.size: + return self.chat_decode_header(chat, b) + else: + chat.set_terminator(self.header_struct.size - cls.common_header_struct.size) + chat.set_next_decoder(self.chat_decode_header) + return None class pdu_with_serial(pdu): """Base class for PDUs consisting of just a serial number.""" @@ -86,6 +91,7 @@ class pdu_with_serial(pdu): header_struct = struct.Struct("!BBHL") def __str__(self): + log("__str__()") return "#%s" % self.serial def to_pdu(self): @@ -103,6 +109,7 @@ class pdu_with_serial(pdu): def chat_decode_header(self, chat, b): """Decode PDU from an asynchat reader.""" + log("chat_decode_header()") version, pdu_type, zero, self.serial = self.header_struct.unpack(b) assert zero == 0 assert b == self.to_pdu() @@ -128,6 +135,7 @@ class pdu_empty(pdu): def chat_decode_header(self, chat, b): """Decode PDU from an asynchat reader.""" + log("chat_decode_header()") version, pdu_type, zero = self.header_struct.unpack(b) assert zero == 0 assert b == self.to_pdu() @@ -143,8 +151,16 @@ class serial_query(pdu_with_serial): class reset_query(pdu_empty): """Reset Query PDU.""" + pdu_type = 2 + def serve(self, chat): + """Received a reset query, send full current state in response.""" + f = open("current", "r") + current = f.read().strip() + ".ax" + f.close() + chat.push_file(open(current, "rb")) + class cache_response(pdu_empty): """Cache Response PDU.""" pdu_type = 3 @@ -188,6 +204,7 @@ class prefix(pdu): return self def __str__(self): + log("__str__()") plm = "%s/%s-%s" % (self.prefix, self.prefixlen, self.max_prefixlen) return "%s %8s %-32s %s" % ("+" if self.announce else "-", self.asn, plm, ":".join(("%02X" % ord(b) for b in self.to_pdu()))) @@ -239,17 +256,19 @@ class prefix(pdu): def chat_decode_header(self, chat, b): """Decode PDU header from an asynchat reader.""" + log("chat_decode_header()") version, pdu_type, self.color, self.announce, self.prefixlen, self.max_prefixlen, source = self.header_struct.unpack(b) assert source == self.source - chat.clear_ibuf() + chat.consume(self.header_struct.size) chat.set_terminator(self.addr_type.bits / 8) chat.set_next_decoder(self.chat_decode_prefix) return None def chat_decode_prefix(self, chat, b): """Decode prefix from an asynchat reader.""" + log("chat_decode_prefix()") self.prefix = self.addr_type.from_bytes(b) - chat.clear_ibuf() + chat.consume(self.addr_type.bits / 8) chat.set_terminator(self.asnum_struct.size) chat.set_next_decoder(self.chat_decode_asnum) return None @@ -282,6 +301,7 @@ class error_report(pdu): errmsg = "" def __str__(self): + log("__str__()") return "#%s: %s" % (self.errno, self.errmsg) def to_pdu(self): @@ -464,6 +484,7 @@ class file_producer(object): self.buffersize = buffersize def more(self): + log("more()") return self.handle.read(self.buffersize) class pdu_asynchat(asynchat.async_chat): @@ -475,31 +496,36 @@ class pdu_asynchat(asynchat.async_chat): def start_new_pdu(self): """Starting read of a new PDU, set up initial decoder.""" - self.clear_ibuf() + log("start_new_pdu()") + self.buffer = "" self.next_decoder = None pdu.initial_asynchat_decoder(self) assert self.next_decoder is not None - def clear_ibuf(self): - """Clear the input buffer.""" - self.ibuf = "" + def consume(self, n): + """Consume n bytes from the input buffer.""" + log("consume()") + self.buffer = self.buffer[n:] def collect_incoming_data(self, data): """Collect data into the input buffer.""" - self.ibuf += data + log("collect_incoming_data()") + self.buffer += data def set_next_decoder(self, decoder): """Set decoder to use with the next chunk of data.""" + log("set_next_decoder()") self.next_decoder = decoder def found_terminator(self): """Got requested data, hand it to decoder. If we get back a PDU, pass it up, then loop back to listen for another PDU. """ - pdu = self.next_decoder(self, ibuf) + log("found_terminator()") + pdu = self.next_decoder(self, self.buffer) if pdu is not None: self.deliver_pdu(pdu) - self.start_next_pdu() + self.start_new_pdu() def deliver_pdu(self, pdu): """Subclass must implement this.""" @@ -507,12 +533,72 @@ class pdu_asynchat(asynchat.async_chat): def push_pdu(self, pdu): """Write PDU to asynchat stream.""" - self.push(pdu.to_pdu()) + data = pdu.to_pdu() + log("push_pdu(%d)" % len(data)) + self.push(data) def push_file(self, f): """Write content of a file to an asynchat stream.""" + log("push_file()") self.push_with_producer(file_producer(f, self.ac_out_buffer_size)) + def initiate_send(self): + """DEBUGGING KLUDGE""" + log("initiate_send()") + asynchat.async_chat.initiate_send(self) + + def refill_buffer(self): + """DEBUGGING KLUDGE""" + log("refill_buffer()") + asynchat.async_chat.refill_buffer(self) + + def send(self, data): + """DEBUGGING KLUDGE""" + log("send(%s)" % repr(data)) + ret = asynchat.async_chat.send(self, data) + log("send(): %s" % repr(ret)) + return ret + + def recv(self, size): + """DEBUGGING KLUDGE""" + log("recv(%d)" % size) + ret = asynchat.async_chat.recv(self, size) + log("recv(): %s" % repr(ret)) + return ret + + def readable(self): + """DEBUGGING KLUDGE""" + log("readable()") + return asynchat.async_chat.readable(self) + + def handle_read_event(self): + """DEBUGGING KLUDGE""" + log("handle_read_event()") + asynchat.async_chat.handle_read_event(self) + + def __getattr__(self, attr): + """DEBUGGING KLUDGE""" + log("__getattr__(%s, %s)" % (repr(self), repr(attr))) + ret = asynchat.async_chat.__getattr__(self, attr) + log("__getattr__(): %s" % repr(ret)) + return ret + + def __repr__(self): + """DEBUGGING KLUDGE""" + return asyncore.dispatcher.__repr__(self) + + def __strr__(self): + """DEBUGGING KLUDGE""" + return asyncore.dispatcher.__repr__(self) + + def log(self, message): + """Intercept asyncore's logging.""" + log("asyncore: %s" % message) + + def log_info(self, message, type = "info"): + """Intercept asyncore's logging.""" + log("asyncore[%s]: %s" % (type, message)) + class server_asynchat(pdu_asynchat): """Server protocol engine, handles upcalls from pdu_asynchat to implement protocol logic. @@ -520,26 +606,53 @@ class server_asynchat(pdu_asynchat): def __init__(self): """Set up stdin as connection and start listening for first PDU.""" - asynchat.async_chat.__init__(self, conn = sys.stdin) + log("server_asynchat.__init__()") + asynchat.async_chat.__init__(self) + # + # I don't know a sane way to get asynchat.async_chat.__init__() to + # call asyncore.file_dispatcher.__init__(), so shut your eyes for + # a moment while I cut and paste. + # + fd = sys.stdin.fileno() + self.connected = True + self._fileno = fd + self.socket = asyncore.file_wrapper(fd) + self.add_channel() + flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) + flags = flags | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + # + # Ok, you can look again now. + # self.start_new_pdu() + log("server_asynchat.__init__(%s)" % repr(self)) def deliver_pdu(self, pdu): """Handle received PDU.""" - pdu.handle_request(self) + log("deliver_pdu(%s)" % pdu) + pdu.serve(self) -class clientr_asynchat(pdu_asynchat): +class client_asynchat(pdu_asynchat): """Client protocol engine, handles upcalls from pdu_asynchat.""" def __init__(self, *sshargs): """Set up ssh connection and start listening for first PDU.""" s = socket.socketpair() - self.ssh = subprocess.Popen(sshargs, executable = "/usr/bin/ssh", stdin = s[0], stdout = s[0], close_fds = True) + if False: + self.ssh = subprocess.Popen(sshargs, executable = "/usr/bin/ssh", stdin = s[0], stdout = s[0], close_fds = True) + else: + print "[Ignoring arguments, using direct socket loopback kludge for testing]" + self.ssh = subprocess.Popen(["/usr/local/bin/python", "updater.py", "server"], stdin = s[0], stdout = s[0], close_fds = True) asynchat.async_chat.__init__(self, conn = s[1]) self.start_new_pdu() def deliver_pdu(self, pdu): """Handle received PDU. For now, just print it.""" - pdu.pprint() + log("deliver_pdu(%s)" % pdu) + + def cleanup(self): + """Clean up this chat session's child process.""" + os.kill(self.ssh.pid, signal.SIGTERM) class server_wakeup(asyncore.dispatcher): """asycnore dispatcher for server. This just handles the PF_UNIX @@ -547,15 +660,76 @@ class server_wakeup(asyncore.dispatcher): time for us to send a notify PDU to our client. """ + def __init__(self, chat): + asyncore.dispatcher.__init__(self) + self.my_asynchat_handle = chat + self.my_socket_filename = "wakeup.%d" % os.getpid() + self.create_socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self.bind(self.my_socket_filename) + self.listen(5) + + def writable(self): + """This socket is read-only, never writable.""" + return False + + def handle_read(self): + """Handle receipt of a datagram.""" + log("handle_read()") + self.my_asynchat_handle.notify(self.recv(512)) + + def cleanup(self): + """Clean up this dispatcher's socket.""" + log("wakeup.close()") + self.close() + try: + os.unlink(self.my_socket_filename) + except: + pass + def server_main(): """Main program for server mode. Not really written yet.""" - server = server_asynchat(blah = blah) - wakeup = server_wakeup(blee = blee) - asyncore.loop() + wakeup = None + try: + log("starting chat") + server = server_asynchat() + log("chat setup got %s" % repr(server)) + log("chat connected: %s" % server.connected) + log("starting wakeup") + wakeup = server_wakeup(chat = server) + log("wakeup setup got %s" % repr(wakeup)) + log("looping") + asyncore.loop() + finally: + log("finally") + if wakeup is not None: + wakeup.cleanup() def client_main(): """Main program for client mode. Not really written yet.""" - raise NotImplementedError - -if __name__ == "__main__": - updater_main() + client = None + try: + client = client_asynchat() + log("chat connected: %s" % client.connected) + log("sleeping...") + time.sleep(2) + client.push_pdu(reset_query()) + log("chat connected: %s" % client.connected) + asyncore.loop() + finally: + if client is not None: + client.cleanup() + +def log(msg): + """Logging hack, debugging code only, clean up later...""" + sys.stderr.write(("[%s] " % jane) + msg + "\n") + +if len(sys.argv) == 1: + jane = "client" +else: + assert len(sys.argv) == 2 + jane = sys.argv[1] + +{ "updater" : updater_main, + "client" : client_main, + "server" : server_main, + }[jane]() |