aboutsummaryrefslogtreecommitdiff
path: root/scripts/Old/async-http.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/Old/async-http.py')
-rw-r--r--scripts/Old/async-http.py529
1 files changed, 0 insertions, 529 deletions
diff --git a/scripts/Old/async-http.py b/scripts/Old/async-http.py
deleted file mode 100644
index 5b5fc1cd..00000000
--- a/scripts/Old/async-http.py
+++ /dev/null
@@ -1,529 +0,0 @@
-"""
-Testbed for figuring out how to write asynchronous HTTPS code.
-
-$Id$
-
-Copyright (C) 2009 Internet Systems Consortium ("ISC")
-
-Permission to use, copy, modify, and distribute this software for any
-purpose with or without fee is hereby granted, provided that the above
-copyright notice and this permission notice appear in all copies.
-
-THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
-REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
-AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
-INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
-LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
-OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
-PERFORMANCE OF THIS SOFTWARE.
-"""
-
-# Command that may be useful for testing server side of this:
-#
-# lynx -post_data -mime_header -source http://127.0.0.1:8000/
-#
-# Testing the client side of this is more entertaining, both because
-# we need to be liberal in what we accept and also because servers do
-# wildly different things depending both on HTTP version number and
-# options signaled by the client and also on internal details in the
-# server context (such as whether the content is static or generated
-# by CGI). It's useful to test against static content, but also to
-# test against CGI-generated output, eg, the following trivial script:
-#
-# print "Content-Type: text/plain; charset=US-ASCII\r"
-# print "\r"
-# for i in xrange(100):
-# print "%08d" % i, "." * 120, "\r"
-#
-# At least with Apache 2.0, result of running this differs
-# significantly depending on whether client signals HTTP 1.0 or 1.1;
-# the latter produces chunked output.
-
-import sys, os, time, socket, asyncore, asynchat, traceback, urlparse
-import rpki.async, rpki.sundial
-
-debug = True
-
-want_persistent_client = True
-want_persistent_server = True
-
-idle_timeout_default = rpki.sundial.timedelta(seconds = 60)
-active_timeout_default = rpki.sundial.timedelta(seconds = 15)
-
-default_http_version = (1, 1)
-
-class http_message(object):
-
- software_name = "BalmyBandicoot HTTP test code"
-
- def __init__(self, version = None, body = None, headers = None):
- self.version = version
- self.body = body
- self.headers = headers
- self.normalize_headers()
-
- def normalize_headers(self, headers = None):
- if headers is None:
- headers = () if self.headers is None else self.headers.items()
- translate_underscore = True
- else:
- translate_underscore = False
- result = {}
- for k,v in headers:
- if translate_underscore:
- k = k.replace("_", "-")
- k = "-".join(s.capitalize() for s in k.split("-"))
- v = v.strip()
- if k in result:
- result[k] += ", " + v
- else:
- result[k] = v
- self.headers = result
-
- @classmethod
- def parse_from_wire(cls, headers):
- self = cls()
- headers = headers.split("\r\n")
- self.parse_first_line(*headers.pop(0).split(None, 2))
- for i in xrange(len(headers) - 2, -1, -1):
- if headers[i + 1][0].isspace():
- headers[i] += headers[i + 1]
- del headers[i + 1]
- self.normalize_headers([h.split(":", 1) for h in headers])
- return self
-
- def format(self):
- s = self.format_first_line()
- if self.body is not None:
- assert isinstance(self.body, str)
- self.headers["Content-Length"] = len(self.body)
- for kv in self.headers.iteritems():
- s += "%s: %s\r\n" % kv
- s += "\r\n"
- if self.body is not None:
- s += self.body
- return s
-
- def __str__(self):
- return self.format()
-
- def parse_version(self, version):
- if version[:5] != "HTTP/":
- raise RuntimeError, "Couldn't parse version %s" % version
- self.version = tuple(int(i) for i in version[5:].split("."))
-
- def persistent(self):
- c = self.headers.get("Connection")
- if self.version == (1, 1):
- return c is None or "close" not in c.lower()
- elif self.version == (1, 0):
- return c is not None and "keep-alive" in c.lower()
- else:
- return False
-
-class http_request(http_message):
-
- def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, **headers):
- if cmd is not None and cmd != "POST" and body is not None:
- raise RuntimeError
- http_message.__init__(self, version = version, body = body, headers = headers)
- self.cmd = cmd
- self.path = path
- self.callback = callback
-
- def parse_first_line(self, cmd, path, version):
- self.parse_version(version)
- self.cmd = cmd
- self.path = path
-
- def format_first_line(self):
- self.headers.setdefault("User-Agent", self.software_name)
- return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1])
-
-class http_response(http_message):
-
- def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers):
- http_message.__init__(self, version = version, body = body, headers = headers)
- self.code = code
- self.reason = reason
-
- def parse_first_line(self, version, code, reason):
- self.parse_version(version)
- self.code = int(code)
- self.reason = reason
-
- def format_first_line(self):
- self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT"))
- self.headers.setdefault("Server", self.software_name)
- return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason)
-
-def logger(self, msg):
- if debug:
- print "[%s %r: %s]" % (rpki.sundial.now().strftime("%T"), self, msg)
-
-class http_stream(asynchat.async_chat):
-
- log = logger
-
- idle_timeout = idle_timeout_default
- active_timeout = active_timeout_default
-
- def __init__(self, conn = None):
- asynchat.async_chat.__init__(self, conn = conn)
- self.buffer = []
- self.timer = rpki.async.timer(self.handle_timeout)
- self.restart()
-
- def restart(self, idle = True):
- assert not self.buffer
- self.chunk_handler = None
- self.set_terminator("\r\n\r\n")
- timeout = self.idle_timeout if idle else self.active_timeout
- if timeout is not None:
- self.timer.set(timeout)
- else:
- self.timer.cancel()
-
- def update_active_timeout(self):
- if self.active_timeout is not None:
- self.timer.set(self.active_timeout)
- else:
- self.timer.cancel()
-
- def collect_incoming_data(self, data):
- """Buffer the data"""
- self.buffer.append(data)
- self.update_active_timeout()
-
- def get_buffer(self):
- val = "".join(self.buffer)
- self.buffer = []
- return val
-
- def found_terminator(self):
- self.update_active_timeout()
- if self.chunk_handler:
- self.chunk_handler()
- elif not isinstance(self.get_terminator(), str):
- self.handle_body()
- else:
- self.log("Got headers")
- self.msg = self.parse_type.parse_from_wire(self.get_buffer())
- if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower():
- self.msg.body = []
- self.chunk_handler = self.chunk_header
- self.set_terminator("\r\n")
- elif "Content-Length" in self.msg.headers:
- self.set_terminator(int(self.msg.headers["Content-Length"]))
- else:
- self.handle_no_content_length()
-
- def chunk_header(self):
- n = int(self.get_buffer().partition(";")[0], 16)
- self.log("Chunk length %s" % n)
- if n:
- self.chunk_handler = self.chunk_body
- self.set_terminator(n)
- else:
- self.msg.body = "".join(self.msg.body)
- self.chunk_handler = self.chunk_discard_trailer
-
- def chunk_body(self):
- self.log("Chunk body")
- self.msg.body += self.buffer
- self.buffer = []
- self.chunk_handler = self.chunk_discard_crlf
- self.set_terminator("\r\n")
-
- def chunk_discard_crlf(self):
- self.log("Chunk CRLF")
- s = self.get_buffer()
- assert s == "", "Expected chunk CRLF, got '%s'" % s
- self.chunk_handler = self.chunk_header
-
- def chunk_discard_trailer(self):
- self.log("Chunk trailer")
- s = self.get_buffer()
- assert s == "", "Expected end of chunk trailers, got '%s'" % s
- self.chunk_handler = None
- self.handle_message()
-
- def handle_body(self):
- self.msg.body = self.get_buffer()
- self.handle_message()
-
- def handle_error(self):
- self.log("Error in HTTP stream handler")
- print traceback.format_exc()
- asyncore.close_all()
-
- def handle_timeout(self):
- self.log("Timeout, closing")
- self.close()
-
- def handle_close(self):
- asynchat.async_chat.handle_close(self)
- self.timer.cancel()
- self.log("Closed")
-
-class http_server(http_stream):
-
- parse_type = http_request
-
- def __init__(self, conn, handlers):
- self.handlers = handlers
- http_stream.__init__(self, conn)
- self.expect_close = not want_persistent_server
-
- def handle_no_content_length(self):
- self.handle_message()
-
- def find_handler(self, path):
- """Helper method to search self.handlers."""
- for s, h in self.handlers:
- if path.startswith(s):
- return h
- return None
-
- def handle_message(self):
- if not self.msg.persistent():
- self.expect_close = True
- handler = self.find_handler(self.msg.path)
- error = None
- if False and self.msg.cmd != "POST":
- error = 501, "No handler for method %s" % self.msg.cmd
- elif False and self.headers["Content-Type"] != rpki_content_type:
- error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"]
- elif handler is None:
- error = 404, "No handler for URL %s" % self.msg.path
- if error is None:
- handler(self.msg, self.send_message)
- else:
- self.send_error(*error)
-
- def send_error(self, code, reason):
- self.handle_message_message(http_response(code = code, reason = reason))
-
- def send_message(self, msg):
- msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
- self.push(msg.format())
- if self.expect_close:
- self.log("Closing")
- self.timer.cancel()
- self.close_when_done()
- else:
- self.log("Listening for next message")
- self.restart()
-
-class http_listener(asyncore.dispatcher):
-
- log = logger
-
- def __init__(self, handlers, port = 80, host = ""):
- asyncore.dispatcher.__init__(self)
- self.handlers = handlers
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- self.bind((host, port))
- self.listen(5)
- self.log("Listening on (host, port) %r, handlers %r" % ((host, port), handlers))
-
- def handle_accept(self):
- self.log("Accepting connection")
- server = http_server(conn = self.accept()[0], handlers = self.handlers)
-
- def handle_error(self):
- self.log("Error in HTTP listener")
- print traceback.format_exc()
- asyncore.close_all()
-
-class http_client(http_stream):
-
- parse_type = http_response
-
- def __init__(self, queue, hostport):
- self.log("Creating new connection to %s" % repr(hostport))
- http_stream.__init__(self)
- self.queue = queue
- self.state = "idle"
- self.expect_close = not want_persistent_client
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect(hostport)
-
- def handle_no_content_length(self):
- self.set_terminator(None)
-
- def send_request(self, msg):
- self.log("Sending request")
- assert self.state == "idle"
- if msg is not None:
- self.state = "request-sent"
- msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
- self.push(msg.format())
- self.restart(idle = False)
-
- def handle_message(self):
- if not self.msg.persistent():
- self.expect_close = True
- self.log("Message received, state %s" % self.state)
- msg = None
- if self.state == "request-sent":
- msg = self.queue.done_with_request()
- elif self.state == "idle":
- self.log("Received unsolicited message")
- elif self.state == "closing":
- assert not self.msg.body
- self.log("Ignoring empty response received while closing")
- return
- else:
- raise RuntimeError, "[%r: Unexpected state]" % self
- self.state = "idle"
- if msg != None:
- msg.callback(self.msg)
- msg = self.queue.next_request(not self.expect_close)
- if msg is not None:
- self.log("Got a new message to send from my queue")
- self.send_request(msg)
- elif self.expect_close:
- self.log("Closing")
- self.state = "closing"
- self.queue.closing(self)
- self.close_when_done()
- else:
- self.log("Idling")
- self.timer.set(self.idle_timeout)
-
- def handle_connect(self):
- self.log("Connected")
- self.send_request(self.queue.next_request(True))
-
- def kickstart(self):
- self.log("Kickstart")
- assert self.state == "idle"
- self.send_request(self.queue.next_request(True))
-
- def handle_close(self):
- http_stream.handle_close(self)
- self.queue.closing(self)
- if self.get_terminator() is None:
- self.handle_body()
-
-class http_queue(object):
-
- log = logger
-
- def __init__(self, hostport):
- self.log("Creating queue for %s" % repr(hostport))
- self.hostport = hostport
- self.client = None
- self.queue = []
-
- def request(self, *requests):
- self.log("Adding requests %r" % requests)
- need_kick = self.client is not None and not self.queue
- self.queue.extend(requests)
- if self.client is None:
- self.client = http_client(self, self.hostport)
- elif need_kick:
- self.client.kickstart()
-
- def done_with_request(self):
- req = self.queue.pop(0)
- self.log("Dequeuing request %r" % req)
- return req
-
- def next_request(self, usable):
- if not self.queue:
- self.log("Queue is empty")
- return None
- self.log("Queue: %r" % self.queue)
- if usable:
- self.log("Queue not empty and connection usable")
- return self.queue[0]
- else:
- self.log("Queue not empty but connection not usable, spawning")
- self.client = http_client(self, self.hostport)
- self.log("Spawned connection %r" % self.client)
- return None
-
- def closing(self, client):
- if client is self.client:
- self.log("Removing client")
- self.client = None
-
-class http_manager(dict):
-
- log = logger
-
- def query(self, url, callback, body = None):
- u = urlparse.urlparse(url)
- assert u.scheme == "http" and u.username is None and u.password is None and u.params == "" and u.query == "" and u.fragment == ""
- request = http_request(cmd = "POST", path = u.path, body = body, callback = callback,
- Host = u.hostname, Content_Type = "text/plain")
- hostport = (u.hostname or "localhost", u.port or 80)
- self.log("Created request %r for %r" % (request, hostport))
- if hostport not in self:
- self[hostport] = http_queue(hostport)
- self[hostport].request(request)
-
- def __repr__(self):
- return "<%s object at %s>" % (self.__class__.__name__, id(self))
-
-def client(msg, url, timeout = 300, callback = None):
- pass
-
-def server(handlers, port, host =""):
- if not isinstance(handlers, (tuple, list)):
- handlers = (("/", handlers),)
- listener = http_listener(port = 8000, handlers = handlers)
- rpki.async.event_loop()
-
-if len(sys.argv) == 1:
-
- def handler(query_message, reply_callback):
- reply_callback(http_response(
- code = 200,
- reason = "OK",
- body = str(query_message),
- Cache_Control = "no-cache,no-store",
- Content_Type = "text/plain"))
-
- def other_handler(query_message, reply_callback):
- reply_callback(http_response(
- code = 200,
- reason = "OK",
- body = "Ok, you found it.\r\n\r\n" + str(query_message),
- Cache_Control = "no-cache,no-store",
- Content_Type = "text/plain"))
-
- server(port = 8000, handlers = (("/wombat", other_handler), ("/", handler)))
-
-else:
-
- def got_one(msg):
- logger(None, "Got response")
- if True:
- print msg
- print
-
- manager = http_manager()
-
- timer = rpki.async.timer()
-
- def loop(iterator, url):
- logger(None, "Scheduler loop")
- manager.query(url = url, callback = got_one, body = "Hi, I'm trying to talk to URL %s" % url)
- timer.set(rpki.sundial.timedelta(seconds = 3))
-
- def done():
- logger(None, "Scheduler done")
-
- timer.set_handler(rpki.async.iterator(sys.argv[1:], loop, done))
-
- rpki.async.event_loop()
-
- for q in manager.values():
- assert not q.queue, "Requests still scheduled: %r %r %r" % (q, q.hostport, q.queue)
-
- assert not rpki.async.timer.queue, "Timers still scheduled: %r" % rpki.async.timer.queue