diff options
-rw-r--r-- | scripts/async-http.py | 121 |
1 files changed, 78 insertions, 43 deletions
diff --git a/scripts/async-http.py b/scripts/async-http.py index 3b196b1e..f783a7d9 100644 --- a/scripts/async-http.py +++ b/scripts/async-http.py @@ -131,12 +131,6 @@ class http_stream(asynchat.async_chat): assert not self.buffer self.set_terminator("\r\n\r\n") - def reading_headers(self): - return isinstance(self.get_terminator(), str) - - def reading_body(self): - return isinstance(self.get_terminator(), int) - def collect_incoming_data(self, data): """Buffer the data""" self.buffer.append(data) @@ -147,94 +141,137 @@ class http_stream(asynchat.async_chat): return val def found_terminator(self): - if self.reading_headers(): + if isinstance(self.get_terminator(), str): return self.handle_headers() - if self.reading_body(): + else: return self.handle_body() - raise RuntimeError def handle_body(self): self.msg.body = self.get_buffer() - assert len(self.msg.body) == int(self.msg.headers["Content-Length"]) + #assert len(self.msg.body) == int(self.msg.headers["Content-Length"]) self.handle_message() def handle_error(self): + print "[Error]" print traceback.format_exc() asyncore.close_all() class http_server(http_stream): def handle_headers(self): + print "[Got headers]" self.msg = http_request.parse_from_wire(self.get_buffer()) if self.msg.cmd == "POST": + print "[Waiting for POST body]" self.set_terminator(int(self.msg.headers["Content-Length"])) else: self.handle_message() def handle_message(self): + print "[Got message]" print self.msg self.push(http_response(code = 200, reason = "OK", body = self.msg.format(), Content_Type = "text/plain").format()) if False: + print "[Closing]" self.close_when_done() else: + print "[Listening for next message]" self.restart() def handle_close(self): + print "[Closing all connections]" + asyncore.close_all() + +class http_listener(asyncore.dispatcher): + + def __init__(self, port): + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + self.bind(("", port)) + self.listen(5) + print "[Listening on port %s]" % port + + def handle_accept(self): + print "[Accepting connection]" + server = http_server(self.accept()[0]) + + def handle_error(self): + print "[Error]" + print traceback.format_exc() asyncore.close_all() class http_client(http_stream): + def __init__(self, orator, hostport, msg = None): + http_stream.__init__(self) + self.orator = orator + self.message_queue = [] + if msg is not None: + self.queue_message(msg) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(hostport) + def handle_headers(self): self.msg = http_response.parse_from_wire(self.get_buffer()) - self.set_terminator(int(self.msg.headers["Content-Length"])) + if "Content-Length" in self.msg.headers: + self.set_terminator(int(self.msg.headers["Content-Length"])) + else: + self.set_terminator(None) def handle_message(self): + print "[Got message]" print self.msg self.next_msg() def handle_connect(self): + print "[Connected]" self.next_msg() - @classmethod - def queue_messages(cls, msgs): - self = cls() - self.msgs = msgs - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect(("", 8000)) - + def queue_message(self, msg): + print "[Adding message to queue]" + self.message_queue.append(msg) + def next_msg(self): - if self.msgs: - self.push(self.msgs.pop(0).format()) + if self.message_queue: + print "[Pulling next message from queue]" + self.push(self.message_queue.pop(0).format()) self.restart() else: + print "[No messages left in queue]" self.close_when_done() -class http_listener(asyncore.dispatcher): + def handle_close(self): + if self.get_terminator() is None: + self.found_terminator() - def __init__(self): - asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - self.bind(("", 8000)) - self.listen(5) +class http_orator(dict): - def handle_accept(self): - server = http_server(self.accept()[0]) + def query(self, url, body = None): - def handle_error(self): - print traceback.format_exc() - asyncore.close_all() + u = urlparse.urlparse(url) -assert len(sys.argv) in (1, 2) + assert u.scheme == "http" + assert u.username is None + assert u.password is None + assert u.params == "" + assert u.query == "" + assert u.fragment == "" -cmd = None if len(sys.argv) == 1 else sys.argv[1].upper() + request = http_request(cmd = "POST", path = u.path, body = body, Content_Type = "text/plain") + hostport = (u.hostname or "localhost", u.port or 80) -#cmd = "POST" + if hostport not in self: + print "[Creating new connection]" + self[hostport] = http_client(self, hostport, request) + else: + print "[Reusing existing connection]" + self[hostport].queue_message(request) -if cmd is None: +if len(sys.argv) == 1: - listener = http_listener() + listener = http_listener(port = 8000) else: @@ -242,10 +279,8 @@ else: # connections properly. For the moment this is just a test to see # whether the parser can survive multiple messages. - client = http_client.queue_messages([ - http_request(cmd = cmd, path = "/", body = "Hi, Mom!\r\n", Content_Type = "text/plain"), - http_request(cmd = cmd, path = "/", body = "Hi, Dad!\r\n", Content_Type = "text/plain"), - http_request(cmd = cmd, path = "/", body = "Hi, Bro!\r\n", Content_Type = "text/plain"), - http_request(cmd = cmd, path = "/", body = "Hi, Sis!\r\n", Content_Type = "text/plain") ]) + orator = http_orator() + for url in sys.argv[1:]: + orator.query(url = url, body = "Hi, I'm trying to talk to URL %s" % url) asyncore.loop() |