aboutsummaryrefslogtreecommitdiff
path: root/rtr-origin
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2009-04-02 17:43:45 +0000
committerRob Austein <sra@hactrn.net>2009-04-02 17:43:45 +0000
commit4d4928d20749c1717c3b1b1125795758d48f00fa (patch)
tree523c11706143aadffa458b0885761e8a2fd08c8a /rtr-origin
parentf9857737d3d01b99e581a4c7eb9bda1de3f6a7a9 (diff)
Checkpoint
svn path=/rtr-origin/updater.py; revision=2307
Diffstat (limited to 'rtr-origin')
-rwxr-xr-xrtr-origin/updater.py226
1 files changed, 200 insertions, 26 deletions
diff --git a/rtr-origin/updater.py b/rtr-origin/updater.py
index 428da9cc..a058f22c 100755
--- a/rtr-origin/updater.py
+++ b/rtr-origin/updater.py
@@ -23,7 +23,7 @@ OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
"""
-import sys, os, struct, time, glob, socket, asyncore, asynchat, subprocess
+import sys, os, struct, time, glob, socket, asyncore, asynchat, subprocess, fcntl, signal
import rpki.x509, rpki.ipaddrs, rpki.sundial
os.environ["TZ"] = "UTC"
@@ -66,19 +66,24 @@ class pdu(object):
@classmethod
def initial_asynchat_decoder(cls, chat):
"""Set up initial read for asynchat PDU reader."""
+ log("initial_asynchat_decoder()")
chat.set_terminator(cls.common_header_struct.size)
chat.set_next_decoder(cls.chat_decode_common_header)
@classmethod
def chat_decode_common_header(cls, chat, b):
"""Decode PDU header from an asynchat reader."""
+ log("chat_decode_common_header()")
assert cls._pdu is None
version, pdu_type = cls.common_header_struct.unpack(b)
assert version == cls.version, "PDU version is %d, expected %d" % (version, cls.version)
self = cls.pdu_map[pdu_type]()
- chat.set_terminator(self.header_struct.size)
- chat.set_next_decoder(self.chat_decode_header)
- return None
+ if len(b) >= self.header_struct.size:
+ return self.chat_decode_header(chat, b)
+ else:
+ chat.set_terminator(self.header_struct.size - cls.common_header_struct.size)
+ chat.set_next_decoder(self.chat_decode_header)
+ return None
class pdu_with_serial(pdu):
"""Base class for PDUs consisting of just a serial number."""
@@ -86,6 +91,7 @@ class pdu_with_serial(pdu):
header_struct = struct.Struct("!BBHL")
def __str__(self):
+ log("__str__()")
return "#%s" % self.serial
def to_pdu(self):
@@ -103,6 +109,7 @@ class pdu_with_serial(pdu):
def chat_decode_header(self, chat, b):
"""Decode PDU from an asynchat reader."""
+ log("chat_decode_header()")
version, pdu_type, zero, self.serial = self.header_struct.unpack(b)
assert zero == 0
assert b == self.to_pdu()
@@ -128,6 +135,7 @@ class pdu_empty(pdu):
def chat_decode_header(self, chat, b):
"""Decode PDU from an asynchat reader."""
+ log("chat_decode_header()")
version, pdu_type, zero = self.header_struct.unpack(b)
assert zero == 0
assert b == self.to_pdu()
@@ -143,8 +151,16 @@ class serial_query(pdu_with_serial):
class reset_query(pdu_empty):
"""Reset Query PDU."""
+
pdu_type = 2
+ def serve(self, chat):
+ """Received a reset query, send full current state in response."""
+ f = open("current", "r")
+ current = f.read().strip() + ".ax"
+ f.close()
+ chat.push_file(open(current, "rb"))
+
class cache_response(pdu_empty):
"""Cache Response PDU."""
pdu_type = 3
@@ -188,6 +204,7 @@ class prefix(pdu):
return self
def __str__(self):
+ log("__str__()")
plm = "%s/%s-%s" % (self.prefix, self.prefixlen, self.max_prefixlen)
return "%s %8s %-32s %s" % ("+" if self.announce else "-", self.asn, plm, ":".join(("%02X" % ord(b) for b in self.to_pdu())))
@@ -239,17 +256,19 @@ class prefix(pdu):
def chat_decode_header(self, chat, b):
"""Decode PDU header from an asynchat reader."""
+ log("chat_decode_header()")
version, pdu_type, self.color, self.announce, self.prefixlen, self.max_prefixlen, source = self.header_struct.unpack(b)
assert source == self.source
- chat.clear_ibuf()
+ chat.consume(self.header_struct.size)
chat.set_terminator(self.addr_type.bits / 8)
chat.set_next_decoder(self.chat_decode_prefix)
return None
def chat_decode_prefix(self, chat, b):
"""Decode prefix from an asynchat reader."""
+ log("chat_decode_prefix()")
self.prefix = self.addr_type.from_bytes(b)
- chat.clear_ibuf()
+ chat.consume(self.addr_type.bits / 8)
chat.set_terminator(self.asnum_struct.size)
chat.set_next_decoder(self.chat_decode_asnum)
return None
@@ -282,6 +301,7 @@ class error_report(pdu):
errmsg = ""
def __str__(self):
+ log("__str__()")
return "#%s: %s" % (self.errno, self.errmsg)
def to_pdu(self):
@@ -464,6 +484,7 @@ class file_producer(object):
self.buffersize = buffersize
def more(self):
+ log("more()")
return self.handle.read(self.buffersize)
class pdu_asynchat(asynchat.async_chat):
@@ -475,31 +496,36 @@ class pdu_asynchat(asynchat.async_chat):
def start_new_pdu(self):
"""Starting read of a new PDU, set up initial decoder."""
- self.clear_ibuf()
+ log("start_new_pdu()")
+ self.buffer = ""
self.next_decoder = None
pdu.initial_asynchat_decoder(self)
assert self.next_decoder is not None
- def clear_ibuf(self):
- """Clear the input buffer."""
- self.ibuf = ""
+ def consume(self, n):
+ """Consume n bytes from the input buffer."""
+ log("consume()")
+ self.buffer = self.buffer[n:]
def collect_incoming_data(self, data):
"""Collect data into the input buffer."""
- self.ibuf += data
+ log("collect_incoming_data()")
+ self.buffer += data
def set_next_decoder(self, decoder):
"""Set decoder to use with the next chunk of data."""
+ log("set_next_decoder()")
self.next_decoder = decoder
def found_terminator(self):
"""Got requested data, hand it to decoder. If we get back a PDU,
pass it up, then loop back to listen for another PDU.
"""
- pdu = self.next_decoder(self, ibuf)
+ log("found_terminator()")
+ pdu = self.next_decoder(self, self.buffer)
if pdu is not None:
self.deliver_pdu(pdu)
- self.start_next_pdu()
+ self.start_new_pdu()
def deliver_pdu(self, pdu):
"""Subclass must implement this."""
@@ -507,12 +533,72 @@ class pdu_asynchat(asynchat.async_chat):
def push_pdu(self, pdu):
"""Write PDU to asynchat stream."""
- self.push(pdu.to_pdu())
+ data = pdu.to_pdu()
+ log("push_pdu(%d)" % len(data))
+ self.push(data)
def push_file(self, f):
"""Write content of a file to an asynchat stream."""
+ log("push_file()")
self.push_with_producer(file_producer(f, self.ac_out_buffer_size))
+ def initiate_send(self):
+ """DEBUGGING KLUDGE"""
+ log("initiate_send()")
+ asynchat.async_chat.initiate_send(self)
+
+ def refill_buffer(self):
+ """DEBUGGING KLUDGE"""
+ log("refill_buffer()")
+ asynchat.async_chat.refill_buffer(self)
+
+ def send(self, data):
+ """DEBUGGING KLUDGE"""
+ log("send(%s)" % repr(data))
+ ret = asynchat.async_chat.send(self, data)
+ log("send(): %s" % repr(ret))
+ return ret
+
+ def recv(self, size):
+ """DEBUGGING KLUDGE"""
+ log("recv(%d)" % size)
+ ret = asynchat.async_chat.recv(self, size)
+ log("recv(): %s" % repr(ret))
+ return ret
+
+ def readable(self):
+ """DEBUGGING KLUDGE"""
+ log("readable()")
+ return asynchat.async_chat.readable(self)
+
+ def handle_read_event(self):
+ """DEBUGGING KLUDGE"""
+ log("handle_read_event()")
+ asynchat.async_chat.handle_read_event(self)
+
+ def __getattr__(self, attr):
+ """DEBUGGING KLUDGE"""
+ log("__getattr__(%s, %s)" % (repr(self), repr(attr)))
+ ret = asynchat.async_chat.__getattr__(self, attr)
+ log("__getattr__(): %s" % repr(ret))
+ return ret
+
+ def __repr__(self):
+ """DEBUGGING KLUDGE"""
+ return asyncore.dispatcher.__repr__(self)
+
+ def __strr__(self):
+ """DEBUGGING KLUDGE"""
+ return asyncore.dispatcher.__repr__(self)
+
+ def log(self, message):
+ """Intercept asyncore's logging."""
+ log("asyncore: %s" % message)
+
+ def log_info(self, message, type = "info"):
+ """Intercept asyncore's logging."""
+ log("asyncore[%s]: %s" % (type, message))
+
class server_asynchat(pdu_asynchat):
"""Server protocol engine, handles upcalls from pdu_asynchat to
implement protocol logic.
@@ -520,26 +606,53 @@ class server_asynchat(pdu_asynchat):
def __init__(self):
"""Set up stdin as connection and start listening for first PDU."""
- asynchat.async_chat.__init__(self, conn = sys.stdin)
+ log("server_asynchat.__init__()")
+ asynchat.async_chat.__init__(self)
+ #
+ # I don't know a sane way to get asynchat.async_chat.__init__() to
+ # call asyncore.file_dispatcher.__init__(), so shut your eyes for
+ # a moment while I cut and paste.
+ #
+ fd = sys.stdin.fileno()
+ self.connected = True
+ self._fileno = fd
+ self.socket = asyncore.file_wrapper(fd)
+ self.add_channel()
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
+ flags = flags | os.O_NONBLOCK
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+ #
+ # Ok, you can look again now.
+ #
self.start_new_pdu()
+ log("server_asynchat.__init__(%s)" % repr(self))
def deliver_pdu(self, pdu):
"""Handle received PDU."""
- pdu.handle_request(self)
+ log("deliver_pdu(%s)" % pdu)
+ pdu.serve(self)
-class clientr_asynchat(pdu_asynchat):
+class client_asynchat(pdu_asynchat):
"""Client protocol engine, handles upcalls from pdu_asynchat."""
def __init__(self, *sshargs):
"""Set up ssh connection and start listening for first PDU."""
s = socket.socketpair()
- self.ssh = subprocess.Popen(sshargs, executable = "/usr/bin/ssh", stdin = s[0], stdout = s[0], close_fds = True)
+ if False:
+ self.ssh = subprocess.Popen(sshargs, executable = "/usr/bin/ssh", stdin = s[0], stdout = s[0], close_fds = True)
+ else:
+ print "[Ignoring arguments, using direct socket loopback kludge for testing]"
+ self.ssh = subprocess.Popen(["/usr/local/bin/python", "updater.py", "server"], stdin = s[0], stdout = s[0], close_fds = True)
asynchat.async_chat.__init__(self, conn = s[1])
self.start_new_pdu()
def deliver_pdu(self, pdu):
"""Handle received PDU. For now, just print it."""
- pdu.pprint()
+ log("deliver_pdu(%s)" % pdu)
+
+ def cleanup(self):
+ """Clean up this chat session's child process."""
+ os.kill(self.ssh.pid, signal.SIGTERM)
class server_wakeup(asyncore.dispatcher):
"""asycnore dispatcher for server. This just handles the PF_UNIX
@@ -547,15 +660,76 @@ class server_wakeup(asyncore.dispatcher):
time for us to send a notify PDU to our client.
"""
+ def __init__(self, chat):
+ asyncore.dispatcher.__init__(self)
+ self.my_asynchat_handle = chat
+ self.my_socket_filename = "wakeup.%d" % os.getpid()
+ self.create_socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ self.bind(self.my_socket_filename)
+ self.listen(5)
+
+ def writable(self):
+ """This socket is read-only, never writable."""
+ return False
+
+ def handle_read(self):
+ """Handle receipt of a datagram."""
+ log("handle_read()")
+ self.my_asynchat_handle.notify(self.recv(512))
+
+ def cleanup(self):
+ """Clean up this dispatcher's socket."""
+ log("wakeup.close()")
+ self.close()
+ try:
+ os.unlink(self.my_socket_filename)
+ except:
+ pass
+
def server_main():
"""Main program for server mode. Not really written yet."""
- server = server_asynchat(blah = blah)
- wakeup = server_wakeup(blee = blee)
- asyncore.loop()
+ wakeup = None
+ try:
+ log("starting chat")
+ server = server_asynchat()
+ log("chat setup got %s" % repr(server))
+ log("chat connected: %s" % server.connected)
+ log("starting wakeup")
+ wakeup = server_wakeup(chat = server)
+ log("wakeup setup got %s" % repr(wakeup))
+ log("looping")
+ asyncore.loop()
+ finally:
+ log("finally")
+ if wakeup is not None:
+ wakeup.cleanup()
def client_main():
"""Main program for client mode. Not really written yet."""
- raise NotImplementedError
-
-if __name__ == "__main__":
- updater_main()
+ client = None
+ try:
+ client = client_asynchat()
+ log("chat connected: %s" % client.connected)
+ log("sleeping...")
+ time.sleep(2)
+ client.push_pdu(reset_query())
+ log("chat connected: %s" % client.connected)
+ asyncore.loop()
+ finally:
+ if client is not None:
+ client.cleanup()
+
+def log(msg):
+ """Logging hack, debugging code only, clean up later..."""
+ sys.stderr.write(("[%s] " % jane) + msg + "\n")
+
+if len(sys.argv) == 1:
+ jane = "client"
+else:
+ assert len(sys.argv) == 2
+ jane = sys.argv[1]
+
+{ "updater" : updater_main,
+ "client" : client_main,
+ "server" : server_main,
+ }[jane]()