aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Austein <sra@hactrn.net>2009-05-08 22:45:48 +0000
committerRob Austein <sra@hactrn.net>2009-05-08 22:45:48 +0000
commit3b6dc091e59d09e545a33725b5ca8fc4639d8ecb (patch)
tree5271f84fa7427ec01d98cdd13b4b67f9b94d2720
parent21ba753b3b87d98c3feabfa4ba5c88186874fb48 (diff)
Rototill HTTP client logic. It's still nasty, but cleaner than it was.
svn path=/rpkid/rpki/https.py; revision=2413
-rw-r--r--rpkid/rpki/https.py151
1 files changed, 75 insertions, 76 deletions
diff --git a/rpkid/rpki/https.py b/rpkid/rpki/https.py
index 860b8d22..32e4db4e 100644
--- a/rpkid/rpki/https.py
+++ b/rpkid/rpki/https.py
@@ -383,7 +383,7 @@ class http_client(http_stream):
http_stream.__init__(self)
self.queue = queue
self.hostport = hostport
- self.state = "idle"
+ self.state = "opening"
self.expect_close = not want_persistent_client
def start(self):
@@ -393,76 +393,59 @@ class http_client(http_stream):
except:
self.handle_error()
+ def set_state(self, state):
+ self.log("State transition %s => %s" % (self.state, state))
+ self.state = state
+
def handle_no_content_length(self):
self.set_terminator(None)
def send_request(self, msg):
- self.log("Sending request")
+ self.log("Sending request %r" % msg)
assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state)
- if msg is not None:
- self.state = "request-sent"
- msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
- self.push(msg.format())
- self.restart(idle = False)
+ self.set_state("request-sent")
+ msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
+ self.push(msg.format())
+ self.restart(idle = False)
def handle_message(self):
+ self.log("Message received, state %s" % self.state)
+
if not self.msg.persistent():
self.expect_close = True
- self.log("Message received, state %s" % self.state)
- msg = None
- if self.state == "request-sent":
- msg = self.queue.done_with_request()
- elif self.state == "idle":
- self.log("Received unsolicited message")
- elif self.state == "closing":
- assert not self.msg.body
- self.log("Ignoring empty response received while closing")
- return
- else:
- raise RuntimeError, "[%r: Unexpected state]" % self
- self.state = "idle"
- if msg != None:
- try:
- if self.msg.code != 200:
- e = rpki.exceptions.HTTPRequestFailed("HTTP request failed with status %s, reason %s, response %s" % (self.msg.code, self.msg.reason, self.msg.body))
- rpki.log.debug("HTTPS client returned failure: %s" % e)
- msg.errback(e)
- else:
- self.log("Delivering HTTPS client result")
- msg.callback(self.msg.body)
- except asyncore.ExitNow:
- raise
- except Exception, data:
- self.log("Unhandled exception from callback")
- rpki.log.error(traceback.format_exc())
- msg = self.queue.next_request(not self.expect_close)
- if msg is not None and self.state is "idle":
- self.log("Got a new message to send from my queue")
- self.send_request(msg)
- elif msg is not None:
- self.log("Connection state %s, nothing left for me to do at the moment" % self.state)
- elif self.expect_close:
+
+ if self.state != "request-sent":
+ if self.state == "closing":
+ assert not self.msg.body
+ self.log("Ignoring empty response received while closing")
+ return
+ raise RuntimeError, "%r received message while in unexpected state %s" % (self, self.state)
+
+ if self.expect_close:
self.log("Closing")
- self.state = "closing"
- self.queue.closing(self)
+ self.set_state("closing")
+ self.queue.detach(self)
self.close_when_done()
else:
self.log("Idling")
+ self.set_state("idle")
self.timer.set(self.idle_timeout)
+ if self.msg.code == 200:
+ self.queue.return_result(self.msg)
+ else:
+ self.queue.return_result(rpki.exceptions.HTTPRequestFailed(
+ "HTTP request failed with status %s, reason %s, response %s" % (self.msg.code, self.msg.reason, self.msg.body)))
+
def handle_connect(self):
self.log("Connected")
- self.send_request(self.queue.next_request(True))
-
- def kickstart(self):
- self.log("Kickstart")
- assert self.state == "idle"
- self.send_request(self.queue.next_request(True))
+ self.set_state("idle")
+ self.queue.send_request()
def handle_close(self):
http_stream.handle_close(self)
self.log("State %s" % self.state)
- self.queue.closing(self)
+ self.queue.detach(self)
if self.get_terminator() is None:
self.handle_body()
@@ -470,11 +453,11 @@ class http_client(http_stream):
if self.state != "idle":
self.log("Timeout while in state %s" % self.state)
http_stream.handle_timeout(self)
- self.queue.closing(self)
+ self.queue.detach(self)
def handle_error(self):
http_stream.handle_error(self)
- self.queue.closing(self)
+ self.queue.detach(self)
#
# May need to call request's errback function here.
# This whole queuing business sure complicates matters.
@@ -491,39 +474,55 @@ class http_queue(object):
def request(self, *requests):
self.log("Adding requests %r" % requests)
- need_kick = self.client is not None and not self.queue
+ was_empty = not self.queue
self.queue.extend(requests)
+ if was_empty:
+ self.restart()
+
+ def restart(self):
if self.client is None:
- self.client = http_client(self, self.hostport)
- self.log("Spawned connection %r" % self.client)
+ client = http_client(self, self.hostport)
+ self.log("Attaching client %r" % client)
+ self.client = client
self.client.start()
- elif need_kick:
- self.client.kickstart()
+ else:
+ self.log("Sending new request to existing client %r" % self.client)
+ self.send_request()
+
+ def send_request(self):
+ if self.queue:
+ self.client.send_request(self.queue[0])
+
+ def detach(self, client):
+ if client is self.client:
+ self.log("Detaching client %r" % client)
+ self.client = None
+
+ def return_result(self, result):
+ old_client = self.client
- def done_with_request(self):
req = self.queue.pop(0)
self.log("Dequeuing request %r" % req)
- return req
- def next_request(self, usable):
- if not self.queue:
- self.log("Queue is empty")
- return None
+ try:
+ if isinstance(result, http_response):
+ self.log("Returning result %r to caller" % result)
+ req.callback(result.body)
+ else:
+ assert isinstance(result, Exception)
+ self.log("Returning exception %r to caller: %s" % (result, result))
+ msg.errback(result)
+ except asyncore.ExitNow:
+ raise
+ except:
+ self.log("Unhandled exception from callback")
+ rpki.log.error(traceback.format_exc())
+
self.log("Queue: %r" % self.queue)
- if usable:
- self.log("Queue not empty and connection usable")
- return self.queue[0]
- else:
- self.log("Queue not empty but connection not usable, spawning")
- self.client = http_client(self, self.hostport)
- self.log("Spawned connection %r" % self.client)
- self.client.start()
- return None
- def closing(self, client):
- if client is self.client:
- self.log("Removing client")
- self.client = None
+ if self.queue and self.client is old_client:
+ self.restart()
+
queues = {}