diff options
author | Rob Austein <sra@hactrn.net> | 2014-04-05 22:42:12 +0000 |
---|---|---|
committer | Rob Austein <sra@hactrn.net> | 2014-04-05 22:42:12 +0000 |
commit | fe0bf509f528dbdc50c7182f81057c6a4e15e4bd (patch) | |
tree | 07c9a923d4a0ccdfea11c49cd284f6d5757c5eda /rpki/http.py | |
parent | aa28ef54c271fbe4d52860ff8cf13cab19e2207c (diff) |
Source tree reorg, phase 1. Almost everything moved, no file contents changed.
svn path=/branches/tk685/; revision=5757
Diffstat (limited to 'rpki/http.py')
-rw-r--r-- | rpki/http.py | 1070 |
1 files changed, 1070 insertions, 0 deletions
diff --git a/rpki/http.py b/rpki/http.py new file mode 100644 index 00000000..3c541f26 --- /dev/null +++ b/rpki/http.py @@ -0,0 +1,1070 @@ +# $Id$ +# +# Copyright (C) 2013--2014 Dragon Research Labs ("DRL") +# Portions copyright (C) 2009--2012 Internet Systems Consortium ("ISC") +# Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN") +# +# Permission to use, copy, modify, and distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notices and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND DRL, ISC, AND ARIN DISCLAIM ALL +# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL, +# ISC, OR ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR +# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS +# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, +# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION +# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +""" +HTTP utilities, both client and server. +""" + +import time +import socket +import asyncore +import asynchat +import urlparse +import sys +import random +import rpki.async +import rpki.sundial +import rpki.x509 +import rpki.exceptions +import rpki.log +import rpki.POW + +## @var rpki_content_type +# HTTP content type used for all RPKI messages. +rpki_content_type = "application/x-rpki" + +## @var debug_http +# Verbose chatter about HTTP streams. +debug_http = False + +## @var want_persistent_client +# Whether we want persistent HTTP client streams, when server also supports them. +want_persistent_client = False + +## @var want_persistent_server +# Whether we want persistent HTTP server streams, when client also supports them. +want_persistent_server = False + +## @var default_client_timeout +# Default HTTP client connection timeout. +default_client_timeout = rpki.sundial.timedelta(minutes = 5) + +## @var default_server_timeout +# Default HTTP server connection timeouts. Given our druthers, we'd +# prefer that the client close the connection, as this avoids the +# problem of client starting to reuse connection just as server closes +# it, so this should be longer than the client timeout. +default_server_timeout = rpki.sundial.timedelta(minutes = 10) + +## @var default_http_version +# Preferred HTTP version. +default_http_version = (1, 0) + +## @var default_tcp_port +# Default port for clients and servers that don't specify one. +default_tcp_port = 80 + +## @var enable_ipv6_servers +# Whether to enable IPv6 listeners. Enabled by default, as it should +# be harmless. Has no effect if kernel doesn't support IPv6. +enable_ipv6_servers = True + +## @var enable_ipv6_clients +# Whether to consider IPv6 addresses when making connections. +# Disabled by default, as IPv6 connectivity is still a bad joke in +# far too much of the world. +enable_ipv6_clients = False + +## @var have_ipv6 +# Whether the current machine claims to support IPv6. Note that just +# because the kernel supports it doesn't mean that the machine has +# usable IPv6 connectivity. I don't know of a simple portable way to +# probe for connectivity at runtime (the old test of "can you ping +# SRI-NIC.ARPA?" seems a bit dated...). Don't set this, it's set +# automatically by probing using the socket() system call at runtime. +try: + # pylint: disable=W0702,W0104 + socket.socket(socket.AF_INET6).close() + socket.IPPROTO_IPV6 + socket.IPV6_V6ONLY +except: + have_ipv6 = False +else: + have_ipv6 = True + +## @var use_adns + +# Whether to use rpki.adns code. This is still experimental, so it's +# not (yet) enabled by default. +use_adns = False +try: + import rpki.adns +except ImportError: + pass + +def supported_address_families(enable_ipv6): + """ + IP address families on which servers should listen, and to consider + when selecting addresses for client connections. + """ + if enable_ipv6 and have_ipv6: + return (socket.AF_INET, socket.AF_INET6) + else: + return (socket.AF_INET,) + +def localhost_addrinfo(): + """ + Return pseudo-getaddrinfo results for localhost. + """ + result = [(socket.AF_INET, "127.0.0.1")] + if enable_ipv6_clients and have_ipv6: + result.append((socket.AF_INET6, "::1")) + return result + +class http_message(object): + """ + Virtual class representing of one HTTP message. + """ + + software_name = "ISC RPKI library" + + def __init__(self, version = None, body = None, headers = None): + self.version = version + self.body = body + self.headers = headers + self.normalize_headers() + + def normalize_headers(self, headers = None): + """ + Clean up (some of) the horrible messes that HTTP allows in its + headers. + """ + if headers is None: + headers = () if self.headers is None else self.headers.items() + translate_underscore = True + else: + translate_underscore = False + result = {} + for k, v in headers: + if translate_underscore: + k = k.replace("_", "-") + k = "-".join(s.capitalize() for s in k.split("-")) + v = v.strip() + if k in result: + result[k] += ", " + v + else: + result[k] = v + self.headers = result + + @classmethod + def parse_from_wire(cls, headers): + """ + Parse and normalize an incoming HTTP message. + """ + self = cls() + headers = headers.split("\r\n") + self.parse_first_line(*headers.pop(0).split(None, 2)) + for i in xrange(len(headers) - 2, -1, -1): + if headers[i + 1][0].isspace(): + headers[i] += headers[i + 1] + del headers[i + 1] + self.normalize_headers([h.split(":", 1) for h in headers]) + return self + + def format(self): + """ + Format an outgoing HTTP message. + """ + s = self.format_first_line() + if self.body is not None: + assert isinstance(self.body, str) + self.headers["Content-Length"] = len(self.body) + for kv in self.headers.iteritems(): + s += "%s: %s\r\n" % kv + s += "\r\n" + if self.body is not None: + s += self.body + return s + + def __str__(self): + return self.format() + + def parse_version(self, version): + """ + Parse HTTP version, raise an exception if we can't. + """ + if version[:5] != "HTTP/": + raise rpki.exceptions.HTTPBadVersion, "Couldn't parse version %s" % version + self.version = tuple(int(i) for i in version[5:].split(".")) + + @property + def persistent(self): + """ + Figure out whether this HTTP message encourages a persistent connection. + """ + c = self.headers.get("Connection") + if self.version == (1, 1): + return c is None or "close" not in c.lower() + elif self.version == (1, 0): + return c is not None and "keep-alive" in c.lower() + else: + return False + +class http_request(http_message): + """ + HTTP request message. + """ + + def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers): + assert cmd == "POST" or body is None + http_message.__init__(self, version = version, body = body, headers = headers) + self.cmd = cmd + self.path = path + self.callback = callback + self.errback = errback + self.retried = False + + def parse_first_line(self, cmd, path, version): + """ + Parse first line of HTTP request message. + """ + self.parse_version(version) + self.cmd = cmd + self.path = path + + def format_first_line(self): + """ + Format first line of HTTP request message, and set up the + User-Agent header. + """ + self.headers.setdefault("User-Agent", self.software_name) + return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1]) + + def __repr__(self): + return rpki.log.log_repr(self, self.cmd, self.path) + +class http_response(http_message): + """ + HTTP response message. + """ + + def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers): + http_message.__init__(self, version = version, body = body, headers = headers) + self.code = code + self.reason = reason + + def parse_first_line(self, version, code, reason): + """ + Parse first line of HTTP response message. + """ + self.parse_version(version) + self.code = int(code) + self.reason = reason + + def format_first_line(self): + """ + Format first line of HTTP response message, and set up Date and + Server headers. + """ + self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT")) + self.headers.setdefault("Server", self.software_name) + return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason) + + def __repr__(self): + return rpki.log.log_repr(self, self.code, self.reason) + +def log_method(self, msg, logger = rpki.log.debug): + """ + Logging method used in several different classes. + """ + assert isinstance(logger, rpki.log.logger) + if debug_http or logger is not rpki.log.debug: + logger("%r: %s" % (self, msg)) + +def addr_to_string(addr): + """ + Convert socket addr tuple to printable string. Assumes 2-element + tuple is IPv4, 4-element tuple is IPv6, throws TypeError for + anything else. + """ + + if len(addr) == 2: + return "%s:%d" % (addr[0], addr[1]) + if len(addr) == 4: + return "%s.%d" % (addr[0], addr[1]) + raise TypeError + +class http_stream(asynchat.async_chat): + """ + Virtual class representing an HTTP message stream. + """ + + log = log_method + + def __repr__(self): + status = ["connected"] if self.connected else [] + try: + status.append(addr_to_string(self.addr)) + except TypeError: + pass + return rpki.log.log_repr(self, *status) + + def __init__(self, sock = None): + asynchat.async_chat.__init__(self, sock) + self.buffer = [] + self.timer = rpki.async.timer(self.handle_timeout) + self.restart() + + def restart(self): + """ + (Re)start HTTP message parser, reset timer. + """ + assert not self.buffer + self.chunk_handler = None + self.set_terminator("\r\n\r\n") + self.update_timeout() + + def update_timeout(self): + """ + Put this stream's timer in known good state: set it to the + stream's timeout value if we're doing timeouts, otherwise clear + it. + """ + if self.timeout is not None: + self.log("Setting timeout %s" % self.timeout) + self.timer.set(self.timeout) + else: + self.log("Clearing timeout") + self.timer.cancel() + + def collect_incoming_data(self, data): + """ + Buffer incoming data from asynchat. + """ + self.buffer.append(data) + self.update_timeout() + + def get_buffer(self): + """ + Consume data buffered from asynchat. + """ + val = "".join(self.buffer) + self.buffer = [] + return val + + def found_terminator(self): + """ + Asynchat reported that it found whatever terminator we set, so + figure out what to do next. This can be messy, because we can be + in any of several different states: + + @li We might be handling chunked HTTP, in which case we have to + initialize the chunk decoder; + + @li We might have found the end of the message body, in which case + we can (finally) process it; or + + @li We might have just gotten to the end of the message headers, + in which case we have to parse them to figure out which of three + separate mechanisms (chunked, content-length, TCP close) is going + to tell us how to find the end of the message body. + """ + self.update_timeout() + if self.chunk_handler: + self.chunk_handler() + elif not isinstance(self.get_terminator(), str): + self.handle_body() + else: + self.msg = self.parse_type.parse_from_wire(self.get_buffer()) + if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower(): + self.msg.body = [] + self.chunk_handler = self.chunk_header + self.set_terminator("\r\n") + elif "Content-Length" in self.msg.headers: + self.set_terminator(int(self.msg.headers["Content-Length"])) + else: + self.handle_no_content_length() + + def chunk_header(self): + """ + Asynchat just handed us what should be the header of one chunk of + a chunked encoding stream. If this chunk has a body, set the + stream up to read it; otherwise, this is the last chunk, so start + the process of exiting the chunk decoder. + """ + n = int(self.get_buffer().partition(";")[0], 16) + self.log("Chunk length %s" % n) + if n: + self.chunk_handler = self.chunk_body + self.set_terminator(n) + else: + self.msg.body = "".join(self.msg.body) + self.chunk_handler = self.chunk_discard_trailer + + def chunk_body(self): + """ + Asynchat just handed us what should be the body of a chunk of the + body of a chunked message (sic). Save it, and prepare to move on + to the next chunk. + """ + self.log("Chunk body") + self.msg.body += self.buffer + self.buffer = [] + self.chunk_handler = self.chunk_discard_crlf + self.set_terminator("\r\n") + + def chunk_discard_crlf(self): + """ + Consume the CRLF that terminates a chunk, reinitialize chunk + decoder to be ready for the next chunk. + """ + self.log("Chunk CRLF") + s = self.get_buffer() + assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s) + self.chunk_handler = self.chunk_header + + def chunk_discard_trailer(self): + """ + Consume chunk trailer, which should be empty, then (finally!) exit + the chunk decoder and hand complete message off to the application. + """ + self.log("Chunk trailer") + s = self.get_buffer() + assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s) + self.chunk_handler = None + self.handle_message() + + def handle_body(self): + """ + Hand normal (not chunked) message off to the application. + """ + self.msg.body = self.get_buffer() + self.handle_message() + + def handle_error(self): + """ + Asynchat (or asyncore, or somebody) raised an exception. See + whether it's one we should just pass along, otherwise log a stack + trace and close the stream. + """ + self.timer.cancel() + etype = sys.exc_info()[0] + if etype in (SystemExit, rpki.async.ExitNow): + raise + rpki.log.traceback() + if etype is not rpki.exceptions.HTTPClientAborted: + self.log("Closing due to error", rpki.log.warn) + self.close() + + def handle_timeout(self): + """ + Inactivity timer expired, close connection with prejudice. + """ + self.log("Timeout, closing") + self.close() + + def handle_close(self): + """ + Wrapper around asynchat connection close handler, so that we can + log the event, cancel timer, and so forth. + """ + self.log("Close event in HTTP stream handler") + self.timer.cancel() + asynchat.async_chat.handle_close(self) + +class http_server(http_stream): + """ + HTTP server stream. + """ + + ## @var parse_type + # Stream parser should look for incoming HTTP request messages. + parse_type = http_request + + ## @var timeout + # Use the default server timeout value set in the module header. + timeout = default_server_timeout + + def __init__(self, sock, handlers): + self.handlers = handlers + http_stream.__init__(self, sock = sock) + self.expect_close = not want_persistent_server + self.log("Starting") + + def handle_no_content_length(self): + """ + Handle an incoming message that used neither chunking nor a + Content-Length header (that is: this message will be the last one + in this server stream). No special action required. + """ + self.handle_message() + + def find_handler(self, path): + """ + Helper method to search self.handlers. + """ + for s, h in self.handlers: + if path.startswith(s): + return h + return None + + def handle_message(self): + """ + HTTP layer managed to deliver a complete HTTP request to + us, figure out what to do with it. Check the command and + Content-Type, look for a handler, and if everything looks right, + pass the message body, path, and a reply callback to the handler. + """ + self.log("Received request %r" % self.msg) + if not self.msg.persistent: + self.expect_close = True + handler = self.find_handler(self.msg.path) + error = None + if self.msg.cmd != "POST": + error = 501, "No handler for method %s" % self.msg.cmd + elif self.msg.headers["Content-Type"] != rpki_content_type: + error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"] + elif handler is None: + error = 404, "No handler for URL %s" % self.msg.path + if error is None: + try: + handler(self.msg.body, self.msg.path, self.send_reply) + except (rpki.async.ExitNow, SystemExit): + raise + except Exception, e: + rpki.log.traceback() + self.send_error(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e)) + else: + self.send_error(code = error[0], reason = error[1]) + + def send_error(self, code, reason): + """ + Send an error response to this request. + """ + self.send_message(code = code, reason = reason) + + def send_reply(self, code, body = None, reason = "OK"): + """ + Send a reply to this request. + """ + self.send_message(code = code, body = body, reason = reason) + + def send_message(self, code, reason = "OK", body = None): + """ + Queue up reply message. If both parties agree that connection is + persistant, and if no error occurred, restart this stream to + listen for next message; otherwise, queue up a close event for + this stream so it will shut down once the reply has been sent. + """ + self.log("Sending response %s %s" % (code, reason)) + if code >= 400: + self.expect_close = True + msg = http_response(code = code, reason = reason, body = body, + Content_Type = rpki_content_type, + Connection = "Close" if self.expect_close else "Keep-Alive") + self.push(msg.format()) + if self.expect_close: + self.log("Closing") + self.timer.cancel() + self.close_when_done() + else: + self.log("Listening for next message") + self.restart() + +class http_listener(asyncore.dispatcher): + """ + Listener for incoming HTTP connections. + """ + + log = log_method + + def __repr__(self): + try: + status = (addr_to_string(self.addr),) + except TypeError: + status = () + return rpki.log.log_repr(self, *status) + + def __init__(self, handlers, addrinfo): + asyncore.dispatcher.__init__(self) + self.handlers = handlers + try: + af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612 + self.create_socket(af, socktype) + self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + except AttributeError: + pass + if have_ipv6 and af == socket.AF_INET6: + self.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) + self.bind(sockaddr) + self.listen(5) + except Exception, e: + self.log("Couldn't set up HTTP listener: %s" % e, rpki.log.warn) + rpki.log.traceback() + self.close() + for h in handlers: + self.log("Handling %s" % h[0]) + + def handle_accept(self): + """ + Asyncore says we have an incoming connection, spawn an http_server + stream for it and pass along all of our handler data. + """ + try: + s, c = self.accept() + self.log("Accepting connection from %s" % addr_to_string(c)) + http_server(sock = s, handlers = self.handlers) + except (rpki.async.ExitNow, SystemExit): + raise + except Exception, e: + self.log("Unable to accept connection: %s" % e) + self.handle_error() + + def handle_error(self): + """ + Asyncore signaled an error, pass it along or log it. + """ + if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow): + raise + self.log("Error in HTTP listener", rpki.log.warn) + rpki.log.traceback() + +class http_client(http_stream): + """ + HTTP client stream. + """ + + ## @var parse_type + # Stream parser should look for incoming HTTP response messages. + parse_type = http_response + + ## @var timeout + # Use the default client timeout value set in the module header. + timeout = default_client_timeout + + ## @var state + # Application layer connection state. + state = None + + def __init__(self, queue, hostport): + self.log("Creating new connection to %s" % addr_to_string(hostport)) + http_stream.__init__(self) + self.queue = queue + self.host = hostport[0] + self.port = hostport[1] + self.set_state("opening") + self.expect_close = not want_persistent_client + + def start(self): + """ + Create socket and request a connection. + """ + if not use_adns: + self.log("Not using ADNS") + self.gotaddrinfo([(socket.AF_INET, self.host)]) + elif self.host == "localhost": + self.log("Bypassing DNS for localhost") + self.gotaddrinfo(localhost_addrinfo()) + else: + families = supported_address_families(enable_ipv6_clients) + self.log("Starting ADNS lookup for %s in families %r" % (self.host, families)) + rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, families) + + def dns_error(self, e): + """ + Handle DNS lookup errors. For now, just whack the connection. + Undoubtedly we should do something better with diagnostics here. + """ + self.handle_error() + + def gotaddrinfo(self, addrinfo): + """ + Got address data from DNS, create socket and request connection. + """ + try: + self.af, self.address = random.choice(addrinfo) + self.log("Connecting to AF %s host %s port %s addr %s" % (self.af, self.host, self.port, self.address)) + self.create_socket(self.af, socket.SOCK_STREAM) + self.connect((self.address, self.port)) + if self.addr is None: + self.addr = (self.host, self.port) + self.update_timeout() + except (rpki.async.ExitNow, SystemExit): + raise + except Exception: + self.handle_error() + + def handle_connect(self): + """ + Asyncore says socket has connected. + """ + self.log("Socket connected") + self.set_state("idle") + assert self.queue.client is self + self.queue.send_request() + + def set_state(self, state): + """ + Set HTTP client connection state. + """ + self.log("State transition %s => %s" % (self.state, state)) + self.state = state + + def handle_no_content_length(self): + """ + Handle response message that used neither chunking nor a + Content-Length header (that is: this message will be the last one + in this server stream). In this case we want to read until we + reach the end of the data stream. + """ + self.set_terminator(None) + + def send_request(self, msg): + """ + Queue up request message and kickstart connection. + """ + self.log("Sending request %r" % msg) + assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state) + self.set_state("request-sent") + msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive" + self.push(msg.format()) + self.restart() + + def handle_message(self): + """ + Handle incoming HTTP response message. Make sure we're in a state + where we expect to see such a message (and allow the mysterious + empty messages that Apache sends during connection close, no idea + what that is supposed to be about). If everybody agrees that the + connection should stay open, put it into an idle state; otherwise, + arrange for the stream to shut down. + """ + + self.log("Message received, state %s" % self.state) + + if not self.msg.persistent: + self.expect_close = True + + if self.state != "request-sent": + if self.state == "closing": + assert not self.msg.body + self.log("Ignoring empty response received while closing") + return + raise rpki.exceptions.HTTPUnexpectedState, "%r received message while in unexpected state %s" % (self, self.state) + + if self.expect_close: + self.log("Closing") + self.set_state("closing") + self.close_when_done() + else: + self.log("Idling") + self.set_state("idle") + self.update_timeout() + + if self.msg.code != 200: + errmsg = "HTTP request failed" + if self.msg.code is not None: + errmsg += " with status %s" % self.msg.code + if self.msg.reason: + errmsg += ", reason %s" % self.msg.reason + if self.msg.body: + errmsg += ", response %s" % self.msg.body + raise rpki.exceptions.HTTPRequestFailed(errmsg) + self.queue.return_result(self, self.msg, detach = self.expect_close) + + def handle_close(self): + """ + Asyncore signaled connection close. If we were waiting for that + to find the end of a response message, process the resulting + message now; if we were waiting for the response to a request we + sent, signal the error. + """ + http_stream.handle_close(self) + self.log("State %s" % self.state) + if self.get_terminator() is None: + self.handle_body() + elif self.state == "request-sent": + raise rpki.exceptions.HTTPClientAborted, "HTTP request aborted by close event" + else: + self.queue.detach(self) + + def handle_timeout(self): + """ + Connection idle timer has expired. Shut down connection in any + case, noisily if we weren't idle. + """ + bad = self.state not in ("idle", "closing") + if bad: + self.log("Timeout while in state %s" % self.state, rpki.log.warn) + http_stream.handle_timeout(self) + if bad: + try: + raise rpki.exceptions.HTTPTimeout + except: # pylint: disable=W0702 + self.handle_error() + else: + self.queue.detach(self) + + def handle_error(self): + """ + Asyncore says something threw an exception. Log it, then shut + down the connection and pass back the exception. + """ + eclass, edata = sys.exc_info()[0:2] + self.log("Error on HTTP client connection %s:%s %s %s" % (self.host, self.port, eclass, edata), rpki.log.warn) + http_stream.handle_error(self) + self.queue.return_result(self, edata, detach = True) + +class http_queue(object): + """ + Queue of pending HTTP requests for a single destination. This class + is very tightly coupled to http_client; http_client handles the HTTP + stream itself, this class provides a slightly higher-level API. + """ + + log = log_method + + def __repr__(self): + return rpki.log.log_repr(self, addr_to_string(self.hostport)) + + def __init__(self, hostport): + self.hostport = hostport + self.client = None + self.log("Created") + self.queue = [] + + def request(self, *requests): + """ + Append http_request object(s) to this queue. + """ + self.log("Adding requests %r" % requests) + self.queue.extend(requests) + + def restart(self): + """ + Send next request for this queue, if we can. This may involve + starting a new http_client stream, reusing an existing idle + stream, or just ignoring this request if there's an active client + stream already; in the last case, handling of the response (or + exception, or timeout) for the query currently in progress will + call this method when it's time to kick out the next query. + """ + try: + if self.client is None: + self.client = http_client(self, self.hostport) + self.log("Attached client %r" % self.client) + self.client.start() + elif self.client.state == "idle": + self.log("Sending request to existing client %r" % self.client) + self.send_request() + else: + self.log("Client %r exists in state %r" % (self.client, self.client.state)) + except (rpki.async.ExitNow, SystemExit): + raise + except Exception, e: + self.return_result(self.client, e, detach = True) + + def send_request(self): + """ + Kick out the next query in this queue, if any. + """ + if self.queue: + self.client.send_request(self.queue[0]) + + def detach(self, client_): + """ + Detatch a client from this queue. Silently ignores attempting to + detach a client that is not attached to this queue, to simplify + handling of what otherwise would be a nasty set of race + conditions. + """ + if client_ is self.client: + self.log("Detaching client %r" % client_) + self.client = None + + def return_result(self, client, result, detach = False): # pylint: disable=W0621 + """ + Client stream has returned a result, which we need to pass along + to the original caller. Result may be either an HTTP response + message or an exception. In either case, once we're done + processing this result, kick off next message in the queue, if any. + """ + + if client is not self.client: + self.log("Wrong client trying to return result. THIS SHOULD NOT HAPPEN. Dropping result %r" % result, rpki.log.warn) + return + + if detach: + self.detach(client) + + try: + req = self.queue.pop(0) + self.log("Dequeuing request %r" % req) + except IndexError: + self.log("No caller. THIS SHOULD NOT HAPPEN. Dropping result %r" % result, rpki.log.warn) + return + + assert isinstance(result, http_response) or isinstance(result, Exception) + + if isinstance(result, http_response): + try: + self.log("Returning result %r to caller" % result) + req.callback(result.body) + except (rpki.async.ExitNow, SystemExit): + raise + except Exception, e: + result = e + + if isinstance(result, Exception): + try: + self.log("Returning exception %r to caller: %s" % (result, result), rpki.log.warn) + req.errback(result) + except (rpki.async.ExitNow, SystemExit): + raise + except Exception: + # + # If we get here, we may have lost the event chain. Not + # obvious what we can do about it at this point, but force a + # traceback so that it will be somewhat obvious that something + # really bad happened. + # + self.log("Exception in exception callback", rpki.log.warn) + rpki.log.traceback(True) + + self.log("Queue: %r" % self.queue) + + if self.queue: + self.restart() + +## @var client_queues +# Map of (host, port) tuples to http_queue objects. +client_queues = {} + +def client(msg, url, callback, errback): + """ + Open client HTTP connection, send a message, set up callbacks to + handle response. + """ + + u = urlparse.urlparse(url) + + if (u.scheme not in ("", "http") or + u.username is not None or + u.password is not None or + u.params != "" or + u.query != "" or + u.fragment != ""): + raise rpki.exceptions.BadClientURL, "Unusable URL %s" % url + + if debug_http: + rpki.log.debug("Contacting %s" % url) + + request = http_request( + cmd = "POST", + path = u.path, + body = msg, + callback = callback, + errback = errback, + Host = u.hostname, + Content_Type = rpki_content_type) + + hostport = (u.hostname or "localhost", u.port or default_tcp_port) + + if debug_http: + rpki.log.debug("Created request %r for %s" % (request, addr_to_string(hostport))) + if hostport not in client_queues: + client_queues[hostport] = http_queue(hostport) + client_queues[hostport].request(request) + + # Defer connection attempt until after we've had time to process any + # pending I/O events, in case connections have closed. + + if debug_http: + rpki.log.debug("Scheduling connection startup for %r" % request) + rpki.async.event_defer(client_queues[hostport].restart) + +def server(handlers, port, host = ""): + """ + Run an HTTP server and wait (forever) for connections. + """ + + if not isinstance(handlers, (tuple, list)): + handlers = (("/", handlers),) + + # Yes, this is sick. So is getaddrinfo() returning duplicate + # records, which RedHat has the gall to claim is a feature. + ai = [] + for af in supported_address_families(enable_ipv6_servers): + try: + if host: + h = host + elif have_ipv6 and af == socket.AF_INET6: + h = "::" + else: + h = "0.0.0.0" + for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM): + if a not in ai: + ai.append(a) + except socket.gaierror: + pass + + for a in ai: + http_listener(addrinfo = a, handlers = handlers) + + rpki.async.event_loop() + +class caller(object): + """ + Handle client-side mechanics for protocols based on HTTP, CMS, and + rpki.xml_utils. Calling sequence is intended to nest within + rpki.async.sync_wrapper. + """ + + debug = False + + def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None): + self.proto = proto + self.client_key = client_key + self.client_cert = client_cert + self.server_ta = server_ta + self.server_cert = server_cert + self.url = url + self.cms_timestamp = None + if debug is not None: + self.debug = debug + + def __call__(self, cb, eb, *pdus): + + def done(r_der): + """ + Handle CMS-wrapped XML response message. + """ + try: + r_cms = self.proto.cms_msg(DER = r_der) + r_msg = r_cms.unwrap((self.server_ta, self.server_cert)) + self.cms_timestamp = r_cms.check_replay(self.cms_timestamp, self.url) + if self.debug: + print "<!-- Reply -->" + print r_cms.pretty_print_content() + cb(r_msg) + except (rpki.async.ExitNow, SystemExit): + raise + except Exception, e: + eb(e) + + q_msg = self.proto.msg.query(*pdus) + q_cms = self.proto.cms_msg() + q_der = q_cms.wrap(q_msg, self.client_key, self.client_cert) + if self.debug: + print "<!-- Query -->" + print q_cms.pretty_print_content() + + client(url = self.url, msg = q_der, callback = done, errback = eb) |