aboutsummaryrefslogtreecommitdiff
path: root/scripts/async-http.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2009-05-03 20:45:50 +0000
committerRob Austein <sra@hactrn.net>2009-05-03 20:45:50 +0000
commitf7fe04d7a2822090a5b350fbba3e08372fb169ff (patch)
tree00fd5de1f4daf5a9f450160695a1b17a3ecbd838 /scripts/async-http.py
parentb2c6f80a3d58f08aa6c40408743db817d676857f (diff)
Refactor http_manager into http_queue
svn path=/scripts/async-http.py; revision=2392
Diffstat (limited to 'scripts/async-http.py')
-rw-r--r--scripts/async-http.py94
1 files changed, 50 insertions, 44 deletions
diff --git a/scripts/async-http.py b/scripts/async-http.py
index c8d74e25..66cd000e 100644
--- a/scripts/async-http.py
+++ b/scripts/async-http.py
@@ -357,11 +357,10 @@ class http_client(http_stream):
parse_type = http_response
- def __init__(self, manager, hostport):
+ def __init__(self, queue, hostport):
self.log("Creating new connection to %s" % repr(hostport))
http_stream.__init__(self)
- self.manager = manager
- self.hostport = hostport
+ self.queue = queue
self.state = "idle"
self.expect_close = not want_persistent_client
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -373,11 +372,11 @@ class http_client(http_stream):
def send_request(self, msg):
self.log("Sending request")
assert self.state == "idle"
- assert msg is not None
- self.state = "request-sent"
- msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
- self.push(msg.format())
- self.restart()
+ if msg is not None:
+ self.state = "request-sent"
+ msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
+ self.push(msg.format())
+ self.restart()
def handle_message(self):
if not self.msg.persistent():
@@ -385,7 +384,7 @@ class http_client(http_stream):
self.log("Message received, state %s" % self.state)
msg = None
if self.state == "request-sent":
- msg = self.manager.done_with_request(self.hostport)
+ msg = self.queue.done_with_request()
print "Query:"
print msg
print
@@ -403,7 +402,7 @@ class http_client(http_stream):
self.state = "idle"
if msg != None:
msg.callback(self.msg)
- msg = self.manager.next_request(self.hostport, not self.expect_close)
+ msg = self.queue.next_request(not self.expect_close)
if msg is not None:
self.log("Got a new message to send from my queue")
self.send_request(msg)
@@ -414,62 +413,69 @@ class http_client(http_stream):
def handle_connect(self):
self.log("Connected")
- msg = self.manager.next_request(self.hostport, True)
- self.send_request(msg)
+ self.send_request(self.queue.next_request(True))
+
+ def kickstart(self):
+ self.log("Kickstart")
+ self.send_request(self.queue.next_request(True))
def handle_close(self):
http_stream.handle_close(self)
if self.get_terminator() is None:
self.handle_body()
-class http_manager(object):
+class http_queue(object):
log = logger
- # Hmm, these parallel dicts are almost certainly a hint that we need
- # an http_queue class or some such, then the manager can become a
- # simple map of destinations to queues, or something like that.
-
- def __init__(self):
- self.clients = {}
- self.queues = {}
+ def __init__(self, hostport, *requests):
+ self.hostport = hostport
+ self.queue = list(requests)
+ self.client = http_client(self, hostport)
- def query(self, url, callback, body = None):
- u = urlparse.urlparse(url)
- assert u.scheme == "http" and u.username is None and u.password is None and u.params == "" and u.query == "" and u.fragment == ""
- request = http_request(cmd = "POST", path = u.path, body = body, callback = callback,
- Host = u.hostname, Content_Type = "text/plain")
- hostport = (u.hostname or "localhost", u.port or 80)
- assert (hostport in self.queues) == (hostport in self.clients)
- if hostport not in self.queues:
- self.queues[hostport] = []
- self.queues[hostport].append(request)
- if hostport not in self.clients:
- self.clients[hostport] = http_client(self, hostport)
-
- # Messages have to stay in queue here in case client fails and we
- # need to retry with another client. What a mess.
+ def request(self, *requests):
+ need_kick = self.client.state == "idle" and not self.queue
+ self.queue.extend(requests)
+ if need_kick:
+ self.client.kickstart()
- def done_with_request(self, hostport):
- req = self.queues[hostport].pop(0)
+ def done_with_request(self):
+ req = self.queue.pop(0)
self.log("Dequeuing request %r" % req)
return req
- def next_request(self, hostport, usable):
- queue = self.queues.get(hostport)
- if not queue:
+ def next_request(self, usable):
+ if not self.queue:
self.log("Queue is empty")
return None
- self.log("Queue: %r" % queue)
+ self.log("Queue: %r" % self.queue)
if usable:
self.log("Queue not empty and connection usable")
- return queue[0]
+ return self.queue[0]
else:
self.log("Queue not empty but connection not usable, spawning")
- self.clients[hostport] = http_client(self, hostport)
- self.log("Spawned connection %r" % self.clients[hostport])
+ self.client = http_client(self, hostport)
+ self.log("Spawned connection %r" % self.client)
return None
+class http_manager(object):
+
+ log = logger
+
+ def __init__(self):
+ self.queues = {}
+
+ def query(self, url, callback, body = None):
+ u = urlparse.urlparse(url)
+ assert u.scheme == "http" and u.username is None and u.password is None and u.params == "" and u.query == "" and u.fragment == ""
+ request = http_request(cmd = "POST", path = u.path, body = body, callback = callback,
+ Host = u.hostname, Content_Type = "text/plain")
+ hostport = (u.hostname or "localhost", u.port or 80)
+ if hostport in self.queues:
+ self.queues[hostport].request(request)
+ else:
+ self.queues[hostport] = http_queue(hostport, request)
+
# server: reuse rest-style dispatcher from current https code.
#
# add downcall to set result: don't do this presently, because