diff options
Diffstat (limited to 'rpki/http.py')
-rw-r--r-- | rpki/http.py | 1058 |
1 files changed, 0 insertions, 1058 deletions
diff --git a/rpki/http.py b/rpki/http.py deleted file mode 100644 index 71239c7f..00000000 --- a/rpki/http.py +++ /dev/null @@ -1,1058 +0,0 @@ -# $Id$ -# -# Copyright (C) 2013--2014 Dragon Research Labs ("DRL") -# Portions copyright (C) 2009--2012 Internet Systems Consortium ("ISC") -# Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN") -# -# Permission to use, copy, modify, and distribute this software for any -# purpose with or without fee is hereby granted, provided that the above -# copyright notices and this permission notice appear in all copies. -# -# THE SOFTWARE IS PROVIDED "AS IS" AND DRL, ISC, AND ARIN DISCLAIM ALL -# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED -# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL, -# ISC, OR ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR -# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS -# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, -# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION -# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -""" -HTTP utilities, both client and server. -""" - -import time -import socket -import asyncore -import asynchat -import urlparse -import sys -import random -import logging -import rpki.async -import rpki.sundial -import rpki.x509 -import rpki.exceptions -import rpki.log -import rpki.POW - -logger = logging.getLogger(__name__) - -## @var default_content_type -# HTTP content type used for RPKI messages. -# Can be overriden on a per-client or per-server basis. -default_content_type = "application/x-rpki" - -## @var want_persistent_client -# Whether we want persistent HTTP client streams, when server also supports them. -want_persistent_client = False - -## @var want_persistent_server -# Whether we want persistent HTTP server streams, when client also supports them. -want_persistent_server = False - -## @var default_client_timeout -# Default HTTP client connection timeout. -default_client_timeout = rpki.sundial.timedelta(minutes = 5) - -## @var default_server_timeout -# Default HTTP server connection timeouts. Given our druthers, we'd -# prefer that the client close the connection, as this avoids the -# problem of client starting to reuse connection just as server closes -# it, so this should be longer than the client timeout. -default_server_timeout = rpki.sundial.timedelta(minutes = 10) - -## @var default_http_version -# Preferred HTTP version. -default_http_version = (1, 0) - -## @var default_tcp_port -# Default port for clients and servers that don't specify one. -default_tcp_port = 80 - -## @var enable_ipv6_servers -# Whether to enable IPv6 listeners. Enabled by default, as it should -# be harmless. Has no effect if kernel doesn't support IPv6. -enable_ipv6_servers = True - -## @var enable_ipv6_clients -# Whether to consider IPv6 addresses when making connections. -# Disabled by default, as IPv6 connectivity is still a bad joke in -# far too much of the world. -enable_ipv6_clients = False - -## @var have_ipv6 -# Whether the current machine claims to support IPv6. Note that just -# because the kernel supports it doesn't mean that the machine has -# usable IPv6 connectivity. I don't know of a simple portable way to -# probe for connectivity at runtime (the old test of "can you ping -# SRI-NIC.ARPA?" seems a bit dated...). Don't set this, it's set -# automatically by probing using the socket() system call at runtime. -try: - # pylint: disable=W0702,W0104 - socket.socket(socket.AF_INET6).close() - socket.IPPROTO_IPV6 - socket.IPV6_V6ONLY -except: - have_ipv6 = False -else: - have_ipv6 = True - -## @var use_adns - -# Whether to use rpki.adns code. This is still experimental, so it's -# not (yet) enabled by default. -use_adns = False -try: - import rpki.adns -except ImportError: - pass - -def supported_address_families(enable_ipv6): - """ - IP address families on which servers should listen, and to consider - when selecting addresses for client connections. - """ - if enable_ipv6 and have_ipv6: - return (socket.AF_INET, socket.AF_INET6) - else: - return (socket.AF_INET,) - -def localhost_addrinfo(): - """ - Return pseudo-getaddrinfo results for localhost. - """ - result = [(socket.AF_INET, "127.0.0.1")] - if enable_ipv6_clients and have_ipv6: - result.append((socket.AF_INET6, "::1")) - return result - -class http_message(object): - """ - Virtual class representing of one HTTP message. - """ - - software_name = "ISC RPKI library" - - def __init__(self, version = None, body = None, headers = None): - self.version = version - self.body = body - self.headers = headers - self.normalize_headers() - - def normalize_headers(self, headers = None): - """ - Clean up (some of) the horrible messes that HTTP allows in its - headers. - """ - if headers is None: - headers = () if self.headers is None else self.headers.items() - translate_underscore = True - else: - translate_underscore = False - result = {} - for k, v in headers: - if translate_underscore: - k = k.replace("_", "-") - k = "-".join(s.capitalize() for s in k.split("-")) - v = v.strip() - if k in result: - result[k] += ", " + v - else: - result[k] = v - self.headers = result - - @classmethod - def parse_from_wire(cls, headers): - """ - Parse and normalize an incoming HTTP message. - """ - self = cls() - headers = headers.split("\r\n") - self.parse_first_line(*headers.pop(0).split(None, 2)) - for i in xrange(len(headers) - 2, -1, -1): - if headers[i + 1][0].isspace(): - headers[i] += headers[i + 1] - del headers[i + 1] - self.normalize_headers([h.split(":", 1) for h in headers]) - return self - - def format(self): - """ - Format an outgoing HTTP message. - """ - s = self.format_first_line() - if self.body is not None: - assert isinstance(self.body, str) - self.headers["Content-Length"] = len(self.body) - for kv in self.headers.iteritems(): - s += "%s: %s\r\n" % kv - s += "\r\n" - if self.body is not None: - s += self.body - return s - - def __str__(self): - return self.format() - - def parse_version(self, version): - """ - Parse HTTP version, raise an exception if we can't. - """ - if version[:5] != "HTTP/": - raise rpki.exceptions.HTTPBadVersion("Couldn't parse version %s" % version) - self.version = tuple(int(i) for i in version[5:].split(".")) - - @property - def persistent(self): - """ - Figure out whether this HTTP message encourages a persistent connection. - """ - c = self.headers.get("Connection") - if self.version == (1, 1): - return c is None or "close" not in c.lower() - elif self.version == (1, 0): - return c is not None and "keep-alive" in c.lower() - else: - return False - -class http_request(http_message): - """ - HTTP request message. - """ - - def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers): - assert cmd == "POST" or body is None - http_message.__init__(self, version = version, body = body, headers = headers) - self.cmd = cmd - self.path = path - self.callback = callback - self.errback = errback - self.retried = False - - def parse_first_line(self, cmd, path, version): - """ - Parse first line of HTTP request message. - """ - self.parse_version(version) - self.cmd = cmd - self.path = path - - def format_first_line(self): - """ - Format first line of HTTP request message, and set up the - User-Agent header. - """ - self.headers.setdefault("User-Agent", self.software_name) - return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1]) - - def __repr__(self): - return rpki.log.log_repr(self, self.cmd, self.path) - -class http_response(http_message): - """ - HTTP response message. - """ - - def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers): - http_message.__init__(self, version = version, body = body, headers = headers) - self.code = code - self.reason = reason - - def parse_first_line(self, version, code, reason): - """ - Parse first line of HTTP response message. - """ - self.parse_version(version) - self.code = int(code) - self.reason = reason - - def format_first_line(self): - """ - Format first line of HTTP response message, and set up Date and - Server headers. - """ - self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT")) - self.headers.setdefault("Server", self.software_name) - return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason) - - def __repr__(self): - return rpki.log.log_repr(self, self.code, self.reason) - -def addr_to_string(addr): - """ - Convert socket addr tuple to printable string. Assumes 2-element - tuple is IPv4, 4-element tuple is IPv6, throws TypeError for - anything else. - """ - - if len(addr) == 2: - return "%s:%d" % (addr[0], addr[1]) - if len(addr) == 4: - return "%s.%d" % (addr[0], addr[1]) - raise TypeError - -@rpki.log.class_logger(logger) -class http_stream(asynchat.async_chat): - """ - Virtual class representing an HTTP message stream. - """ - - # Keep pylint happy; @class_logger overwrites this. - logger = None - - def __repr__(self): - status = ["connected"] if self.connected else [] - try: - status.append(addr_to_string(self.addr)) - except TypeError: - pass - return rpki.log.log_repr(self, *status) - - def __init__(self, sock = None): - self.logger = logging.LoggerAdapter(self.logger, dict(context = self)) - asynchat.async_chat.__init__(self, sock) - self.buffer = [] - self.timer = rpki.async.timer(self.handle_timeout) - self.restart() - - def restart(self): - """ - (Re)start HTTP message parser, reset timer. - """ - assert not self.buffer - self.chunk_handler = None - self.set_terminator("\r\n\r\n") - self.update_timeout() - - def update_timeout(self): - """ - Put this stream's timer in known good state: set it to the - stream's timeout value if we're doing timeouts, otherwise clear - it. - """ - if self.timeout is not None: - self.logger.debug("Setting timeout %s", self.timeout) - self.timer.set(self.timeout) - else: - self.logger.debug("Clearing timeout") - self.timer.cancel() - - def collect_incoming_data(self, data): - """ - Buffer incoming data from asynchat. - """ - self.buffer.append(data) - self.update_timeout() - - def get_buffer(self): - """ - Consume data buffered from asynchat. - """ - val = "".join(self.buffer) - self.buffer = [] - return val - - def found_terminator(self): - """ - Asynchat reported that it found whatever terminator we set, so - figure out what to do next. This can be messy, because we can be - in any of several different states: - - @li We might be handling chunked HTTP, in which case we have to - initialize the chunk decoder; - - @li We might have found the end of the message body, in which case - we can (finally) process it; or - - @li We might have just gotten to the end of the message headers, - in which case we have to parse them to figure out which of three - separate mechanisms (chunked, content-length, TCP close) is going - to tell us how to find the end of the message body. - """ - self.update_timeout() - if self.chunk_handler: - self.chunk_handler() - elif not isinstance(self.get_terminator(), str): - self.handle_body() - else: - self.msg = self.parse_type.parse_from_wire(self.get_buffer()) - if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower(): - self.msg.body = [] - self.chunk_handler = self.chunk_header - self.set_terminator("\r\n") - elif "Content-Length" in self.msg.headers: - self.set_terminator(int(self.msg.headers["Content-Length"])) - else: - self.handle_no_content_length() - - def chunk_header(self): - """ - Asynchat just handed us what should be the header of one chunk of - a chunked encoding stream. If this chunk has a body, set the - stream up to read it; otherwise, this is the last chunk, so start - the process of exiting the chunk decoder. - """ - n = int(self.get_buffer().partition(";")[0], 16) - self.logger.debug("Chunk length %s", n) - if n: - self.chunk_handler = self.chunk_body - self.set_terminator(n) - else: - self.msg.body = "".join(self.msg.body) - self.chunk_handler = self.chunk_discard_trailer - - def chunk_body(self): - """ - Asynchat just handed us what should be the body of a chunk of the - body of a chunked message (sic). Save it, and prepare to move on - to the next chunk. - """ - self.logger.debug("Chunk body") - self.msg.body += self.buffer - self.buffer = [] - self.chunk_handler = self.chunk_discard_crlf - self.set_terminator("\r\n") - - def chunk_discard_crlf(self): - """ - Consume the CRLF that terminates a chunk, reinitialize chunk - decoder to be ready for the next chunk. - """ - self.logger.debug("Chunk CRLF") - s = self.get_buffer() - assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s) - self.chunk_handler = self.chunk_header - - def chunk_discard_trailer(self): - """ - Consume chunk trailer, which should be empty, then (finally!) exit - the chunk decoder and hand complete message off to the application. - """ - self.logger.debug("Chunk trailer") - s = self.get_buffer() - assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s) - self.chunk_handler = None - self.handle_message() - - def handle_body(self): - """ - Hand normal (not chunked) message off to the application. - """ - self.msg.body = self.get_buffer() - self.handle_message() - - def handle_error(self): - """ - Asynchat (or asyncore, or somebody) raised an exception. See - whether it's one we should just pass along, otherwise log a stack - trace and close the stream. - """ - self.timer.cancel() - etype = sys.exc_info()[0] - if etype in (SystemExit, rpki.async.ExitNow): - raise - if etype is not rpki.exceptions.HTTPClientAborted: - self.logger.exception("Closing due to error") - self.close() - - def handle_timeout(self): - """ - Inactivity timer expired, close connection with prejudice. - """ - self.logger.debug("Timeout, closing") - self.close() - - def handle_close(self): - """ - Wrapper around asynchat connection close handler, so that we can - log the event, cancel timer, and so forth. - """ - self.logger.debug("Close event in HTTP stream handler") - self.timer.cancel() - asynchat.async_chat.handle_close(self) - -@rpki.log.class_logger(logger) -class http_server(http_stream): - """ - HTTP server stream. - """ - - ## @var parse_type - # Stream parser should look for incoming HTTP request messages. - parse_type = http_request - - ## @var timeout - # Use the default server timeout value set in the module header. - timeout = default_server_timeout - - def __init__(self, sock, handlers): - self.handlers = handlers - self.received_content_type = None - http_stream.__init__(self, sock = sock) - self.expect_close = not want_persistent_server - self.logger.debug("Starting") - - def handle_no_content_length(self): - """ - Handle an incoming message that used neither chunking nor a - Content-Length header (that is: this message will be the last one - in this server stream). No special action required. - """ - self.handle_message() - - def find_handler(self, path): - """ - Helper method to search self.handlers. - """ - for h in self.handlers: - if path.startswith(h[0]): - return h[1], h[2] if len(h) > 2 else (default_content_type,) - return None, None - - def handle_message(self): - """ - HTTP layer managed to deliver a complete HTTP request to - us, figure out what to do with it. Check the command and - Content-Type, look for a handler, and if everything looks right, - pass the message body, path, and a reply callback to the handler. - """ - self.logger.debug("Received request %r", self.msg) - if not self.msg.persistent: - self.expect_close = True - handler, allowed_content_types = self.find_handler(self.msg.path) - self.received_content_type = self.msg.headers["Content-Type"] - error = None - if self.msg.cmd != "POST": - error = 501, "No handler for method %s" % self.msg.cmd - elif self.received_content_type not in allowed_content_types: - error = 415, "No handler for Content-Type %s" % self.received_content_type - elif handler is None: - error = 404, "No handler for URL %s" % self.msg.path - if error is None: - try: - handler(self.msg.body, self.msg.path, self.send_reply) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - self.logger.exception("Unhandled exception while handling HTTP request") - self.send_error(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e)) - else: - self.send_error(code = error[0], reason = error[1]) - - def send_error(self, code, reason): - """ - Send an error response to this request. - """ - self.send_message(code = code, reason = reason) - - def send_reply(self, code, body = None, reason = "OK"): - """ - Send a reply to this request. - """ - self.send_message(code = code, body = body, reason = reason) - - def send_message(self, code, reason = "OK", body = None): - """ - Queue up reply message. If both parties agree that connection is - persistant, and if no error occurred, restart this stream to - listen for next message; otherwise, queue up a close event for - this stream so it will shut down once the reply has been sent. - """ - self.logger.debug("Sending response %s %s", code, reason) - if code >= 400: - self.expect_close = True - msg = http_response(code = code, reason = reason, body = body, - Content_Type = self.received_content_type, - Connection = "Close" if self.expect_close else "Keep-Alive") - self.push(msg.format()) - if self.expect_close: - self.logger.debug("Closing") - self.timer.cancel() - self.close_when_done() - else: - self.logger.debug("Listening for next message") - self.restart() - -@rpki.log.class_logger(logger) -class http_listener(asyncore.dispatcher): - """ - Listener for incoming HTTP connections. - """ - - def __repr__(self): - try: - status = (addr_to_string(self.addr),) - except TypeError: - status = () - return rpki.log.log_repr(self, *status) - - def __init__(self, handlers, addrinfo): - self.logger = logging.LoggerAdapter(self.logger, dict(context = self)) - asyncore.dispatcher.__init__(self) - self.handlers = handlers - try: - af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612 - self.create_socket(af, socktype) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - except AttributeError: - pass - if have_ipv6 and af == socket.AF_INET6: - self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) - self.bind(sockaddr) - self.listen(5) - except Exception: - self.logger.exception("Couldn't set up HTTP listener") - self.close() - for h in handlers: - self.logger.debug("Handling %s", h[0]) - - def handle_accept(self): - """ - Asyncore says we have an incoming connection, spawn an http_server - stream for it and pass along all of our handler data. - """ - try: - res = self.accept() - if res is None: - raise - sock, addr = res # pylint: disable=W0633 - self.logger.debug("Accepting connection from %s", addr_to_string(addr)) - http_server(sock = sock, handlers = self.handlers) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception: - self.logger.exception("Unable to accept connection") - - def handle_error(self): - """ - Asyncore signaled an error, pass it along or log it. - """ - if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow): - raise - self.logger.exception("Error in HTTP listener") - -@rpki.log.class_logger(logger) -class http_client(http_stream): - """ - HTTP client stream. - """ - - ## @var parse_type - # Stream parser should look for incoming HTTP response messages. - parse_type = http_response - - ## @var timeout - # Use the default client timeout value set in the module header. - timeout = default_client_timeout - - ## @var state - # Application layer connection state. - state = None - - def __init__(self, queue, hostport): - http_stream.__init__(self) - self.logger.debug("Creating new connection to %s", addr_to_string(hostport)) - self.queue = queue - self.host = hostport[0] - self.port = hostport[1] - self.set_state("opening") - self.expect_close = not want_persistent_client - - def start(self): - """ - Create socket and request a connection. - """ - if not use_adns: - self.logger.debug("Not using ADNS") - self.gotaddrinfo([(socket.AF_INET, self.host)]) - elif self.host == "localhost": - self.logger.debug("Bypassing DNS for localhost") - self.gotaddrinfo(localhost_addrinfo()) - else: - families = supported_address_families(enable_ipv6_clients) - self.logger.debug("Starting ADNS lookup for %s in families %r", self.host, families) - rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, families) - - def dns_error(self, e): - """ - Handle DNS lookup errors. For now, just whack the connection. - Undoubtedly we should do something better with diagnostics here. - """ - self.handle_error() - - def gotaddrinfo(self, addrinfo): - """ - Got address data from DNS, create socket and request connection. - """ - try: - self.af, self.address = random.choice(addrinfo) - self.logger.debug("Connecting to AF %s host %s port %s addr %s", self.af, self.host, self.port, self.address) - self.create_socket(self.af, socket.SOCK_STREAM) - self.connect((self.address, self.port)) - if self.addr is None: - self.addr = (self.host, self.port) - self.update_timeout() - except (rpki.async.ExitNow, SystemExit): - raise - except Exception: - self.handle_error() - - def handle_connect(self): - """ - Asyncore says socket has connected. - """ - self.logger.debug("Socket connected") - self.set_state("idle") - assert self.queue.client is self - self.queue.send_request() - - def set_state(self, state): - """ - Set HTTP client connection state. - """ - self.logger.debug("State transition %s => %s", self.state, state) - self.state = state - - def handle_no_content_length(self): - """ - Handle response message that used neither chunking nor a - Content-Length header (that is: this message will be the last one - in this server stream). In this case we want to read until we - reach the end of the data stream. - """ - self.set_terminator(None) - - def send_request(self, msg): - """ - Queue up request message and kickstart connection. - """ - self.logger.debug("Sending request %r", msg) - assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state) - self.set_state("request-sent") - msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive" - self.push(msg.format()) - self.restart() - - def handle_message(self): - """ - Handle incoming HTTP response message. Make sure we're in a state - where we expect to see such a message (and allow the mysterious - empty messages that Apache sends during connection close, no idea - what that is supposed to be about). If everybody agrees that the - connection should stay open, put it into an idle state; otherwise, - arrange for the stream to shut down. - """ - - self.logger.debug("Message received, state %s", self.state) - - if not self.msg.persistent: - self.expect_close = True - - if self.state != "request-sent": - if self.state == "closing": - assert not self.msg.body - self.logger.debug("Ignoring empty response received while closing") - return - raise rpki.exceptions.HTTPUnexpectedState("%r received message while in unexpected state %s" % (self, self.state)) - - if self.expect_close: - self.logger.debug("Closing") - self.set_state("closing") - self.close_when_done() - else: - self.logger.debug("Idling") - self.set_state("idle") - self.update_timeout() - - if self.msg.code != 200: - errmsg = "HTTP request failed" - if self.msg.code is not None: - errmsg += " with status %s" % self.msg.code - if self.msg.reason: - errmsg += ", reason %s" % self.msg.reason - if self.msg.body: - errmsg += ", response %s" % self.msg.body - raise rpki.exceptions.HTTPRequestFailed(errmsg) - self.queue.return_result(self, self.msg, detach = self.expect_close) - - def handle_close(self): - """ - Asyncore signaled connection close. If we were waiting for that - to find the end of a response message, process the resulting - message now; if we were waiting for the response to a request we - sent, signal the error. - """ - http_stream.handle_close(self) - self.logger.debug("State %s", self.state) - if self.get_terminator() is None: - self.handle_body() - elif self.state == "request-sent": - raise rpki.exceptions.HTTPClientAborted("HTTP request aborted by close event") - else: - self.queue.detach(self) - - def handle_timeout(self): - """ - Connection idle timer has expired. Shut down connection in any - case, noisily if we weren't idle. - """ - bad = self.state not in ("idle", "closing") - if bad: - self.logger.warning("Timeout while in state %s", self.state) - http_stream.handle_timeout(self) - if bad: - try: - raise rpki.exceptions.HTTPTimeout - except: # pylint: disable=W0702 - self.handle_error() - else: - self.queue.detach(self) - - def handle_error(self): - """ - Asyncore says something threw an exception. Log it, then shut - down the connection and pass back the exception. - """ - eclass, edata = sys.exc_info()[0:2] - self.logger.warning("Error on HTTP client connection %s:%s %s %s", self.host, self.port, eclass, edata) - http_stream.handle_error(self) - self.queue.return_result(self, edata, detach = True) - -@rpki.log.class_logger(logger) -class http_queue(object): - """ - Queue of pending HTTP requests for a single destination. This class - is very tightly coupled to http_client; http_client handles the HTTP - stream itself, this class provides a slightly higher-level API. - """ - - def __repr__(self): - return rpki.log.log_repr(self, addr_to_string(self.hostport)) - - def __init__(self, hostport): - self.logger = logging.LoggerAdapter(self.logger, dict(context = self)) - self.hostport = hostport - self.client = None - self.logger.debug("Created") - self.queue = [] - - def request(self, *requests): - """ - Append http_request object(s) to this queue. - """ - self.logger.debug("Adding requests %r", requests) - self.queue.extend(requests) - - def restart(self): - """ - Send next request for this queue, if we can. This may involve - starting a new http_client stream, reusing an existing idle - stream, or just ignoring this request if there's an active client - stream already; in the last case, handling of the response (or - exception, or timeout) for the query currently in progress will - call this method when it's time to kick out the next query. - """ - try: - if self.client is None: - self.client = http_client(self, self.hostport) - self.logger.debug("Attached client %r", self.client) - self.client.start() - elif self.client.state == "idle": - self.logger.debug("Sending request to existing client %r", self.client) - self.send_request() - else: - self.logger.debug("Client %r exists in state %r", self.client, self.client.state) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - self.return_result(self.client, e, detach = True) - - def send_request(self): - """ - Kick out the next query in this queue, if any. - """ - if self.queue: - self.client.send_request(self.queue[0]) - - def detach(self, client_): - """ - Detatch a client from this queue. Silently ignores attempting to - detach a client that is not attached to this queue, to simplify - handling of what otherwise would be a nasty set of race - conditions. - """ - if client_ is self.client: - self.logger.debug("Detaching client %r", client_) - self.client = None - - def return_result(self, client, result, detach = False): # pylint: disable=W0621 - """ - Client stream has returned a result, which we need to pass along - to the original caller. Result may be either an HTTP response - message or an exception. In either case, once we're done - processing this result, kick off next message in the queue, if any. - """ - - if client is not self.client: - self.logger.warning("Wrong client trying to return result. THIS SHOULD NOT HAPPEN. Dropping result %r", result) - return - - if detach: - self.detach(client) - - try: - req = self.queue.pop(0) - self.logger.debug("Dequeuing request %r", req) - except IndexError: - self.logger.warning("No caller. THIS SHOULD NOT HAPPEN. Dropping result %r", result) - return - - assert isinstance(result, http_response) or isinstance(result, Exception) - - if isinstance(result, http_response): - try: - self.logger.debug("Returning result %r to caller", result) - req.callback(result.body) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - result = e - - if isinstance(result, Exception): - try: - self.logger.warning("Returning exception %r to caller: %s", result, result) - req.errback(result) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception: - self.logger.exception("Exception in exception callback, may have lost event chain") - - self.logger.debug("Queue: %r", self.queue) - - if self.queue: - self.restart() - -## @var client_queues -# Map of (host, port) tuples to http_queue objects. -client_queues = {} - -def client(msg, url, callback, errback, content_type = default_content_type): - """ - Open client HTTP connection, send a message, set up callbacks to - handle response. - """ - - u = urlparse.urlparse(url) - - if (u.scheme not in ("", "http") or - u.username is not None or - u.password is not None or - u.params != "" or - u.query != "" or - u.fragment != ""): - raise rpki.exceptions.BadClientURL("Unusable URL %s" % url) - - logger.debug("Contacting %s", url) - - request = http_request( - cmd = "POST", - path = u.path, - body = msg, - callback = callback, - errback = errback, - Host = u.hostname, - Content_Type = content_type) - - hostport = (u.hostname or "localhost", u.port or default_tcp_port) - - logger.debug("Created request %r for %s", request, addr_to_string(hostport)) - if hostport not in client_queues: - client_queues[hostport] = http_queue(hostport) - client_queues[hostport].request(request) - - # Defer connection attempt until after we've had time to process any - # pending I/O events, in case connections have closed. - - logger.debug("Scheduling connection startup for %r", request) - rpki.async.event_defer(client_queues[hostport].restart) - -def server(handlers, port, host = ""): - """ - Run an HTTP server and wait (forever) for connections. - """ - - if not isinstance(handlers, (tuple, list)): - handlers = (("/", handlers),) - - # Yes, this is sick. So is getaddrinfo() returning duplicate - # records, which RedHat has the gall to claim is a feature. - ai = [] - for af in supported_address_families(enable_ipv6_servers): - try: - if host: - h = host - elif have_ipv6 and af == socket.AF_INET6: - h = "::" - else: - h = "0.0.0.0" - for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM): - if a not in ai: - ai.append(a) - except socket.gaierror: - pass - - for a in ai: - http_listener(addrinfo = a, handlers = handlers) - - rpki.async.event_loop() - -class caller(object): - """ - Handle client-side mechanics for protocols based on HTTP, CMS, and - rpki.xml_utils. Calling sequence is intended to nest within - rpki.async.sync_wrapper. - """ - - debug = False - - def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None): - self.proto = proto - self.client_key = client_key - self.client_cert = client_cert - self.server_ta = server_ta - self.server_cert = server_cert - self.url = url - self.cms_timestamp = None - if debug is not None: - self.debug = debug - - def __call__(self, cb, eb, *pdus): - - def done(r_der): - """ - Handle CMS-wrapped XML response message. - """ - try: - r_cms = self.proto.cms_msg(DER = r_der) - r_msg = r_cms.unwrap((self.server_ta, self.server_cert)) - self.cms_timestamp = r_cms.check_replay(self.cms_timestamp, self.url) - if self.debug: - print "<!-- Reply -->" - print r_cms.pretty_print_content() - cb(r_msg) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - eb(e) - - q_msg = self.proto.msg.query(*pdus) - q_cms = self.proto.cms_msg() - q_der = q_cms.wrap(q_msg, self.client_key, self.client_cert) - if self.debug: - print "<!-- Query -->" - print q_cms.pretty_print_content() - - client(url = self.url, msg = q_der, callback = done, errback = eb) |