diff options
author | Rob Austein <sra@hactrn.net> | 2009-05-03 20:45:50 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2009-05-03 20:45:50 +0000 |
commit | f7fe04d7a2822090a5b350fbba3e08372fb169ff (patch) | |
tree | 00fd5de1f4daf5a9f450160695a1b17a3ecbd838 /scripts/async-http.py | |
parent | b2c6f80a3d58f08aa6c40408743db817d676857f (diff) |
Refactor http_manager into http_queue
svn path=/scripts/async-http.py; revision=2392
Diffstat (limited to 'scripts/async-http.py')
-rw-r--r-- | scripts/async-http.py | 94 |
1 files changed, 50 insertions, 44 deletions
diff --git a/scripts/async-http.py b/scripts/async-http.py index c8d74e25..66cd000e 100644 --- a/scripts/async-http.py +++ b/scripts/async-http.py @@ -357,11 +357,10 @@ class http_client(http_stream): parse_type = http_response - def __init__(self, manager, hostport): + def __init__(self, queue, hostport): self.log("Creating new connection to %s" % repr(hostport)) http_stream.__init__(self) - self.manager = manager - self.hostport = hostport + self.queue = queue self.state = "idle" self.expect_close = not want_persistent_client self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -373,11 +372,11 @@ class http_client(http_stream): def send_request(self, msg): self.log("Sending request") assert self.state == "idle" - assert msg is not None - self.state = "request-sent" - msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive" - self.push(msg.format()) - self.restart() + if msg is not None: + self.state = "request-sent" + msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive" + self.push(msg.format()) + self.restart() def handle_message(self): if not self.msg.persistent(): @@ -385,7 +384,7 @@ class http_client(http_stream): self.log("Message received, state %s" % self.state) msg = None if self.state == "request-sent": - msg = self.manager.done_with_request(self.hostport) + msg = self.queue.done_with_request() print "Query:" print msg print @@ -403,7 +402,7 @@ class http_client(http_stream): self.state = "idle" if msg != None: msg.callback(self.msg) - msg = self.manager.next_request(self.hostport, not self.expect_close) + msg = self.queue.next_request(not self.expect_close) if msg is not None: self.log("Got a new message to send from my queue") self.send_request(msg) @@ -414,62 +413,69 @@ class http_client(http_stream): def handle_connect(self): self.log("Connected") - msg = self.manager.next_request(self.hostport, True) - self.send_request(msg) + self.send_request(self.queue.next_request(True)) + + def kickstart(self): + self.log("Kickstart") + self.send_request(self.queue.next_request(True)) def handle_close(self): http_stream.handle_close(self) if self.get_terminator() is None: self.handle_body() -class http_manager(object): +class http_queue(object): log = logger - # Hmm, these parallel dicts are almost certainly a hint that we need - # an http_queue class or some such, then the manager can become a - # simple map of destinations to queues, or something like that. - - def __init__(self): - self.clients = {} - self.queues = {} + def __init__(self, hostport, *requests): + self.hostport = hostport + self.queue = list(requests) + self.client = http_client(self, hostport) - def query(self, url, callback, body = None): - u = urlparse.urlparse(url) - assert u.scheme == "http" and u.username is None and u.password is None and u.params == "" and u.query == "" and u.fragment == "" - request = http_request(cmd = "POST", path = u.path, body = body, callback = callback, - Host = u.hostname, Content_Type = "text/plain") - hostport = (u.hostname or "localhost", u.port or 80) - assert (hostport in self.queues) == (hostport in self.clients) - if hostport not in self.queues: - self.queues[hostport] = [] - self.queues[hostport].append(request) - if hostport not in self.clients: - self.clients[hostport] = http_client(self, hostport) - - # Messages have to stay in queue here in case client fails and we - # need to retry with another client. What a mess. + def request(self, *requests): + need_kick = self.client.state == "idle" and not self.queue + self.queue.extend(requests) + if need_kick: + self.client.kickstart() - def done_with_request(self, hostport): - req = self.queues[hostport].pop(0) + def done_with_request(self): + req = self.queue.pop(0) self.log("Dequeuing request %r" % req) return req - def next_request(self, hostport, usable): - queue = self.queues.get(hostport) - if not queue: + def next_request(self, usable): + if not self.queue: self.log("Queue is empty") return None - self.log("Queue: %r" % queue) + self.log("Queue: %r" % self.queue) if usable: self.log("Queue not empty and connection usable") - return queue[0] + return self.queue[0] else: self.log("Queue not empty but connection not usable, spawning") - self.clients[hostport] = http_client(self, hostport) - self.log("Spawned connection %r" % self.clients[hostport]) + self.client = http_client(self, hostport) + self.log("Spawned connection %r" % self.client) return None +class http_manager(object): + + log = logger + + def __init__(self): + self.queues = {} + + def query(self, url, callback, body = None): + u = urlparse.urlparse(url) + assert u.scheme == "http" and u.username is None and u.password is None and u.params == "" and u.query == "" and u.fragment == "" + request = http_request(cmd = "POST", path = u.path, body = body, callback = callback, + Host = u.hostname, Content_Type = "text/plain") + hostport = (u.hostname or "localhost", u.port or 80) + if hostport in self.queues: + self.queues[hostport].request(request) + else: + self.queues[hostport] = http_queue(hostport, request) + # server: reuse rest-style dispatcher from current https code. # # add downcall to set result: don't do this presently, because |