aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2009-04-28 02:56:35 +0000
committerRob Austein <sra@hactrn.net>2009-04-28 02:56:35 +0000
commit7883f1c47288a355b04cb489b7301bb80cb0c60f (patch)
tree8132af200a6360ee67e0fe0d6f41b207c30f18d2
parent83d009e58918a2584b1ddf7aa4faa093ad294424 (diff)
Rework request queuing mechanism to work whether connection is
persistent or not. Well, mostly. Probably not robust in case of unscheduled close yet. svn path=/scripts/async-http.py; revision=2364
-rw-r--r--scripts/async-http.py99
1 files changed, 50 insertions, 49 deletions
diff --git a/scripts/async-http.py b/scripts/async-http.py
index 05b57d4f..a3097863 100644
--- a/scripts/async-http.py
+++ b/scripts/async-http.py
@@ -22,7 +22,6 @@ PERFORMANCE OF THIS SOFTWARE.
#
# lynx -post_data -mime_header -source http://127.0.0.1:8000/
-
import sys, os, time, socket, asyncore, asynchat, traceback, urlparse
class http_message(object):
@@ -92,7 +91,7 @@ class http_message(object):
elif self.version == (1,0):
return c is not None and "keep-alive" in c.lower()
else:
- raise RuntimeError, "Version is neither 1.0 nor 1.1"
+ return False
class http_request(http_message):
@@ -161,37 +160,37 @@ class http_stream(asynchat.async_chat):
self.handle_message()
def handle_error(self):
- print "[Error]"
+ if debug: print "[Error]"
print traceback.format_exc()
asyncore.close_all()
class http_server(http_stream):
def handle_headers(self):
- print "[Got headers]"
+ if debug: print "[Got headers]"
self.msg = http_request.parse_from_wire(self.get_buffer())
if self.msg.cmd == "POST":
- print "[Waiting for POST body]"
+ if debug: print "[Waiting for POST body]"
self.set_terminator(int(self.msg.headers["Content-Length"]))
else:
self.handle_message()
def handle_message(self):
- print "[Got message]"
- print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't")
+ if debug: print "[Got message]"
+ if debug: print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't")
print self.msg
self.push(http_response(code = 200, reason = "OK", body = self.msg.format(),
Connection = "Keep-Alive" if self.msg.persistent() else "Close",
Content_Type = "text/plain").format())
if self.msg.persistent():
- print "[Listening for next message]"
+ if debug: print "[Listening for next message]"
self.restart()
else:
- print "[Closing]"
+ if debug: print "[Closing]"
self.close_when_done()
def handle_close(self):
- print "[Closing all connections]"
+ if debug: print "[Closing all connections]"
asyncore.close_all()
class http_listener(asyncore.dispatcher):
@@ -203,25 +202,24 @@ class http_listener(asyncore.dispatcher):
self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self.bind(("", port))
self.listen(5)
- print "[Listening on port %s]" % port
+ if debug: print "[Listening on port %s]" % port
def handle_accept(self):
- print "[Accepting connection]"
+ if debug: print "[Accepting connection]"
server = http_server(self.accept()[0])
def handle_error(self):
- print "[Error]"
+ if debug: print "[Error]"
print traceback.format_exc()
asyncore.close_all()
class http_client(http_stream):
- def __init__(self, orator, hostport, msg = None):
+ def __init__(self, orator, hostport):
+ if debug: print "[Creating new connection]"
http_stream.__init__(self)
self.orator = orator
- self.message_queue = []
- if msg is not None:
- self.queue_message(msg)
+ self.hostport = hostport
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect(hostport)
@@ -233,59 +231,62 @@ class http_client(http_stream):
self.set_terminator(None)
def handle_message(self):
- print "[Got message]"
- print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't")
+ if debug: print "[Got message]"
+ if debug: print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't")
print self.msg
- self.next_msg()
+ self.next_msg(first = False)
def handle_connect(self):
- print "[Connected]"
- self.next_msg()
+ if debug: print "[Connected]"
+ self.next_msg(first = True)
def queue_message(self, msg):
- print "[Adding message to queue]"
+ if debug: print "[Adding message to queue]"
self.message_queue.append(msg)
- def next_msg(self):
- if self.message_queue:
- try:
- if not self.msg.persistent():
- raise RuntimeError, "Attempting to send subsequent message to non-persistent connection"
- except AttributeError:
- pass
- print "[Pulling next message from queue]"
- self.push(self.message_queue.pop(0).format())
+ def next_msg(self, first):
+ msg = self.orator.next_msg(self.hostport, first or self.msg.persistent())
+ if msg is not None:
+ if debug: print "[Got a new message to send from my queue]"
+ self.push(msg.format())
self.restart()
else:
- print "[No messages left in queue]"
+ if debug: print "[No messages left in queue]"
self.close_when_done()
def handle_close(self):
if self.get_terminator() is None:
self.found_terminator()
-class http_orator(dict):
+class http_orator(object):
- def query(self, url, body = None):
+ def __init__(self):
+ self.clients = {}
+ self.queues = {}
+ def query(self, url, body = None):
u = urlparse.urlparse(url)
-
- assert u.scheme == "http"
- assert u.username is None
- assert u.password is None
- assert u.params == ""
- assert u.query == ""
- assert u.fragment == ""
-
+ assert u.scheme == "http" and u.username is None and u.password is None and u.params == "" and u.query == "" and u.fragment == ""
request = http_request(cmd = "POST", path = u.path, body = body, Content_Type = "text/plain", Connection = "Keep-Alive")
hostport = (u.hostname or "localhost", u.port or 80)
-
- if hostport not in self:
- print "[Creating new connection]"
- self[hostport] = http_client(self, hostport, request)
+ if hostport in self.queues:
+ self.queues[hostport].append(request)
else:
- print "[Reusing existing connection]"
- self[hostport].queue_message(request)
+ self.queues[hostport] = [request]
+ if hostport not in self.clients:
+ self.clients[hostport] = http_client(self, hostport)
+
+ def next_msg(self, hostport, usable):
+ queue = self.queues.get(hostport)
+ if queue and not usable:
+ self.clients[hostport] = http_client(self, hostport)
+ if queue and usable:
+ if debug: print "[Reusing existing connection]"
+ return queue.pop(0)
+ else:
+ return None
+
+debug = False
if len(sys.argv) == 1: