aboutsummaryrefslogtreecommitdiff
path: root/scripts/async-http.py
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2009-04-30 01:26:19 +0000
committerRob Austein <sra@hactrn.net>2009-04-30 01:26:19 +0000
commit807f3533e0f38e62224ca578597314ff30ae550c (patch)
tree6903d5568cf9f9b31a64e27a89aec640be664696 /scripts/async-http.py
parentde48a7e4dac62ce88aff8fe97ad46ed3ddd9459a (diff)
Checkpoint
svn path=/scripts/async-http.py; revision=2378
Diffstat (limited to 'scripts/async-http.py')
-rw-r--r--scripts/async-http.py150
1 files changed, 87 insertions, 63 deletions
diff --git a/scripts/async-http.py b/scripts/async-http.py
index b33cbc5d..5b85cf24 100644
--- a/scripts/async-http.py
+++ b/scripts/async-http.py
@@ -44,7 +44,8 @@ import rpki.async
debug = True
-allow_persistence = False
+want_persistent_client = False
+want_persistent_server = False
class http_message(object):
@@ -182,41 +183,45 @@ class http_stream(asynchat.async_chat):
self.handle_message()
def handle_error(self):
- if debug: print "[Error in HTTP stream handler]"
+ if debug: print "[%s: Error in HTTP stream handler]" % repr(self)
print traceback.format_exc()
asyncore.close_all()
class http_server(http_stream):
+ def __init__(self, conn = None):
+ http_stream.__init__(self, conn)
+ self.expect_close = not want_persistent_server
+
def handle_headers(self):
- if debug: print "[Got headers]"
+ if debug: print "[%s: Got headers]" % repr(self)
self.msg = http_request.parse_from_wire(self.get_buffer())
if self.msg.cmd == "POST":
- if debug: print "[Waiting for POST body]"
+ if debug: print "[%s: Waiting for POST body]" % repr(self)
self.set_terminator(int(self.msg.headers["Content-Length"]))
else:
self.handle_message()
def handle_message(self):
- if debug: print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't")
+ if not self.msg.persistent():
+ self.expect_close = True
print "Query:"
print self.msg
print
msg = http_response(code = 200, reason = "OK", body = self.msg.format(),
- Connection = "Keep-Alive" if allow_persistence and self.msg.persistent() else "Close",
+ Connection = "Close" if self.expect_close else "Keep-Alive",
Cache_Control = "no-cache,no-store",
Content_Type = "text/plain")
-
print "Reply:"
print msg
print
self.push(msg.format())
- if allow_persistence and self.msg.persistent():
- if debug: print "[Listening for next message]"
- self.restart()
- else:
- if debug: print "[Closing]"
+ if self.expect_close:
+ if debug: print "[%s: Closing]" % repr(self)
self.close_when_done()
+ else:
+ if debug: print "[%s: Listening for next message]" % repr(self)
+ self.restart()
class http_listener(asyncore.dispatcher):
@@ -227,24 +232,36 @@ class http_listener(asyncore.dispatcher):
self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self.bind(("", port))
self.listen(5)
- if debug: print "[Listening on port %s]" % port
+ if debug: print "[%s: Listening on port %s]" % (repr(self), port)
def handle_accept(self):
- if debug: print "[Accepting connection]"
+ if debug: print "[%s: Accepting connection]" % repr(self)
server = http_server(self.accept()[0])
def handle_error(self):
- if debug: print "[Error in HTTP listener]"
+ if debug: print "[%s: Error in HTTP listener]" % repr(self)
print traceback.format_exc()
asyncore.close_all()
+# Might need to know whether outbound data is fully sent, as part of
+# this state thing. If so, calling .writable() ought to do the trick,
+# so long as it has no side effects (need to check asynchat.py for
+# that).
+#
+# I don't think there's anything we can do about crossed-in-mail
+# problem where we finish sending query just as server sends us
+# an unsolicited message. One would like to think that the HTTP
+# specification rules this out, but no bets.
+
class http_client(http_stream):
def __init__(self, narrator, hostport):
- if debug: print "[Creating new connection]"
+ if debug: print "[%s: Creating new connection]" % repr(self)
http_stream.__init__(self)
self.narrator = narrator
self.hostport = hostport
+ self.state = "idle"
+ self.expect_close = not want_persistent_client
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect(hostport)
@@ -255,51 +272,48 @@ class http_client(http_stream):
else:
self.set_terminator(None)
+ def send_request(self, msg):
+ print "[%s: Sending request]" % repr(self)
+ assert self.state == "idle"
+ assert msg is not None
+ self.state = "request-sent"
+ msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
+ self.push(msg.format())
+ self.restart()
+
def handle_message(self):
- try:
- msg = self.narrator.done_msg(self.hostport)
+ if not self.msg.persistent():
+ self.expect_close = True
+ print "[%s: Message received, state %s]" % (repr(self), self.state)
+ if self.state == "request-sent":
print "Query:"
- print msg
+ print self.narrator.done_with_request(self.hostport)
print
- except IndexError:
- #
- # This is not a good test. If we get here, we certainly have an
- # unsolicited message, but if we get an unsolicited message with
- # something in the queue we will get confused and match up the
- # wrong query and reply. Needs fixing.
- #
- print "[Received unsolicited message]"
- if debug: print "[Connection %s persistent]" % ("is" if self.msg.persistent() else "isn't")
+ elif self.state == "idle":
+ print "[%s: Received unsolicited message]" % repr(self)
+ elif self.state == "closing":
+ assert not self.msg.body
+ print "[%s: Ignoring empty response received while closing]" % repr(self)
+ return
+ else:
+ print "[%s: Unexpected state]" % repr(self)
print "Reply:"
print self.msg
print
- #
- # Hmm, this is probably the first place where the persistence
- # handling is confused. Why are we even looking for a next
- # message when the connection should close? Fixing this may
- # simplify unsnarling the rest of the client-side persistence
- # hairball.
- #
- self.next_msg(first = False)
-
- def handle_connect(self):
- if debug: print "[Connected]"
- self.next_msg(first = True)
-
- def queue_message(self, msg):
- if debug: print "[Adding message to queue]"
- self.message_queue.append(msg)
-
- def next_msg(self, first):
- msg = self.narrator.next_msg(self.hostport, first or (allow_persistence and self.msg.persistent()))
+ msg = self.narrator.next_request(self.hostport, not self.expect_close)
if msg is not None:
- if debug: print "[Got a new message to send from my queue]"
- self.push(msg.format())
- self.restart()
+ if debug: print "[%s: Got a new message to send from my queue]" % repr(self)
+ self.send_request(msg)
else:
- if debug: print "[No messages left in queue]"
+ if debug: print "[%s: Closing]" % repr(self)
+ self.state = "closing"
self.close_when_done()
+ def handle_connect(self):
+ if debug: print "[%s: Connected]" % repr(self)
+ msg = self.narrator.next_request(self.hostport, True)
+ self.send_request(msg)
+
def handle_close(self):
if self.get_terminator() is None:
self.found_terminator()
@@ -315,26 +329,36 @@ class http_narrator(object):
assert u.scheme == "http" and u.username is None and u.password is None and u.params == "" and u.query == "" and u.fragment == ""
request = http_request(cmd = "POST", path = u.path, body = body,
Host = u.hostname,
- Content_Type = "text/plain",
- Connection = "Keep-Alive" if allow_persistence else "Close")
+ Content_Type = "text/plain")
hostport = (u.hostname or "localhost", u.port or 80)
- if hostport in self.queues:
- self.queues[hostport].append(request)
- else:
- self.queues[hostport] = [request]
+ assert (hostport in self.queues) == (hostport in self.clients)
+ if hostport not in self.queues:
+ self.queues[hostport] = []
+ self.queues[hostport].append(request)
if hostport not in self.clients:
self.clients[hostport] = http_client(self, hostport)
- def done_msg(self, hostport):
- return self.queues[hostport].pop(0)
+ # Messages have to stay in queue here in case client fails and we
+ # need to retry with another client. What a mess.
+
+ def done_with_request(self, hostport):
+ req = self.queues[hostport].pop(0)
+ print "[%s: Dequeuing request %s]" % (repr(self), repr(req))
+ return req
- def next_msg(self, hostport, usable):
+ def next_request(self, hostport, usable):
queue = self.queues.get(hostport)
- if queue and not usable:
- self.clients[hostport] = http_client(self, hostport)
- if queue and usable:
+ if not queue:
+ print "[%s: Queue is empty]" % repr(self)
+ return None
+ print "[%s: Queue: %s]" % (repr(self), repr(queue))
+ if usable:
+ print "[%s: Queue not empty and connection usable]" % repr(self)
return queue[0]
else:
+ print "[%s: Queue not empty but connection not usable, spawning]" % repr(self)
+ self.clients[hostport] = http_client(self, hostport)
+ print "[%s: Spawned connection %s]" % (repr(self), repr(self.clients[hostport]))
return None
if len(sys.argv) == 1: