aboutsummaryrefslogtreecommitdiff
path: root/rpki/http.py
diff options
context:
space:
mode:
Diffstat (limited to 'rpki/http.py')
-rw-r--r--rpki/http.py1058
1 files changed, 0 insertions, 1058 deletions
diff --git a/rpki/http.py b/rpki/http.py
deleted file mode 100644
index 71239c7f..00000000
--- a/rpki/http.py
+++ /dev/null
@@ -1,1058 +0,0 @@
-# $Id$
-#
-# Copyright (C) 2013--2014 Dragon Research Labs ("DRL")
-# Portions copyright (C) 2009--2012 Internet Systems Consortium ("ISC")
-# Portions copyright (C) 2007--2008 American Registry for Internet Numbers ("ARIN")
-#
-# Permission to use, copy, modify, and distribute this software for any
-# purpose with or without fee is hereby granted, provided that the above
-# copyright notices and this permission notice appear in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND DRL, ISC, AND ARIN DISCLAIM ALL
-# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL DRL,
-# ISC, OR ARIN BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
-# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
-# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
-# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
-# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-"""
-HTTP utilities, both client and server.
-"""
-
-import time
-import socket
-import asyncore
-import asynchat
-import urlparse
-import sys
-import random
-import logging
-import rpki.async
-import rpki.sundial
-import rpki.x509
-import rpki.exceptions
-import rpki.log
-import rpki.POW
-
-logger = logging.getLogger(__name__)
-
-## @var default_content_type
-# HTTP content type used for RPKI messages.
-# Can be overriden on a per-client or per-server basis.
-default_content_type = "application/x-rpki"
-
-## @var want_persistent_client
-# Whether we want persistent HTTP client streams, when server also supports them.
-want_persistent_client = False
-
-## @var want_persistent_server
-# Whether we want persistent HTTP server streams, when client also supports them.
-want_persistent_server = False
-
-## @var default_client_timeout
-# Default HTTP client connection timeout.
-default_client_timeout = rpki.sundial.timedelta(minutes = 5)
-
-## @var default_server_timeout
-# Default HTTP server connection timeouts. Given our druthers, we'd
-# prefer that the client close the connection, as this avoids the
-# problem of client starting to reuse connection just as server closes
-# it, so this should be longer than the client timeout.
-default_server_timeout = rpki.sundial.timedelta(minutes = 10)
-
-## @var default_http_version
-# Preferred HTTP version.
-default_http_version = (1, 0)
-
-## @var default_tcp_port
-# Default port for clients and servers that don't specify one.
-default_tcp_port = 80
-
-## @var enable_ipv6_servers
-# Whether to enable IPv6 listeners. Enabled by default, as it should
-# be harmless. Has no effect if kernel doesn't support IPv6.
-enable_ipv6_servers = True
-
-## @var enable_ipv6_clients
-# Whether to consider IPv6 addresses when making connections.
-# Disabled by default, as IPv6 connectivity is still a bad joke in
-# far too much of the world.
-enable_ipv6_clients = False
-
-## @var have_ipv6
-# Whether the current machine claims to support IPv6. Note that just
-# because the kernel supports it doesn't mean that the machine has
-# usable IPv6 connectivity. I don't know of a simple portable way to
-# probe for connectivity at runtime (the old test of "can you ping
-# SRI-NIC.ARPA?" seems a bit dated...). Don't set this, it's set
-# automatically by probing using the socket() system call at runtime.
-try:
- # pylint: disable=W0702,W0104
- socket.socket(socket.AF_INET6).close()
- socket.IPPROTO_IPV6
- socket.IPV6_V6ONLY
-except:
- have_ipv6 = False
-else:
- have_ipv6 = True
-
-## @var use_adns
-
-# Whether to use rpki.adns code. This is still experimental, so it's
-# not (yet) enabled by default.
-use_adns = False
-try:
- import rpki.adns
-except ImportError:
- pass
-
-def supported_address_families(enable_ipv6):
- """
- IP address families on which servers should listen, and to consider
- when selecting addresses for client connections.
- """
- if enable_ipv6 and have_ipv6:
- return (socket.AF_INET, socket.AF_INET6)
- else:
- return (socket.AF_INET,)
-
-def localhost_addrinfo():
- """
- Return pseudo-getaddrinfo results for localhost.
- """
- result = [(socket.AF_INET, "127.0.0.1")]
- if enable_ipv6_clients and have_ipv6:
- result.append((socket.AF_INET6, "::1"))
- return result
-
-class http_message(object):
- """
- Virtual class representing of one HTTP message.
- """
-
- software_name = "ISC RPKI library"
-
- def __init__(self, version = None, body = None, headers = None):
- self.version = version
- self.body = body
- self.headers = headers
- self.normalize_headers()
-
- def normalize_headers(self, headers = None):
- """
- Clean up (some of) the horrible messes that HTTP allows in its
- headers.
- """
- if headers is None:
- headers = () if self.headers is None else self.headers.items()
- translate_underscore = True
- else:
- translate_underscore = False
- result = {}
- for k, v in headers:
- if translate_underscore:
- k = k.replace("_", "-")
- k = "-".join(s.capitalize() for s in k.split("-"))
- v = v.strip()
- if k in result:
- result[k] += ", " + v
- else:
- result[k] = v
- self.headers = result
-
- @classmethod
- def parse_from_wire(cls, headers):
- """
- Parse and normalize an incoming HTTP message.
- """
- self = cls()
- headers = headers.split("\r\n")
- self.parse_first_line(*headers.pop(0).split(None, 2))
- for i in xrange(len(headers) - 2, -1, -1):
- if headers[i + 1][0].isspace():
- headers[i] += headers[i + 1]
- del headers[i + 1]
- self.normalize_headers([h.split(":", 1) for h in headers])
- return self
-
- def format(self):
- """
- Format an outgoing HTTP message.
- """
- s = self.format_first_line()
- if self.body is not None:
- assert isinstance(self.body, str)
- self.headers["Content-Length"] = len(self.body)
- for kv in self.headers.iteritems():
- s += "%s: %s\r\n" % kv
- s += "\r\n"
- if self.body is not None:
- s += self.body
- return s
-
- def __str__(self):
- return self.format()
-
- def parse_version(self, version):
- """
- Parse HTTP version, raise an exception if we can't.
- """
- if version[:5] != "HTTP/":
- raise rpki.exceptions.HTTPBadVersion("Couldn't parse version %s" % version)
- self.version = tuple(int(i) for i in version[5:].split("."))
-
- @property
- def persistent(self):
- """
- Figure out whether this HTTP message encourages a persistent connection.
- """
- c = self.headers.get("Connection")
- if self.version == (1, 1):
- return c is None or "close" not in c.lower()
- elif self.version == (1, 0):
- return c is not None and "keep-alive" in c.lower()
- else:
- return False
-
-class http_request(http_message):
- """
- HTTP request message.
- """
-
- def __init__(self, cmd = None, path = None, version = default_http_version, body = None, callback = None, errback = None, **headers):
- assert cmd == "POST" or body is None
- http_message.__init__(self, version = version, body = body, headers = headers)
- self.cmd = cmd
- self.path = path
- self.callback = callback
- self.errback = errback
- self.retried = False
-
- def parse_first_line(self, cmd, path, version):
- """
- Parse first line of HTTP request message.
- """
- self.parse_version(version)
- self.cmd = cmd
- self.path = path
-
- def format_first_line(self):
- """
- Format first line of HTTP request message, and set up the
- User-Agent header.
- """
- self.headers.setdefault("User-Agent", self.software_name)
- return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1])
-
- def __repr__(self):
- return rpki.log.log_repr(self, self.cmd, self.path)
-
-class http_response(http_message):
- """
- HTTP response message.
- """
-
- def __init__(self, code = None, reason = None, version = default_http_version, body = None, **headers):
- http_message.__init__(self, version = version, body = body, headers = headers)
- self.code = code
- self.reason = reason
-
- def parse_first_line(self, version, code, reason):
- """
- Parse first line of HTTP response message.
- """
- self.parse_version(version)
- self.code = int(code)
- self.reason = reason
-
- def format_first_line(self):
- """
- Format first line of HTTP response message, and set up Date and
- Server headers.
- """
- self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT"))
- self.headers.setdefault("Server", self.software_name)
- return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason)
-
- def __repr__(self):
- return rpki.log.log_repr(self, self.code, self.reason)
-
-def addr_to_string(addr):
- """
- Convert socket addr tuple to printable string. Assumes 2-element
- tuple is IPv4, 4-element tuple is IPv6, throws TypeError for
- anything else.
- """
-
- if len(addr) == 2:
- return "%s:%d" % (addr[0], addr[1])
- if len(addr) == 4:
- return "%s.%d" % (addr[0], addr[1])
- raise TypeError
-
-@rpki.log.class_logger(logger)
-class http_stream(asynchat.async_chat):
- """
- Virtual class representing an HTTP message stream.
- """
-
- # Keep pylint happy; @class_logger overwrites this.
- logger = None
-
- def __repr__(self):
- status = ["connected"] if self.connected else []
- try:
- status.append(addr_to_string(self.addr))
- except TypeError:
- pass
- return rpki.log.log_repr(self, *status)
-
- def __init__(self, sock = None):
- self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
- asynchat.async_chat.__init__(self, sock)
- self.buffer = []
- self.timer = rpki.async.timer(self.handle_timeout)
- self.restart()
-
- def restart(self):
- """
- (Re)start HTTP message parser, reset timer.
- """
- assert not self.buffer
- self.chunk_handler = None
- self.set_terminator("\r\n\r\n")
- self.update_timeout()
-
- def update_timeout(self):
- """
- Put this stream's timer in known good state: set it to the
- stream's timeout value if we're doing timeouts, otherwise clear
- it.
- """
- if self.timeout is not None:
- self.logger.debug("Setting timeout %s", self.timeout)
- self.timer.set(self.timeout)
- else:
- self.logger.debug("Clearing timeout")
- self.timer.cancel()
-
- def collect_incoming_data(self, data):
- """
- Buffer incoming data from asynchat.
- """
- self.buffer.append(data)
- self.update_timeout()
-
- def get_buffer(self):
- """
- Consume data buffered from asynchat.
- """
- val = "".join(self.buffer)
- self.buffer = []
- return val
-
- def found_terminator(self):
- """
- Asynchat reported that it found whatever terminator we set, so
- figure out what to do next. This can be messy, because we can be
- in any of several different states:
-
- @li We might be handling chunked HTTP, in which case we have to
- initialize the chunk decoder;
-
- @li We might have found the end of the message body, in which case
- we can (finally) process it; or
-
- @li We might have just gotten to the end of the message headers,
- in which case we have to parse them to figure out which of three
- separate mechanisms (chunked, content-length, TCP close) is going
- to tell us how to find the end of the message body.
- """
- self.update_timeout()
- if self.chunk_handler:
- self.chunk_handler()
- elif not isinstance(self.get_terminator(), str):
- self.handle_body()
- else:
- self.msg = self.parse_type.parse_from_wire(self.get_buffer())
- if self.msg.version == (1, 1) and "chunked" in self.msg.headers.get("Transfer-Encoding", "").lower():
- self.msg.body = []
- self.chunk_handler = self.chunk_header
- self.set_terminator("\r\n")
- elif "Content-Length" in self.msg.headers:
- self.set_terminator(int(self.msg.headers["Content-Length"]))
- else:
- self.handle_no_content_length()
-
- def chunk_header(self):
- """
- Asynchat just handed us what should be the header of one chunk of
- a chunked encoding stream. If this chunk has a body, set the
- stream up to read it; otherwise, this is the last chunk, so start
- the process of exiting the chunk decoder.
- """
- n = int(self.get_buffer().partition(";")[0], 16)
- self.logger.debug("Chunk length %s", n)
- if n:
- self.chunk_handler = self.chunk_body
- self.set_terminator(n)
- else:
- self.msg.body = "".join(self.msg.body)
- self.chunk_handler = self.chunk_discard_trailer
-
- def chunk_body(self):
- """
- Asynchat just handed us what should be the body of a chunk of the
- body of a chunked message (sic). Save it, and prepare to move on
- to the next chunk.
- """
- self.logger.debug("Chunk body")
- self.msg.body += self.buffer
- self.buffer = []
- self.chunk_handler = self.chunk_discard_crlf
- self.set_terminator("\r\n")
-
- def chunk_discard_crlf(self):
- """
- Consume the CRLF that terminates a chunk, reinitialize chunk
- decoder to be ready for the next chunk.
- """
- self.logger.debug("Chunk CRLF")
- s = self.get_buffer()
- assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s)
- self.chunk_handler = self.chunk_header
-
- def chunk_discard_trailer(self):
- """
- Consume chunk trailer, which should be empty, then (finally!) exit
- the chunk decoder and hand complete message off to the application.
- """
- self.logger.debug("Chunk trailer")
- s = self.get_buffer()
- assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s)
- self.chunk_handler = None
- self.handle_message()
-
- def handle_body(self):
- """
- Hand normal (not chunked) message off to the application.
- """
- self.msg.body = self.get_buffer()
- self.handle_message()
-
- def handle_error(self):
- """
- Asynchat (or asyncore, or somebody) raised an exception. See
- whether it's one we should just pass along, otherwise log a stack
- trace and close the stream.
- """
- self.timer.cancel()
- etype = sys.exc_info()[0]
- if etype in (SystemExit, rpki.async.ExitNow):
- raise
- if etype is not rpki.exceptions.HTTPClientAborted:
- self.logger.exception("Closing due to error")
- self.close()
-
- def handle_timeout(self):
- """
- Inactivity timer expired, close connection with prejudice.
- """
- self.logger.debug("Timeout, closing")
- self.close()
-
- def handle_close(self):
- """
- Wrapper around asynchat connection close handler, so that we can
- log the event, cancel timer, and so forth.
- """
- self.logger.debug("Close event in HTTP stream handler")
- self.timer.cancel()
- asynchat.async_chat.handle_close(self)
-
-@rpki.log.class_logger(logger)
-class http_server(http_stream):
- """
- HTTP server stream.
- """
-
- ## @var parse_type
- # Stream parser should look for incoming HTTP request messages.
- parse_type = http_request
-
- ## @var timeout
- # Use the default server timeout value set in the module header.
- timeout = default_server_timeout
-
- def __init__(self, sock, handlers):
- self.handlers = handlers
- self.received_content_type = None
- http_stream.__init__(self, sock = sock)
- self.expect_close = not want_persistent_server
- self.logger.debug("Starting")
-
- def handle_no_content_length(self):
- """
- Handle an incoming message that used neither chunking nor a
- Content-Length header (that is: this message will be the last one
- in this server stream). No special action required.
- """
- self.handle_message()
-
- def find_handler(self, path):
- """
- Helper method to search self.handlers.
- """
- for h in self.handlers:
- if path.startswith(h[0]):
- return h[1], h[2] if len(h) > 2 else (default_content_type,)
- return None, None
-
- def handle_message(self):
- """
- HTTP layer managed to deliver a complete HTTP request to
- us, figure out what to do with it. Check the command and
- Content-Type, look for a handler, and if everything looks right,
- pass the message body, path, and a reply callback to the handler.
- """
- self.logger.debug("Received request %r", self.msg)
- if not self.msg.persistent:
- self.expect_close = True
- handler, allowed_content_types = self.find_handler(self.msg.path)
- self.received_content_type = self.msg.headers["Content-Type"]
- error = None
- if self.msg.cmd != "POST":
- error = 501, "No handler for method %s" % self.msg.cmd
- elif self.received_content_type not in allowed_content_types:
- error = 415, "No handler for Content-Type %s" % self.received_content_type
- elif handler is None:
- error = 404, "No handler for URL %s" % self.msg.path
- if error is None:
- try:
- handler(self.msg.body, self.msg.path, self.send_reply)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception, e:
- self.logger.exception("Unhandled exception while handling HTTP request")
- self.send_error(500, reason = "Unhandled exception %s: %s" % (e.__class__.__name__, e))
- else:
- self.send_error(code = error[0], reason = error[1])
-
- def send_error(self, code, reason):
- """
- Send an error response to this request.
- """
- self.send_message(code = code, reason = reason)
-
- def send_reply(self, code, body = None, reason = "OK"):
- """
- Send a reply to this request.
- """
- self.send_message(code = code, body = body, reason = reason)
-
- def send_message(self, code, reason = "OK", body = None):
- """
- Queue up reply message. If both parties agree that connection is
- persistant, and if no error occurred, restart this stream to
- listen for next message; otherwise, queue up a close event for
- this stream so it will shut down once the reply has been sent.
- """
- self.logger.debug("Sending response %s %s", code, reason)
- if code >= 400:
- self.expect_close = True
- msg = http_response(code = code, reason = reason, body = body,
- Content_Type = self.received_content_type,
- Connection = "Close" if self.expect_close else "Keep-Alive")
- self.push(msg.format())
- if self.expect_close:
- self.logger.debug("Closing")
- self.timer.cancel()
- self.close_when_done()
- else:
- self.logger.debug("Listening for next message")
- self.restart()
-
-@rpki.log.class_logger(logger)
-class http_listener(asyncore.dispatcher):
- """
- Listener for incoming HTTP connections.
- """
-
- def __repr__(self):
- try:
- status = (addr_to_string(self.addr),)
- except TypeError:
- status = ()
- return rpki.log.log_repr(self, *status)
-
- def __init__(self, handlers, addrinfo):
- self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
- asyncore.dispatcher.__init__(self)
- self.handlers = handlers
- try:
- af, socktype, proto, canonname, sockaddr = addrinfo # pylint: disable=W0612
- self.create_socket(af, socktype)
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- try:
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- except AttributeError:
- pass
- if have_ipv6 and af == socket.AF_INET6:
- self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
- self.bind(sockaddr)
- self.listen(5)
- except Exception:
- self.logger.exception("Couldn't set up HTTP listener")
- self.close()
- for h in handlers:
- self.logger.debug("Handling %s", h[0])
-
- def handle_accept(self):
- """
- Asyncore says we have an incoming connection, spawn an http_server
- stream for it and pass along all of our handler data.
- """
- try:
- res = self.accept()
- if res is None:
- raise
- sock, addr = res # pylint: disable=W0633
- self.logger.debug("Accepting connection from %s", addr_to_string(addr))
- http_server(sock = sock, handlers = self.handlers)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception:
- self.logger.exception("Unable to accept connection")
-
- def handle_error(self):
- """
- Asyncore signaled an error, pass it along or log it.
- """
- if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow):
- raise
- self.logger.exception("Error in HTTP listener")
-
-@rpki.log.class_logger(logger)
-class http_client(http_stream):
- """
- HTTP client stream.
- """
-
- ## @var parse_type
- # Stream parser should look for incoming HTTP response messages.
- parse_type = http_response
-
- ## @var timeout
- # Use the default client timeout value set in the module header.
- timeout = default_client_timeout
-
- ## @var state
- # Application layer connection state.
- state = None
-
- def __init__(self, queue, hostport):
- http_stream.__init__(self)
- self.logger.debug("Creating new connection to %s", addr_to_string(hostport))
- self.queue = queue
- self.host = hostport[0]
- self.port = hostport[1]
- self.set_state("opening")
- self.expect_close = not want_persistent_client
-
- def start(self):
- """
- Create socket and request a connection.
- """
- if not use_adns:
- self.logger.debug("Not using ADNS")
- self.gotaddrinfo([(socket.AF_INET, self.host)])
- elif self.host == "localhost":
- self.logger.debug("Bypassing DNS for localhost")
- self.gotaddrinfo(localhost_addrinfo())
- else:
- families = supported_address_families(enable_ipv6_clients)
- self.logger.debug("Starting ADNS lookup for %s in families %r", self.host, families)
- rpki.adns.getaddrinfo(self.gotaddrinfo, self.dns_error, self.host, families)
-
- def dns_error(self, e):
- """
- Handle DNS lookup errors. For now, just whack the connection.
- Undoubtedly we should do something better with diagnostics here.
- """
- self.handle_error()
-
- def gotaddrinfo(self, addrinfo):
- """
- Got address data from DNS, create socket and request connection.
- """
- try:
- self.af, self.address = random.choice(addrinfo)
- self.logger.debug("Connecting to AF %s host %s port %s addr %s", self.af, self.host, self.port, self.address)
- self.create_socket(self.af, socket.SOCK_STREAM)
- self.connect((self.address, self.port))
- if self.addr is None:
- self.addr = (self.host, self.port)
- self.update_timeout()
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception:
- self.handle_error()
-
- def handle_connect(self):
- """
- Asyncore says socket has connected.
- """
- self.logger.debug("Socket connected")
- self.set_state("idle")
- assert self.queue.client is self
- self.queue.send_request()
-
- def set_state(self, state):
- """
- Set HTTP client connection state.
- """
- self.logger.debug("State transition %s => %s", self.state, state)
- self.state = state
-
- def handle_no_content_length(self):
- """
- Handle response message that used neither chunking nor a
- Content-Length header (that is: this message will be the last one
- in this server stream). In this case we want to read until we
- reach the end of the data stream.
- """
- self.set_terminator(None)
-
- def send_request(self, msg):
- """
- Queue up request message and kickstart connection.
- """
- self.logger.debug("Sending request %r", msg)
- assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state)
- self.set_state("request-sent")
- msg.headers["Connection"] = "Close" if self.expect_close else "Keep-Alive"
- self.push(msg.format())
- self.restart()
-
- def handle_message(self):
- """
- Handle incoming HTTP response message. Make sure we're in a state
- where we expect to see such a message (and allow the mysterious
- empty messages that Apache sends during connection close, no idea
- what that is supposed to be about). If everybody agrees that the
- connection should stay open, put it into an idle state; otherwise,
- arrange for the stream to shut down.
- """
-
- self.logger.debug("Message received, state %s", self.state)
-
- if not self.msg.persistent:
- self.expect_close = True
-
- if self.state != "request-sent":
- if self.state == "closing":
- assert not self.msg.body
- self.logger.debug("Ignoring empty response received while closing")
- return
- raise rpki.exceptions.HTTPUnexpectedState("%r received message while in unexpected state %s" % (self, self.state))
-
- if self.expect_close:
- self.logger.debug("Closing")
- self.set_state("closing")
- self.close_when_done()
- else:
- self.logger.debug("Idling")
- self.set_state("idle")
- self.update_timeout()
-
- if self.msg.code != 200:
- errmsg = "HTTP request failed"
- if self.msg.code is not None:
- errmsg += " with status %s" % self.msg.code
- if self.msg.reason:
- errmsg += ", reason %s" % self.msg.reason
- if self.msg.body:
- errmsg += ", response %s" % self.msg.body
- raise rpki.exceptions.HTTPRequestFailed(errmsg)
- self.queue.return_result(self, self.msg, detach = self.expect_close)
-
- def handle_close(self):
- """
- Asyncore signaled connection close. If we were waiting for that
- to find the end of a response message, process the resulting
- message now; if we were waiting for the response to a request we
- sent, signal the error.
- """
- http_stream.handle_close(self)
- self.logger.debug("State %s", self.state)
- if self.get_terminator() is None:
- self.handle_body()
- elif self.state == "request-sent":
- raise rpki.exceptions.HTTPClientAborted("HTTP request aborted by close event")
- else:
- self.queue.detach(self)
-
- def handle_timeout(self):
- """
- Connection idle timer has expired. Shut down connection in any
- case, noisily if we weren't idle.
- """
- bad = self.state not in ("idle", "closing")
- if bad:
- self.logger.warning("Timeout while in state %s", self.state)
- http_stream.handle_timeout(self)
- if bad:
- try:
- raise rpki.exceptions.HTTPTimeout
- except: # pylint: disable=W0702
- self.handle_error()
- else:
- self.queue.detach(self)
-
- def handle_error(self):
- """
- Asyncore says something threw an exception. Log it, then shut
- down the connection and pass back the exception.
- """
- eclass, edata = sys.exc_info()[0:2]
- self.logger.warning("Error on HTTP client connection %s:%s %s %s", self.host, self.port, eclass, edata)
- http_stream.handle_error(self)
- self.queue.return_result(self, edata, detach = True)
-
-@rpki.log.class_logger(logger)
-class http_queue(object):
- """
- Queue of pending HTTP requests for a single destination. This class
- is very tightly coupled to http_client; http_client handles the HTTP
- stream itself, this class provides a slightly higher-level API.
- """
-
- def __repr__(self):
- return rpki.log.log_repr(self, addr_to_string(self.hostport))
-
- def __init__(self, hostport):
- self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
- self.hostport = hostport
- self.client = None
- self.logger.debug("Created")
- self.queue = []
-
- def request(self, *requests):
- """
- Append http_request object(s) to this queue.
- """
- self.logger.debug("Adding requests %r", requests)
- self.queue.extend(requests)
-
- def restart(self):
- """
- Send next request for this queue, if we can. This may involve
- starting a new http_client stream, reusing an existing idle
- stream, or just ignoring this request if there's an active client
- stream already; in the last case, handling of the response (or
- exception, or timeout) for the query currently in progress will
- call this method when it's time to kick out the next query.
- """
- try:
- if self.client is None:
- self.client = http_client(self, self.hostport)
- self.logger.debug("Attached client %r", self.client)
- self.client.start()
- elif self.client.state == "idle":
- self.logger.debug("Sending request to existing client %r", self.client)
- self.send_request()
- else:
- self.logger.debug("Client %r exists in state %r", self.client, self.client.state)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception, e:
- self.return_result(self.client, e, detach = True)
-
- def send_request(self):
- """
- Kick out the next query in this queue, if any.
- """
- if self.queue:
- self.client.send_request(self.queue[0])
-
- def detach(self, client_):
- """
- Detatch a client from this queue. Silently ignores attempting to
- detach a client that is not attached to this queue, to simplify
- handling of what otherwise would be a nasty set of race
- conditions.
- """
- if client_ is self.client:
- self.logger.debug("Detaching client %r", client_)
- self.client = None
-
- def return_result(self, client, result, detach = False): # pylint: disable=W0621
- """
- Client stream has returned a result, which we need to pass along
- to the original caller. Result may be either an HTTP response
- message or an exception. In either case, once we're done
- processing this result, kick off next message in the queue, if any.
- """
-
- if client is not self.client:
- self.logger.warning("Wrong client trying to return result. THIS SHOULD NOT HAPPEN. Dropping result %r", result)
- return
-
- if detach:
- self.detach(client)
-
- try:
- req = self.queue.pop(0)
- self.logger.debug("Dequeuing request %r", req)
- except IndexError:
- self.logger.warning("No caller. THIS SHOULD NOT HAPPEN. Dropping result %r", result)
- return
-
- assert isinstance(result, http_response) or isinstance(result, Exception)
-
- if isinstance(result, http_response):
- try:
- self.logger.debug("Returning result %r to caller", result)
- req.callback(result.body)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception, e:
- result = e
-
- if isinstance(result, Exception):
- try:
- self.logger.warning("Returning exception %r to caller: %s", result, result)
- req.errback(result)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception:
- self.logger.exception("Exception in exception callback, may have lost event chain")
-
- self.logger.debug("Queue: %r", self.queue)
-
- if self.queue:
- self.restart()
-
-## @var client_queues
-# Map of (host, port) tuples to http_queue objects.
-client_queues = {}
-
-def client(msg, url, callback, errback, content_type = default_content_type):
- """
- Open client HTTP connection, send a message, set up callbacks to
- handle response.
- """
-
- u = urlparse.urlparse(url)
-
- if (u.scheme not in ("", "http") or
- u.username is not None or
- u.password is not None or
- u.params != "" or
- u.query != "" or
- u.fragment != ""):
- raise rpki.exceptions.BadClientURL("Unusable URL %s" % url)
-
- logger.debug("Contacting %s", url)
-
- request = http_request(
- cmd = "POST",
- path = u.path,
- body = msg,
- callback = callback,
- errback = errback,
- Host = u.hostname,
- Content_Type = content_type)
-
- hostport = (u.hostname or "localhost", u.port or default_tcp_port)
-
- logger.debug("Created request %r for %s", request, addr_to_string(hostport))
- if hostport not in client_queues:
- client_queues[hostport] = http_queue(hostport)
- client_queues[hostport].request(request)
-
- # Defer connection attempt until after we've had time to process any
- # pending I/O events, in case connections have closed.
-
- logger.debug("Scheduling connection startup for %r", request)
- rpki.async.event_defer(client_queues[hostport].restart)
-
-def server(handlers, port, host = ""):
- """
- Run an HTTP server and wait (forever) for connections.
- """
-
- if not isinstance(handlers, (tuple, list)):
- handlers = (("/", handlers),)
-
- # Yes, this is sick. So is getaddrinfo() returning duplicate
- # records, which RedHat has the gall to claim is a feature.
- ai = []
- for af in supported_address_families(enable_ipv6_servers):
- try:
- if host:
- h = host
- elif have_ipv6 and af == socket.AF_INET6:
- h = "::"
- else:
- h = "0.0.0.0"
- for a in socket.getaddrinfo(h, port, af, socket.SOCK_STREAM):
- if a not in ai:
- ai.append(a)
- except socket.gaierror:
- pass
-
- for a in ai:
- http_listener(addrinfo = a, handlers = handlers)
-
- rpki.async.event_loop()
-
-class caller(object):
- """
- Handle client-side mechanics for protocols based on HTTP, CMS, and
- rpki.xml_utils. Calling sequence is intended to nest within
- rpki.async.sync_wrapper.
- """
-
- debug = False
-
- def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None):
- self.proto = proto
- self.client_key = client_key
- self.client_cert = client_cert
- self.server_ta = server_ta
- self.server_cert = server_cert
- self.url = url
- self.cms_timestamp = None
- if debug is not None:
- self.debug = debug
-
- def __call__(self, cb, eb, *pdus):
-
- def done(r_der):
- """
- Handle CMS-wrapped XML response message.
- """
- try:
- r_cms = self.proto.cms_msg(DER = r_der)
- r_msg = r_cms.unwrap((self.server_ta, self.server_cert))
- self.cms_timestamp = r_cms.check_replay(self.cms_timestamp, self.url)
- if self.debug:
- print "<!-- Reply -->"
- print r_cms.pretty_print_content()
- cb(r_msg)
- except (rpki.async.ExitNow, SystemExit):
- raise
- except Exception, e:
- eb(e)
-
- q_msg = self.proto.msg.query(*pdus)
- q_cms = self.proto.cms_msg()
- q_der = q_cms.wrap(q_msg, self.client_key, self.client_cert)
- if self.debug:
- print "<!-- Query -->"
- print q_cms.pretty_print_content()
-
- client(url = self.url, msg = q_der, callback = done, errback = eb)