diff options
author | Rob Austein <sra@hactrn.net> | 2009-04-28 02:56:35 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2009-04-28 02:56:35 +0000 |
commit | 7883f1c47288a355b04cb489b7301bb80cb0c60f (patch) | |
tree | 8132af200a6360ee67e0fe0d6f41b207c30f18d2 /scripts | |
parent | 83d009e58918a2584b1ddf7aa4faa093ad294424 (diff) |
Rework request queuing mechanism to work whether connection is
persistent or not. Well, mostly. Probably not robust in case of
unscheduled close yet.
svn path=/scripts/async-http.py; revision=2364
Diffstat (limited to 'scripts')
-rw-r--r-- | scripts/async-http.py | 99 |
1 files changed, 50 insertions, 49 deletions
diff --git a/scripts/async-http.py b/scripts/async-http.py index 05b57d4f..a3097863 100644 --- a/scripts/async-http.py +++ b/scripts/async-http.py @@ -22,7 +22,6 @@ PERFORMANCE OF THIS SOFTWARE. # # lynx -post_data -mime_header -source http://127.0.0.1:8000/ - import sys, os, time, socket, asyncore, asynchat, traceback, urlparse class http_message(object): @@ -92,7 +91,7 @@ class http_message(object): elif self.version == (1,0): return c is not None and "keep-alive" in c.lower() else: - raise RuntimeError, "Version is neither 1.0 nor 1.1" + return False class http_request(http_message): @@ -161,37 +160,37 @@ class http_stream(asynchat.async_chat): self.handle_message() def handle_error(self): - print "[Error]" + if debug: print "[Error]" print traceback.format_exc() asyncore.close_all() class http_server(http_stream): def handle_headers(self): - print "[Got headers]" + if debug: print "[Got headers]" self.msg = http_request.parse_from_wire(self.get_buffer()) if self.msg.cmd == "POST": - print "[Waiting for POST body]" + if debug: print "[Waiting for POST body]" self.set_terminator(int(self.msg.headers["Content-Length"])) else: self.handle_message() def handle_message(self): - print "[Got message]" - print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't") + if debug: print "[Got message]" + if debug: print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't") print self.msg self.push(http_response(code = 200, reason = "OK", body = self.msg.format(), Connection = "Keep-Alive" if self.msg.persistent() else "Close", Content_Type = "text/plain").format()) if self.msg.persistent(): - print "[Listening for next message]" + if debug: print "[Listening for next message]" self.restart() else: - print "[Closing]" + if debug: print "[Closing]" self.close_when_done() def handle_close(self): - print "[Closing all connections]" + if debug: print "[Closing all connections]" asyncore.close_all() class http_listener(asyncore.dispatcher): @@ -203,25 +202,24 @@ class http_listener(asyncore.dispatcher): self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) self.bind(("", port)) self.listen(5) - print "[Listening on port %s]" % port + if debug: print "[Listening on port %s]" % port def handle_accept(self): - print "[Accepting connection]" + if debug: print "[Accepting connection]" server = http_server(self.accept()[0]) def handle_error(self): - print "[Error]" + if debug: print "[Error]" print traceback.format_exc() asyncore.close_all() class http_client(http_stream): - def __init__(self, orator, hostport, msg = None): + def __init__(self, orator, hostport): + if debug: print "[Creating new connection]" http_stream.__init__(self) self.orator = orator - self.message_queue = [] - if msg is not None: - self.queue_message(msg) + self.hostport = hostport self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect(hostport) @@ -233,59 +231,62 @@ class http_client(http_stream): self.set_terminator(None) def handle_message(self): - print "[Got message]" - print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't") + if debug: print "[Got message]" + if debug: print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't") print self.msg - self.next_msg() + self.next_msg(first = False) def handle_connect(self): - print "[Connected]" - self.next_msg() + if debug: print "[Connected]" + self.next_msg(first = True) def queue_message(self, msg): - print "[Adding message to queue]" + if debug: print "[Adding message to queue]" self.message_queue.append(msg) - def next_msg(self): - if self.message_queue: - try: - if not self.msg.persistent(): - raise RuntimeError, "Attempting to send subsequent message to non-persistent connection" - except AttributeError: - pass - print "[Pulling next message from queue]" - self.push(self.message_queue.pop(0).format()) + def next_msg(self, first): + msg = self.orator.next_msg(self.hostport, first or self.msg.persistent()) + if msg is not None: + if debug: print "[Got a new message to send from my queue]" + self.push(msg.format()) self.restart() else: - print "[No messages left in queue]" + if debug: print "[No messages left in queue]" self.close_when_done() def handle_close(self): if self.get_terminator() is None: self.found_terminator() -class http_orator(dict): +class http_orator(object): - def query(self, url, body = None): + def __init__(self): + self.clients = {} + self.queues = {} + def query(self, url, body = None): u = urlparse.urlparse(url) - - assert u.scheme == "http" - assert u.username is None - assert u.password is None - assert u.params == "" - assert u.query == "" - assert u.fragment == "" - + assert u.scheme == "http" and u.username is None and u.password is None and u.params == "" and u.query == "" and u.fragment == "" request = http_request(cmd = "POST", path = u.path, body = body, Content_Type = "text/plain", Connection = "Keep-Alive") hostport = (u.hostname or "localhost", u.port or 80) - - if hostport not in self: - print "[Creating new connection]" - self[hostport] = http_client(self, hostport, request) + if hostport in self.queues: + self.queues[hostport].append(request) else: - print "[Reusing existing connection]" - self[hostport].queue_message(request) + self.queues[hostport] = [request] + if hostport not in self.clients: + self.clients[hostport] = http_client(self, hostport) + + def next_msg(self, hostport, usable): + queue = self.queues.get(hostport) + if queue and not usable: + self.clients[hostport] = http_client(self, hostport) + if queue and usable: + if debug: print "[Reusing existing connection]" + return queue.pop(0) + else: + return None + +debug = False if len(sys.argv) == 1: |