diff options
Diffstat (limited to 'rpki/http.py')
-rw-r--r-- | rpki/http.py | 76 |
1 files changed, 60 insertions, 16 deletions
diff --git a/rpki/http.py b/rpki/http.py index 71239c7f..e41b0080 100644 --- a/rpki/http.py +++ b/rpki/http.py @@ -38,10 +38,9 @@ import rpki.POW logger = logging.getLogger(__name__) -## @var default_content_type -# HTTP content type used for RPKI messages. -# Can be overriden on a per-client or per-server basis. -default_content_type = "application/x-rpki" +## @var rpki_content_type +# HTTP content type used for all RPKI messages. +rpki_content_type = "application/x-rpki" ## @var want_persistent_client # Whether we want persistent HTTP client streams, when server also supports them. @@ -113,6 +112,7 @@ def supported_address_families(enable_ipv6): IP address families on which servers should listen, and to consider when selecting addresses for client connections. """ + if enable_ipv6 and have_ipv6: return (socket.AF_INET, socket.AF_INET6) else: @@ -122,6 +122,7 @@ def localhost_addrinfo(): """ Return pseudo-getaddrinfo results for localhost. """ + result = [(socket.AF_INET, "127.0.0.1")] if enable_ipv6_clients and have_ipv6: result.append((socket.AF_INET6, "::1")) @@ -145,6 +146,7 @@ class http_message(object): Clean up (some of) the horrible messes that HTTP allows in its headers. """ + if headers is None: headers = () if self.headers is None else self.headers.items() translate_underscore = True @@ -167,6 +169,7 @@ class http_message(object): """ Parse and normalize an incoming HTTP message. """ + self = cls() headers = headers.split("\r\n") self.parse_first_line(*headers.pop(0).split(None, 2)) @@ -181,6 +184,7 @@ class http_message(object): """ Format an outgoing HTTP message. """ + s = self.format_first_line() if self.body is not None: assert isinstance(self.body, str) @@ -199,6 +203,7 @@ class http_message(object): """ Parse HTTP version, raise an exception if we can't. """ + if version[:5] != "HTTP/": raise rpki.exceptions.HTTPBadVersion("Couldn't parse version %s" % version) self.version = tuple(int(i) for i in version[5:].split(".")) @@ -208,6 +213,7 @@ class http_message(object): """ Figure out whether this HTTP message encourages a persistent connection. """ + c = self.headers.get("Connection") if self.version == (1, 1): return c is None or "close" not in c.lower() @@ -234,6 +240,7 @@ class http_request(http_message): """ Parse first line of HTTP request message. """ + self.parse_version(version) self.cmd = cmd self.path = path @@ -243,6 +250,7 @@ class http_request(http_message): Format first line of HTTP request message, and set up the User-Agent header. """ + self.headers.setdefault("User-Agent", self.software_name) return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1]) @@ -263,6 +271,7 @@ class http_response(http_message): """ Parse first line of HTTP response message. """ + self.parse_version(version) self.code = int(code) self.reason = reason @@ -272,6 +281,7 @@ class http_response(http_message): Format first line of HTTP response message, and set up Date and Server headers. """ + self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT")) self.headers.setdefault("Server", self.software_name) return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason) @@ -320,6 +330,7 @@ class http_stream(asynchat.async_chat): """ (Re)start HTTP message parser, reset timer. """ + assert not self.buffer self.chunk_handler = None self.set_terminator("\r\n\r\n") @@ -331,6 +342,7 @@ class http_stream(asynchat.async_chat): stream's timeout value if we're doing timeouts, otherwise clear it. """ + if self.timeout is not None: self.logger.debug("Setting timeout %s", self.timeout) self.timer.set(self.timeout) @@ -342,6 +354,7 @@ class http_stream(asynchat.async_chat): """ Buffer incoming data from asynchat. """ + self.buffer.append(data) self.update_timeout() @@ -349,6 +362,7 @@ class http_stream(asynchat.async_chat): """ Consume data buffered from asynchat. """ + val = "".join(self.buffer) self.buffer = [] return val @@ -370,6 +384,7 @@ class http_stream(asynchat.async_chat): separate mechanisms (chunked, content-length, TCP close) is going to tell us how to find the end of the message body. """ + self.update_timeout() if self.chunk_handler: self.chunk_handler() @@ -393,6 +408,7 @@ class http_stream(asynchat.async_chat): stream up to read it; otherwise, this is the last chunk, so start the process of exiting the chunk decoder. """ + n = int(self.get_buffer().partition(";")[0], 16) self.logger.debug("Chunk length %s", n) if n: @@ -408,6 +424,7 @@ class http_stream(asynchat.async_chat): body of a chunked message (sic). Save it, and prepare to move on to the next chunk. """ + self.logger.debug("Chunk body") self.msg.body += self.buffer self.buffer = [] @@ -419,6 +436,7 @@ class http_stream(asynchat.async_chat): Consume the CRLF that terminates a chunk, reinitialize chunk decoder to be ready for the next chunk. """ + self.logger.debug("Chunk CRLF") s = self.get_buffer() assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s) @@ -429,6 +447,7 @@ class http_stream(asynchat.async_chat): Consume chunk trailer, which should be empty, then (finally!) exit the chunk decoder and hand complete message off to the application. """ + self.logger.debug("Chunk trailer") s = self.get_buffer() assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s) @@ -439,6 +458,7 @@ class http_stream(asynchat.async_chat): """ Hand normal (not chunked) message off to the application. """ + self.msg.body = self.get_buffer() self.handle_message() @@ -448,6 +468,7 @@ class http_stream(asynchat.async_chat): whether it's one we should just pass along, otherwise log a stack trace and close the stream. """ + self.timer.cancel() etype = sys.exc_info()[0] if etype in (SystemExit, rpki.async.ExitNow): @@ -460,6 +481,7 @@ class http_stream(asynchat.async_chat): """ Inactivity timer expired, close connection with prejudice. """ + self.logger.debug("Timeout, closing") self.close() @@ -468,6 +490,7 @@ class http_stream(asynchat.async_chat): Wrapper around asynchat connection close handler, so that we can log the event, cancel timer, and so forth. """ + self.logger.debug("Close event in HTTP stream handler") self.timer.cancel() asynchat.async_chat.handle_close(self) @@ -488,7 +511,6 @@ class http_server(http_stream): def __init__(self, sock, handlers): self.handlers = handlers - self.received_content_type = None http_stream.__init__(self, sock = sock) self.expect_close = not want_persistent_server self.logger.debug("Starting") @@ -499,16 +521,18 @@ class http_server(http_stream): Content-Length header (that is: this message will be the last one in this server stream). No special action required. """ + self.handle_message() def find_handler(self, path): """ Helper method to search self.handlers. """ - for h in self.handlers: - if path.startswith(h[0]): - return h[1], h[2] if len(h) > 2 else (default_content_type,) - return None, None + + for s, h in self.handlers: + if path.startswith(s): + return h + return None def handle_message(self): """ @@ -517,16 +541,16 @@ class http_server(http_stream): Content-Type, look for a handler, and if everything looks right, pass the message body, path, and a reply callback to the handler. """ + self.logger.debug("Received request %r", self.msg) if not self.msg.persistent: self.expect_close = True - handler, allowed_content_types = self.find_handler(self.msg.path) - self.received_content_type = self.msg.headers["Content-Type"] + handler = self.find_handler(self.msg.path) error = None if self.msg.cmd != "POST": error = 501, "No handler for method %s" % self.msg.cmd - elif self.received_content_type not in allowed_content_types: - error = 415, "No handler for Content-Type %s" % self.received_content_type + elif self.msg.headers["Content-Type"] != rpki_content_type: + error = 415, "No handler for Content-Type %s" % self.headers["Content-Type"] elif handler is None: error = 404, "No handler for URL %s" % self.msg.path if error is None: @@ -544,12 +568,14 @@ class http_server(http_stream): """ Send an error response to this request. """ + self.send_message(code = code, reason = reason) def send_reply(self, code, body = None, reason = "OK"): """ Send a reply to this request. """ + self.send_message(code = code, body = body, reason = reason) def send_message(self, code, reason = "OK", body = None): @@ -559,11 +585,12 @@ class http_server(http_stream): listen for next message; otherwise, queue up a close event for this stream so it will shut down once the reply has been sent. """ + self.logger.debug("Sending response %s %s", code, reason) if code >= 400: self.expect_close = True msg = http_response(code = code, reason = reason, body = body, - Content_Type = self.received_content_type, + Content_Type = rpki_content_type, Connection = "Close" if self.expect_close else "Keep-Alive") self.push(msg.format()) if self.expect_close: @@ -614,6 +641,7 @@ class http_listener(asyncore.dispatcher): Asyncore says we have an incoming connection, spawn an http_server stream for it and pass along all of our handler data. """ + try: res = self.accept() if res is None: @@ -630,6 +658,7 @@ class http_listener(asyncore.dispatcher): """ Asyncore signaled an error, pass it along or log it. """ + if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow): raise self.logger.exception("Error in HTTP listener") @@ -665,6 +694,7 @@ class http_client(http_stream): """ Create socket and request a connection. """ + if not use_adns: self.logger.debug("Not using ADNS") self.gotaddrinfo([(socket.AF_INET, self.host)]) @@ -681,12 +711,14 @@ class http_client(http_stream): Handle DNS lookup errors. For now, just whack the connection. Undoubtedly we should do something better with diagnostics here. """ + self.handle_error() def gotaddrinfo(self, addrinfo): """ Got address data from DNS, create socket and request connection. """ + try: self.af, self.address = random.choice(addrinfo) self.logger.debug("Connecting to AF %s host %s port %s addr %s", self.af, self.host, self.port, self.address) @@ -704,6 +736,7 @@ class http_client(http_stream): """ Asyncore says socket has connected. """ + self.logger.debug("Socket connected") self.set_state("idle") assert self.queue.client is self @@ -713,6 +746,7 @@ class http_client(http_stream): """ Set HTTP client connection state. """ + self.logger.debug("State transition %s => %s", self.state, state) self.state = state @@ -723,12 +757,14 @@ class http_client(http_stream): in this server stream). In this case we want to read until we reach the end of the data stream. """ + self.set_terminator(None) def send_request(self, msg): """ Queue up request message and kickstart connection. """ + self.logger.debug("Sending request %r", msg) assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state) self.set_state("request-sent") @@ -785,6 +821,7 @@ class http_client(http_stream): message now; if we were waiting for the response to a request we sent, signal the error. """ + http_stream.handle_close(self) self.logger.debug("State %s", self.state) if self.get_terminator() is None: @@ -799,6 +836,7 @@ class http_client(http_stream): Connection idle timer has expired. Shut down connection in any case, noisily if we weren't idle. """ + bad = self.state not in ("idle", "closing") if bad: self.logger.warning("Timeout while in state %s", self.state) @@ -816,6 +854,7 @@ class http_client(http_stream): Asyncore says something threw an exception. Log it, then shut down the connection and pass back the exception. """ + eclass, edata = sys.exc_info()[0:2] self.logger.warning("Error on HTTP client connection %s:%s %s %s", self.host, self.port, eclass, edata) http_stream.handle_error(self) @@ -843,6 +882,7 @@ class http_queue(object): """ Append http_request object(s) to this queue. """ + self.logger.debug("Adding requests %r", requests) self.queue.extend(requests) @@ -855,6 +895,7 @@ class http_queue(object): exception, or timeout) for the query currently in progress will call this method when it's time to kick out the next query. """ + try: if self.client is None: self.client = http_client(self, self.hostport) @@ -874,6 +915,7 @@ class http_queue(object): """ Kick out the next query in this queue, if any. """ + if self.queue: self.client.send_request(self.queue[0]) @@ -884,6 +926,7 @@ class http_queue(object): handling of what otherwise would be a nasty set of race conditions. """ + if client_ is self.client: self.logger.debug("Detaching client %r", client_) self.client = None @@ -939,7 +982,7 @@ class http_queue(object): # Map of (host, port) tuples to http_queue objects. client_queues = {} -def client(msg, url, callback, errback, content_type = default_content_type): +def client(msg, url, callback, errback): """ Open client HTTP connection, send a message, set up callbacks to handle response. @@ -964,7 +1007,7 @@ def client(msg, url, callback, errback, content_type = default_content_type): callback = callback, errback = errback, Host = u.hostname, - Content_Type = content_type) + Content_Type = rpki_content_type) hostport = (u.hostname or "localhost", u.port or default_tcp_port) @@ -1035,6 +1078,7 @@ class caller(object): """ Handle CMS-wrapped XML response message. """ + try: r_cms = self.proto.cms_msg(DER = r_der) r_msg = r_cms.unwrap((self.server_ta, self.server_cert)) |