diff options
author | Rob Austein <sra@hactrn.net> | 2009-04-30 01:26:19 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2009-04-30 01:26:19 +0000 |
commit | 807f3533e0f38e62224ca578597314ff30ae550c (patch) | |
tree | 6903d5568cf9f9b31a64e27a89aec640be664696 /scripts/async-http.py | |
parent | de48a7e4dac62ce88aff8fe97ad46ed3ddd9459a (diff) |
Checkpoint
svn path=/scripts/async-http.py; revision=2378
Diffstat (limited to 'scripts/async-http.py')
-rw-r--r-- | scripts/async-http.py | 150 |
1 files changed, 87 insertions, 63 deletions
diff --git a/scripts/async-http.py b/scripts/async-http.py index b33cbc5d..5b85cf24 100644 --- a/scripts/async-http.py +++ b/scripts/async-http.py @@ -44,7 +44,8 @@ import rpki.async debug = True -allow_persistence = False +want_persistent_client = False +want_persistent_server = False class http_message(object): @@ -182,41 +183,45 @@ class http_stream(asynchat.async_chat): self.handle_message() def handle_error(self): - if debug: print "[Error in HTTP stream handler]" + if debug: print "[%s: Error in HTTP stream handler]" % repr(self) print traceback.format_exc() asyncore.close_all() class http_server(http_stream): + def __init__(self, conn = None): + http_stream.__init__(self, conn) + self.expect_close = not want_persistent_server + def handle_headers(self): - if debug: print "[Got headers]" + if debug: print "[%s: Got headers]" % repr(self) self.msg = http_request.parse_from_wire(self.get_buffer()) if self.msg.cmd == "POST": - if debug: print "[Waiting for POST body]" + if debug: print "[%s: Waiting for POST body]" % repr(self) self.set_terminator(int(self.msg.headers["Content-Length"])) else: self.handle_message() def handle_message(self): - if debug: print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't") + if not self.msg.persistent(): + self.expect_close = True print "Query:" print self.msg print msg = http_response(code = 200, reason = "OK", body = self.msg.format(), - Connection = "Keep-Alive" if allow_persistence and self.msg.persistent() else "Close", + Connection = "Close" if self.expect_close else "Keep-Alive", Cache_Control = "no-cache,no-store", Content_Type = "text/plain") - print "Reply:" print msg print self.push(msg.format()) - if allow_persistence and self.msg.persistent(): - if debug: print "[Listening for next message]" - self.restart() - else: - if debug: print "[Closing]" + if self.expect_close: + if debug: print "[%s: Closing]" % repr(self) self.close_when_done() + else: + if debug: print "[%s: Listening for next message]" % repr(self) + self.restart() class http_listener(asyncore.dispatcher): @@ -227,24 +232,36 @@ class http_listener(asyncore.dispatcher): self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) self.bind(("", port)) self.listen(5) - if debug: print "[Listening on port %s]" % port + if debug: print "[%s: Listening on port %s]" % (repr(self), port) def handle_accept(self): - if debug: print "[Accepting connection]" + if debug: print "[%s: Accepting connection]" % repr(self) server = http_server(self.accept()[0]) def handle_error(self): - if debug: print "[Error in HTTP listener]" + if debug: print "[%s: Error in HTTP listener]" % repr(self) print traceback.format_exc() asyncore.close_all() +# Might need to know whether outbound data is fully sent, as part of +# this state thing. If so, calling .writable() ought to do the trick, +# so long as it has no side effects (need to check asynchat.py for +# that). +# +# I don't think there's anything we can do about crossed-in-mail +# problem where we finish sending query just as server sends us +# an unsolicited message. One would like to think that the HTTP +# specification rules this out, but no bets. + class http_client(http_stream): def __init__(self, narrator, hostport): - if debug: print "[Creating new connection]" + if debug: print "[%s: Creating new connection]" % repr(self) http_stream.__init__(self) self.narrator = narrator self.hostport = hostport + self.state = "idle" + self.expect_close = not want_persistent_client self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect(hostport) @@ -255,51 +272,48 @@ class http_client(http_stream): else: self.set_terminator(None) + def send_request(self, msg): + print "[%s: Sending request]" % repr(self) + assert self.state == "idle" + assert msg is not None + self.state = "request-sent" + msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive" + self.push(msg.format()) + self.restart() + def handle_message(self): - try: - msg = self.narrator.done_msg(self.hostport) + if not self.msg.persistent(): + self.expect_close = True + print "[%s: Message received, state %s]" % (repr(self), self.state) + if self.state == "request-sent": print "Query:" - print msg + print self.narrator.done_with_request(self.hostport) print - except IndexError: - # - # This is not a good test. If we get here, we certainly have an - # unsolicited message, but if we get an unsolicited message with - # something in the queue we will get confused and match up the - # wrong query and reply. Needs fixing. - # - print "[Received unsolicited message]" - if debug: print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't") + elif self.state == "idle": + print "[%s: Received unsolicited message]" % repr(self) + elif self.state == "closing": + assert not self.msg.body + print "[%s: Ignoring empty response received while closing]" % repr(self) + return + else: + print "[%s: Unexpected state]" % repr(self) print "Reply:" print self.msg print - # - # Hmm, this is probably the first place where the persistence - # handling is confused. Why are we even looking for a next - # message when the connection should close? Fixing this may - # simplify unsnarling the rest of the client-side persistence - # hairball. - # - self.next_msg(first = False) - - def handle_connect(self): - if debug: print "[Connected]" - self.next_msg(first = True) - - def queue_message(self, msg): - if debug: print "[Adding message to queue]" - self.message_queue.append(msg) - - def next_msg(self, first): - msg = self.narrator.next_msg(self.hostport, first or (allow_persistence and self.msg.persistent())) + msg = self.narrator.next_request(self.hostport, not self.expect_close) if msg is not None: - if debug: print "[Got a new message to send from my queue]" - self.push(msg.format()) - self.restart() + if debug: print "[%s: Got a new message to send from my queue]" % repr(self) + self.send_request(msg) else: - if debug: print "[No messages left in queue]" + if debug: print "[%s: Closing]" % repr(self) + self.state = "closing" self.close_when_done() + def handle_connect(self): + if debug: print "[%s: Connected]" % repr(self) + msg = self.narrator.next_request(self.hostport, True) + self.send_request(msg) + def handle_close(self): if self.get_terminator() is None: self.found_terminator() @@ -315,26 +329,36 @@ class http_narrator(object): assert u.scheme == "http" and u.username is None and u.password is None and u.params == "" and u.query == "" and u.fragment == "" request = http_request(cmd = "POST", path = u.path, body = body, Host = u.hostname, - Content_Type = "text/plain", - Connection = "Keep-Alive" if allow_persistence else "Close") + Content_Type = "text/plain") hostport = (u.hostname or "localhost", u.port or 80) - if hostport in self.queues: - self.queues[hostport].append(request) - else: - self.queues[hostport] = [request] + assert (hostport in self.queues) == (hostport in self.clients) + if hostport not in self.queues: + self.queues[hostport] = [] + self.queues[hostport].append(request) if hostport not in self.clients: self.clients[hostport] = http_client(self, hostport) - def done_msg(self, hostport): - return self.queues[hostport].pop(0) + # Messages have to stay in queue here in case client fails and we + # need to retry with another client. What a mess. + + def done_with_request(self, hostport): + req = self.queues[hostport].pop(0) + print "[%s: Dequeuing request %s]" % (repr(self), repr(req)) + return req - def next_msg(self, hostport, usable): + def next_request(self, hostport, usable): queue = self.queues.get(hostport) - if queue and not usable: - self.clients[hostport] = http_client(self, hostport) - if queue and usable: + if not queue: + print "[%s: Queue is empty]" % repr(self) + return None + print "[%s: Queue: %s]" % (repr(self), repr(queue)) + if usable: + print "[%s: Queue not empty and connection usable]" % repr(self) return queue[0] else: + print "[%s: Queue not empty but connection not usable, spawning]" % repr(self) + self.clients[hostport] = http_client(self, hostport) + print "[%s: Spawned connection %s]" % (repr(self), repr(self.clients[hostport])) return None if len(sys.argv) == 1: |