diff options
Diffstat (limited to 'rpki/http.py')
-rw-r--r-- | rpki/http.py | 94 |
1 files changed, 46 insertions, 48 deletions
diff --git a/rpki/http.py b/rpki/http.py index 71239c7f..16ed0453 100644 --- a/rpki/http.py +++ b/rpki/http.py @@ -113,6 +113,7 @@ def supported_address_families(enable_ipv6): IP address families on which servers should listen, and to consider when selecting addresses for client connections. """ + if enable_ipv6 and have_ipv6: return (socket.AF_INET, socket.AF_INET6) else: @@ -122,6 +123,7 @@ def localhost_addrinfo(): """ Return pseudo-getaddrinfo results for localhost. """ + result = [(socket.AF_INET, "127.0.0.1")] if enable_ipv6_clients and have_ipv6: result.append((socket.AF_INET6, "::1")) @@ -145,6 +147,7 @@ class http_message(object): Clean up (some of) the horrible messes that HTTP allows in its headers. """ + if headers is None: headers = () if self.headers is None else self.headers.items() translate_underscore = True @@ -167,6 +170,7 @@ class http_message(object): """ Parse and normalize an incoming HTTP message. """ + self = cls() headers = headers.split("\r\n") self.parse_first_line(*headers.pop(0).split(None, 2)) @@ -181,6 +185,7 @@ class http_message(object): """ Format an outgoing HTTP message. """ + s = self.format_first_line() if self.body is not None: assert isinstance(self.body, str) @@ -199,6 +204,7 @@ class http_message(object): """ Parse HTTP version, raise an exception if we can't. """ + if version[:5] != "HTTP/": raise rpki.exceptions.HTTPBadVersion("Couldn't parse version %s" % version) self.version = tuple(int(i) for i in version[5:].split(".")) @@ -208,6 +214,7 @@ class http_message(object): """ Figure out whether this HTTP message encourages a persistent connection. """ + c = self.headers.get("Connection") if self.version == (1, 1): return c is None or "close" not in c.lower() @@ -234,6 +241,7 @@ class http_request(http_message): """ Parse first line of HTTP request message. """ + self.parse_version(version) self.cmd = cmd self.path = path @@ -243,6 +251,7 @@ class http_request(http_message): Format first line of HTTP request message, and set up the User-Agent header. """ + self.headers.setdefault("User-Agent", self.software_name) return "%s %s HTTP/%d.%d\r\n" % (self.cmd, self.path, self.version[0], self.version[1]) @@ -263,6 +272,7 @@ class http_response(http_message): """ Parse first line of HTTP response message. """ + self.parse_version(version) self.code = int(code) self.reason = reason @@ -272,6 +282,7 @@ class http_response(http_message): Format first line of HTTP response message, and set up Date and Server headers. """ + self.headers.setdefault("Date", time.strftime("%a, %d %b %Y %T GMT")) self.headers.setdefault("Server", self.software_name) return "HTTP/%d.%d %s %s\r\n" % (self.version[0], self.version[1], self.code, self.reason) @@ -320,6 +331,7 @@ class http_stream(asynchat.async_chat): """ (Re)start HTTP message parser, reset timer. """ + assert not self.buffer self.chunk_handler = None self.set_terminator("\r\n\r\n") @@ -331,6 +343,7 @@ class http_stream(asynchat.async_chat): stream's timeout value if we're doing timeouts, otherwise clear it. """ + if self.timeout is not None: self.logger.debug("Setting timeout %s", self.timeout) self.timer.set(self.timeout) @@ -342,6 +355,7 @@ class http_stream(asynchat.async_chat): """ Buffer incoming data from asynchat. """ + self.buffer.append(data) self.update_timeout() @@ -349,6 +363,7 @@ class http_stream(asynchat.async_chat): """ Consume data buffered from asynchat. """ + val = "".join(self.buffer) self.buffer = [] return val @@ -370,6 +385,7 @@ class http_stream(asynchat.async_chat): separate mechanisms (chunked, content-length, TCP close) is going to tell us how to find the end of the message body. """ + self.update_timeout() if self.chunk_handler: self.chunk_handler() @@ -393,6 +409,7 @@ class http_stream(asynchat.async_chat): stream up to read it; otherwise, this is the last chunk, so start the process of exiting the chunk decoder. """ + n = int(self.get_buffer().partition(";")[0], 16) self.logger.debug("Chunk length %s", n) if n: @@ -408,6 +425,7 @@ class http_stream(asynchat.async_chat): body of a chunked message (sic). Save it, and prepare to move on to the next chunk. """ + self.logger.debug("Chunk body") self.msg.body += self.buffer self.buffer = [] @@ -419,6 +437,7 @@ class http_stream(asynchat.async_chat): Consume the CRLF that terminates a chunk, reinitialize chunk decoder to be ready for the next chunk. """ + self.logger.debug("Chunk CRLF") s = self.get_buffer() assert s == "", "%r: Expected chunk CRLF, got '%s'" % (self, s) @@ -429,6 +448,7 @@ class http_stream(asynchat.async_chat): Consume chunk trailer, which should be empty, then (finally!) exit the chunk decoder and hand complete message off to the application. """ + self.logger.debug("Chunk trailer") s = self.get_buffer() assert s == "", "%r: Expected end of chunk trailers, got '%s'" % (self, s) @@ -439,6 +459,7 @@ class http_stream(asynchat.async_chat): """ Hand normal (not chunked) message off to the application. """ + self.msg.body = self.get_buffer() self.handle_message() @@ -448,6 +469,7 @@ class http_stream(asynchat.async_chat): whether it's one we should just pass along, otherwise log a stack trace and close the stream. """ + self.timer.cancel() etype = sys.exc_info()[0] if etype in (SystemExit, rpki.async.ExitNow): @@ -460,6 +482,7 @@ class http_stream(asynchat.async_chat): """ Inactivity timer expired, close connection with prejudice. """ + self.logger.debug("Timeout, closing") self.close() @@ -468,6 +491,7 @@ class http_stream(asynchat.async_chat): Wrapper around asynchat connection close handler, so that we can log the event, cancel timer, and so forth. """ + self.logger.debug("Close event in HTTP stream handler") self.timer.cancel() asynchat.async_chat.handle_close(self) @@ -499,12 +523,14 @@ class http_server(http_stream): Content-Length header (that is: this message will be the last one in this server stream). No special action required. """ + self.handle_message() def find_handler(self, path): """ Helper method to search self.handlers. """ + for h in self.handlers: if path.startswith(h[0]): return h[1], h[2] if len(h) > 2 else (default_content_type,) @@ -517,6 +543,7 @@ class http_server(http_stream): Content-Type, look for a handler, and if everything looks right, pass the message body, path, and a reply callback to the handler. """ + self.logger.debug("Received request %r", self.msg) if not self.msg.persistent: self.expect_close = True @@ -544,12 +571,14 @@ class http_server(http_stream): """ Send an error response to this request. """ + self.send_message(code = code, reason = reason) def send_reply(self, code, body = None, reason = "OK"): """ Send a reply to this request. """ + self.send_message(code = code, body = body, reason = reason) def send_message(self, code, reason = "OK", body = None): @@ -559,6 +588,7 @@ class http_server(http_stream): listen for next message; otherwise, queue up a close event for this stream so it will shut down once the reply has been sent. """ + self.logger.debug("Sending response %s %s", code, reason) if code >= 400: self.expect_close = True @@ -614,6 +644,7 @@ class http_listener(asyncore.dispatcher): Asyncore says we have an incoming connection, spawn an http_server stream for it and pass along all of our handler data. """ + try: res = self.accept() if res is None: @@ -630,6 +661,7 @@ class http_listener(asyncore.dispatcher): """ Asyncore signaled an error, pass it along or log it. """ + if sys.exc_info()[0] in (SystemExit, rpki.async.ExitNow): raise self.logger.exception("Error in HTTP listener") @@ -665,6 +697,7 @@ class http_client(http_stream): """ Create socket and request a connection. """ + if not use_adns: self.logger.debug("Not using ADNS") self.gotaddrinfo([(socket.AF_INET, self.host)]) @@ -681,12 +714,14 @@ class http_client(http_stream): Handle DNS lookup errors. For now, just whack the connection. Undoubtedly we should do something better with diagnostics here. """ + self.handle_error() def gotaddrinfo(self, addrinfo): """ Got address data from DNS, create socket and request connection. """ + try: self.af, self.address = random.choice(addrinfo) self.logger.debug("Connecting to AF %s host %s port %s addr %s", self.af, self.host, self.port, self.address) @@ -704,6 +739,7 @@ class http_client(http_stream): """ Asyncore says socket has connected. """ + self.logger.debug("Socket connected") self.set_state("idle") assert self.queue.client is self @@ -713,6 +749,7 @@ class http_client(http_stream): """ Set HTTP client connection state. """ + self.logger.debug("State transition %s => %s", self.state, state) self.state = state @@ -723,12 +760,14 @@ class http_client(http_stream): in this server stream). In this case we want to read until we reach the end of the data stream. """ + self.set_terminator(None) def send_request(self, msg): """ Queue up request message and kickstart connection. """ + self.logger.debug("Sending request %r", msg) assert self.state == "idle", "%r: state should be idle, is %s" % (self, self.state) self.set_state("request-sent") @@ -785,6 +824,7 @@ class http_client(http_stream): message now; if we were waiting for the response to a request we sent, signal the error. """ + http_stream.handle_close(self) self.logger.debug("State %s", self.state) if self.get_terminator() is None: @@ -799,6 +839,7 @@ class http_client(http_stream): Connection idle timer has expired. Shut down connection in any case, noisily if we weren't idle. """ + bad = self.state not in ("idle", "closing") if bad: self.logger.warning("Timeout while in state %s", self.state) @@ -816,6 +857,7 @@ class http_client(http_stream): Asyncore says something threw an exception. Log it, then shut down the connection and pass back the exception. """ + eclass, edata = sys.exc_info()[0:2] self.logger.warning("Error on HTTP client connection %s:%s %s %s", self.host, self.port, eclass, edata) http_stream.handle_error(self) @@ -843,6 +885,7 @@ class http_queue(object): """ Append http_request object(s) to this queue. """ + self.logger.debug("Adding requests %r", requests) self.queue.extend(requests) @@ -855,6 +898,7 @@ class http_queue(object): exception, or timeout) for the query currently in progress will call this method when it's time to kick out the next query. """ + try: if self.client is None: self.client = http_client(self, self.hostport) @@ -874,6 +918,7 @@ class http_queue(object): """ Kick out the next query in this queue, if any. """ + if self.queue: self.client.send_request(self.queue[0]) @@ -884,6 +929,7 @@ class http_queue(object): handling of what otherwise would be a nasty set of race conditions. """ + if client_ is self.client: self.logger.debug("Detaching client %r", client_) self.client = None @@ -1008,51 +1054,3 @@ def server(handlers, port, host = ""): http_listener(addrinfo = a, handlers = handlers) rpki.async.event_loop() - -class caller(object): - """ - Handle client-side mechanics for protocols based on HTTP, CMS, and - rpki.xml_utils. Calling sequence is intended to nest within - rpki.async.sync_wrapper. - """ - - debug = False - - def __init__(self, proto, client_key, client_cert, server_ta, server_cert, url, debug = None): - self.proto = proto - self.client_key = client_key - self.client_cert = client_cert - self.server_ta = server_ta - self.server_cert = server_cert - self.url = url - self.cms_timestamp = None - if debug is not None: - self.debug = debug - - def __call__(self, cb, eb, *pdus): - - def done(r_der): - """ - Handle CMS-wrapped XML response message. - """ - try: - r_cms = self.proto.cms_msg(DER = r_der) - r_msg = r_cms.unwrap((self.server_ta, self.server_cert)) - self.cms_timestamp = r_cms.check_replay(self.cms_timestamp, self.url) - if self.debug: - print "<!-- Reply -->" - print r_cms.pretty_print_content() - cb(r_msg) - except (rpki.async.ExitNow, SystemExit): - raise - except Exception, e: - eb(e) - - q_msg = self.proto.msg.query(*pdus) - q_cms = self.proto.cms_msg() - q_der = q_cms.wrap(q_msg, self.client_key, self.client_cert) - if self.debug: - print "<!-- Query -->" - print q_cms.pretty_print_content() - - client(url = self.url, msg = q_der, callback = done, errback = eb) |