aboutsummaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2009-04-28 02:56:35 +0000
committerRob Austein <sra@hactrn.net>2009-04-28 02:56:35 +0000
commit7883f1c47288a355b04cb489b7301bb80cb0c60f (patch)
tree8132af200a6360ee67e0fe0d6f41b207c30f18d2 /scripts
parent83d009e58918a2584b1ddf7aa4faa093ad294424 (diff)
Rework request queuing mechanism to work whether connection is
persistent or not. Well, mostly. Probably not robust in case of unscheduled close yet. svn path=/scripts/async-http.py; revision=2364
Diffstat (limited to 'scripts')
-rw-r--r--scripts/async-http.py99
1 files changed, 50 insertions, 49 deletions
diff --git a/scripts/async-http.py b/scripts/async-http.py
index 05b57d4f..a3097863 100644
--- a/scripts/async-http.py
+++ b/scripts/async-http.py
@@ -22,7 +22,6 @@ PERFORMANCE OF THIS SOFTWARE.
#
# lynx -post_data -mime_header -source http://127.0.0.1:8000/
-
import sys, os, time, socket, asyncore, asynchat, traceback, urlparse
class http_message(object):
@@ -92,7 +91,7 @@ class http_message(object):
elif self.version == (1,0):
return c is not None and "keep-alive" in c.lower()
else:
- raise RuntimeError, "Version is neither 1.0 nor 1.1"
+ return False
class http_request(http_message):
@@ -161,37 +160,37 @@ class http_stream(asynchat.async_chat):
self.handle_message()
def handle_error(self):
- print "[Error]"
+ if debug: print "[Error]"
print traceback.format_exc()
asyncore.close_all()
class http_server(http_stream):
def handle_headers(self):
- print "[Got headers]"
+ if debug: print "[Got headers]"
self.msg = http_request.parse_from_wire(self.get_buffer())
if self.msg.cmd == "POST":
- print "[Waiting for POST body]"
+ if debug: print "[Waiting for POST body]"
self.set_terminator(int(self.msg.headers["Content-Length"]))
else:
self.handle_message()
def handle_message(self):
- print "[Got message]"
- print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't")
+ if debug: print "[Got message]"
+ if debug: print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't")
print self.msg
self.push(http_response(code = 200, reason = "OK", body = self.msg.format(),
Connection = "Keep-Alive" if self.msg.persistent() else "Close",
Content_Type = "text/plain").format())
if self.msg.persistent():
- print "[Listening for next message]"
+ if debug: print "[Listening for next message]"
self.restart()
else:
- print "[Closing]"
+ if debug: print "[Closing]"
self.close_when_done()
def handle_close(self):
- print "[Closing all connections]"
+ if debug: print "[Closing all connections]"
asyncore.close_all()
class http_listener(asyncore.dispatcher):
@@ -203,25 +202,24 @@ class http_listener(asyncore.dispatcher):
self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self.bind(("", port))
self.listen(5)
- print "[Listening on port %s]" % port
+ if debug: print "[Listening on port %s]" % port
def handle_accept(self):
- print "[Accepting connection]"
+ if debug: print "[Accepting connection]"
server = http_server(self.accept()[0])
def handle_error(self):
- print "[Error]"
+ if debug: print "[Error]"
print traceback.format_exc()
asyncore.close_all()
class http_client(http_stream):
- def __init__(self, orator, hostport, msg = None):
+ def __init__(self, orator, hostport):
+ if debug: print "[Creating new connection]"
http_stream.__init__(self)
self.orator = orator
- self.message_queue = []
- if msg is not None:
- self.queue_message(msg)
+ self.hostport = hostport
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect(hostport)
@@ -233,59 +231,62 @@ class http_client(http_stream):
self.set_terminator(None)
def handle_message(self):
- print "[Got message]"
- print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't")
+ if debug: print "[Got message]"
+ if debug: print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't")
print self.msg
- self.next_msg()
+ self.next_msg(first = False)
def handle_connect(self):
- print "[Connected]"
- self.next_msg()
+ if debug: print "[Connected]"
+ self.next_msg(first = True)
def queue_message(self, msg):
- print "[Adding message to queue]"
+ if debug: print "[Adding message to queue]"
self.message_queue.append(msg)
- def next_msg(self):
- if self.message_queue:
- try:
- if not self.msg.persistent():
- raise RuntimeError, "Attempting to send subsequent message to non-persistent connection"
- except AttributeError:
- pass
- print "[Pulling next message from queue]"
- self.push(self.message_queue.pop(0).format())
+ def next_msg(self, first):
+ msg = self.orator.next_msg(self.hostport, first or self.msg.persistent())
+ if msg is not None:
+ if debug: print "[Got a new message to send from my queue]"
+ self.push(msg.format())
self.restart()
else:
- print "[No messages left in queue]"
+ if debug: print "[No messages left in queue]"
self.close_when_done()
def handle_close(self):
if self.get_terminator() is None:
self.found_terminator()
-class http_orator(dict):
+class http_orator(object):
- def query(self, url, body = None):
+ def __init__(self):
+ self.clients = {}
+ self.queues = {}
+ def query(self, url, body = None):
u = urlparse.urlparse(url)
-
- assert u.scheme == "http"
- assert u.username is None
- assert u.password is None
- assert u.params == ""
- assert u.query == ""
- assert u.fragment == ""
-
+ assert u.scheme == "http" and u.username is None and u.password is None and u.params == "" and u.query == "" and u.fragment == ""
request = http_request(cmd = "POST", path = u.path, body = body, Content_Type = "text/plain", Connection = "Keep-Alive")
hostport = (u.hostname or "localhost", u.port or 80)
-
- if hostport not in self:
- print "[Creating new connection]"
- self[hostport] = http_client(self, hostport, request)
+ if hostport in self.queues:
+ self.queues[hostport].append(request)
else:
- print "[Reusing existing connection]"
- self[hostport].queue_message(request)
+ self.queues[hostport] = [request]
+ if hostport not in self.clients:
+ self.clients[hostport] = http_client(self, hostport)
+
+ def next_msg(self, hostport, usable):
+ queue = self.queues.get(hostport)
+ if queue and not usable:
+ self.clients[hostport] = http_client(self, hostport)
+ if queue and usable:
+ if debug: print "[Reusing existing connection]"
+ return queue.pop(0)
+ else:
+ return None
+
+debug = False
if len(sys.argv) == 1: