aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--scripts/async-http.py121
1 files changed, 78 insertions, 43 deletions
diff --git a/scripts/async-http.py b/scripts/async-http.py
index 3b196b1e..f783a7d9 100644
--- a/scripts/async-http.py
+++ b/scripts/async-http.py
@@ -131,12 +131,6 @@ class http_stream(asynchat.async_chat):
assert not self.buffer
self.set_terminator("\r\n\r\n")
- def reading_headers(self):
- return isinstance(self.get_terminator(), str)
-
- def reading_body(self):
- return isinstance(self.get_terminator(), int)
-
def collect_incoming_data(self, data):
"""Buffer the data"""
self.buffer.append(data)
@@ -147,94 +141,137 @@ class http_stream(asynchat.async_chat):
return val
def found_terminator(self):
- if self.reading_headers():
+ if isinstance(self.get_terminator(), str):
return self.handle_headers()
- if self.reading_body():
+ else:
return self.handle_body()
- raise RuntimeError
def handle_body(self):
self.msg.body = self.get_buffer()
- assert len(self.msg.body) == int(self.msg.headers["Content-Length"])
+ #assert len(self.msg.body) == int(self.msg.headers["Content-Length"])
self.handle_message()
def handle_error(self):
+ print "[Error]"
print traceback.format_exc()
asyncore.close_all()
class http_server(http_stream):
def handle_headers(self):
+ print "[Got headers]"
self.msg = http_request.parse_from_wire(self.get_buffer())
if self.msg.cmd == "POST":
+ print "[Waiting for POST body]"
self.set_terminator(int(self.msg.headers["Content-Length"]))
else:
self.handle_message()
def handle_message(self):
+ print "[Got message]"
print self.msg
self.push(http_response(code = 200, reason = "OK", body = self.msg.format(), Content_Type = "text/plain").format())
if False:
+ print "[Closing]"
self.close_when_done()
else:
+ print "[Listening for next message]"
self.restart()
def handle_close(self):
+ print "[Closing all connections]"
+ asyncore.close_all()
+
+class http_listener(asyncore.dispatcher):
+
+ def __init__(self, port):
+ asyncore.dispatcher.__init__(self)
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ self.bind(("", port))
+ self.listen(5)
+ print "[Listening on port %s]" % port
+
+ def handle_accept(self):
+ print "[Accepting connection]"
+ server = http_server(self.accept()[0])
+
+ def handle_error(self):
+ print "[Error]"
+ print traceback.format_exc()
asyncore.close_all()
class http_client(http_stream):
+ def __init__(self, orator, hostport, msg = None):
+ http_stream.__init__(self)
+ self.orator = orator
+ self.message_queue = []
+ if msg is not None:
+ self.queue_message(msg)
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.connect(hostport)
+
def handle_headers(self):
self.msg = http_response.parse_from_wire(self.get_buffer())
- self.set_terminator(int(self.msg.headers["Content-Length"]))
+ if "Content-Length" in self.msg.headers:
+ self.set_terminator(int(self.msg.headers["Content-Length"]))
+ else:
+ self.set_terminator(None)
def handle_message(self):
+ print "[Got message]"
print self.msg
self.next_msg()
def handle_connect(self):
+ print "[Connected]"
self.next_msg()
- @classmethod
- def queue_messages(cls, msgs):
- self = cls()
- self.msgs = msgs
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect(("", 8000))
-
+ def queue_message(self, msg):
+ print "[Adding message to queue]"
+ self.message_queue.append(msg)
+
def next_msg(self):
- if self.msgs:
- self.push(self.msgs.pop(0).format())
+ if self.message_queue:
+ print "[Pulling next message from queue]"
+ self.push(self.message_queue.pop(0).format())
self.restart()
else:
+ print "[No messages left in queue]"
self.close_when_done()
-class http_listener(asyncore.dispatcher):
+ def handle_close(self):
+ if self.get_terminator() is None:
+ self.found_terminator()
- def __init__(self):
- asyncore.dispatcher.__init__(self)
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- self.bind(("", 8000))
- self.listen(5)
+class http_orator(dict):
- def handle_accept(self):
- server = http_server(self.accept()[0])
+ def query(self, url, body = None):
- def handle_error(self):
- print traceback.format_exc()
- asyncore.close_all()
+ u = urlparse.urlparse(url)
-assert len(sys.argv) in (1, 2)
+ assert u.scheme == "http"
+ assert u.username is None
+ assert u.password is None
+ assert u.params == ""
+ assert u.query == ""
+ assert u.fragment == ""
-cmd = None if len(sys.argv) == 1 else sys.argv[1].upper()
+ request = http_request(cmd = "POST", path = u.path, body = body, Content_Type = "text/plain")
+ hostport = (u.hostname or "localhost", u.port or 80)
-#cmd = "POST"
+ if hostport not in self:
+ print "[Creating new connection]"
+ self[hostport] = http_client(self, hostport, request)
+ else:
+ print "[Reusing existing connection]"
+ self[hostport].queue_message(request)
-if cmd is None:
+if len(sys.argv) == 1:
- listener = http_listener()
+ listener = http_listener(port = 8000)
else:
@@ -242,10 +279,8 @@ else:
# connections properly. For the moment this is just a test to see
# whether the parser can survive multiple messages.
- client = http_client.queue_messages([
- http_request(cmd = cmd, path = "/", body = "Hi, Mom!\r\n", Content_Type = "text/plain"),
- http_request(cmd = cmd, path = "/", body = "Hi, Dad!\r\n", Content_Type = "text/plain"),
- http_request(cmd = cmd, path = "/", body = "Hi, Bro!\r\n", Content_Type = "text/plain"),
- http_request(cmd = cmd, path = "/", body = "Hi, Sis!\r\n", Content_Type = "text/plain") ])
+ orator = http_orator()
+ for url in sys.argv[1:]:
+ orator.query(url = url, body = "Hi, I'm trying to talk to URL %s" % url)
asyncore.loop()